zookeeper 实现一个简单的服务注册与发现(C++) 一:与zk保持连接

git:git@github.com:ccx19930930/services_register_and_discovery.git

参考链接:<https://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html>

禁止拷贝基类:

base_class.h:

#ifndef BASE_CLASS_H_
#define BASE_CLASS_H_

// Mix-in base class that forbids copying of every class derived from it.
//
// The constructor and destructor are protected, so the class can only be
// used as a base; the destructor is non-virtual, so objects cannot be
// deleted through a CUnCopyable pointer from outside the hierarchy.
class CUnCopyable
{
protected:
    CUnCopyable() {}
    ~CUnCopyable() {}

private:
    // Deleted copy operations: a derived class's implicit copy constructor
    // and copy assignment become deleted as well.
    CUnCopyable(const CUnCopyable&) = delete;
    CUnCopyable& operator=(const CUnCopyable&) = delete;
};

#endif  // BASE_CLASS_H_

auto_lock.h

 1 #ifndef _AUTO_LOCK_H_
 2 #define _AUTO_LOCK_H_
 3 
 4 #include &lt;pthread.h&gt;
 5 
 6 
 7 class CAutoMutexLock
 8 {
 9 public:
10     CAutoMutexLock(pthread_mutex_t&amp; mutex)
11         : m_mutex(mutex)
12     {
13         pthread_mutex_lock(&amp;m_mutex);
14     }
15 
16     ~CAutoMutexLock()
17     {
18         pthread_mutex_unlock(&amp;m_mutex);
19     }
20 
21 private:
22     pthread_mutex_t&amp; m_mutex;
23 };
24 
25 #endif

zk_handle.h:

 1 #ifndef _ZK_HANDLE_H_
 2 #define _ZK_HANDLE_H_
 3 
#include "base_class.h"

#include <map>
#include <set>
#include <string>

#include <pthread.h>

#include <zookeeper.h>
#include <zookeeper_log.h>
#include <zookeeper.jute.h>
14 
15 using namespace std;
16 
17 typedef void (*OnResetHandle_Fn)();
18 class CZkHandle : public CUnCopyable
19 {
20 private:
21     static pthread_mutex_t m_mutex;
22     static CZkHandle* m_pins;
23     CZkHandle();
24 public:
25     static CZkHandle* GetInstance();
26 
27 public:
28     int ZkInit(const string&amp; host_list, const int time_out);
29     int ZkClose();
30     int ZkExists(const string&amp; path, struct Stat &amp; stat);
31 
32 public:
33     int ZkCreateNode(const string&amp; path, const string&amp; value, bool is_sequential, string&amp; raw_node_name);
34     int ZkDeleteNode(const string&amp; path, const int version = -1);
35 
36 public:
37     int ZkGetChildren(const string&amp; path, set&lt;string&gt;&amp; node_list);
38     int ZkGetNodeInfo(const string&amp; path, string &amp; info, struct Stat&amp; stat);
39 
40     int ZkWgetChildren(const string&amp; path, watcher_fn watcher, set&lt;string&gt;&amp; node_list);
41     int ZkWGetNodeInfo(const string&amp; path, watcher_fn watcher, string&amp; info, struct Stat&amp; stat);
42 
43 public:
44     int ZkSetNodeInfo(const string&amp; path, const string&amp; value);
45 
46 public:
47     int AddResetHandleFn(string type, OnResetHandle_Fn func);
48     int DelResetHandleFn(string type);
49 
50 private:
51     static void ZkInitWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
52     int ResetZkHandle();
53 
54 private:
55     static void* ZkHandleCheckThread(void* param);
56     int ZkHandleCheck();
57     bool IsRunning();
58 
59 private:
60     pthread_t m_handle_check_thread_id;
61     bool m_is_running;
62     zhandle_t* m_zk_handle;
63     string m_host_list;
64     int m_time_out;
65 
66     map&lt;string, OnResetHandle_Fn&gt; m_on_reset_handle_fn_list;
67 };
68 
69 #endif

zk_handle.cpp

```cpp
1 #include "zk_handle.h"
2 #include "auto_lock.h"
3
4 #include <stdio.h>
5
6 #include <sys/prctl.h>
7 #include <unistd.h>
8 #include <pthread.h>
9
10 CZkHandle CZkHandle::m_pins = nullptr;
11 pthread_mutex_t CZkHandle::m_mutex;
12
13 CZkHandle::CZkHandle()
14 : m_handle_check_thread_id(0)
15 , m_zk_handle(nullptr)
16 , m_is_running(false)
17 , m_host_list("")
18 , m_time_out(0)
19 {
20 }
21
22 CZkHandle
CZkHandle::GetInstance()
23 {
24 if (m_pins == nullptr)
25 {
26 CAutoMutexLock auto_lock(m_mutex);
27 if (m_pins == nullptr)
28 {
29 m_pins = new CZkHandle;
30 }
31 }
32 return m_pins;
33 }
34
35 int CZkHandle::ZkInit(const string& host_list, const int time_out)
36 {
37 zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
38 m_host_list = host_list;
39 m_time_out = time_out;
40
41 if (0 == m_handle_check_thread_id)
42 {
43 m_is_running = true;
44 if (0 != pthread_create(&m_handle_check_thread_id, nullptr, CZkHandle::ZkHandleCheckThread, nullptr))
45 {
46 printf("CZkHandle::ZkInit create register check thread fail.\n");
47 return -1;
48 }
49 printf("CZkHandle::ZkInit create register check thread succ.\n");
50 }
51 return 0;
52 }
53
54 int CZkHandle::ZkClose()
55 {
56 if (m_zk_handle)
57 {
58 zookeeper_close(m_zk_handle);
59 m_zk_handle = nullptr;
60 }
61 printf("CZkHandle::ZkClose: zk close.\n");
62 }
63
64 int CZkHandle::ZkExists(const string& path, struct Stat & stat)
65 {
66 int ret_code = zoo_exists(m_zk_handle, path.c_str(), 0, &stat);
67 printf("CZkHandle::ZkExists: [ret=%d]\n", ret_code);
68 if(ZOK == ret_code)
69 {
70 printf("CZkHandle::ZkExists: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
71 path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
72 }
73 return ret_code;
74 }
75
76 int CZkHandle::ZkCreateNode(const string& path, const string& value, bool is_sequential, string& raw_node_name)
77 {
78 int ret_code = 0;
79 char tmp_name[kMaxBufferLen];
80 int flag = ZOO_EPHEMERAL;
81 if (is_sequential)
82 {
83 flag |= ZOO_SEQUENCE;
84 }
85 printf("CZkHandle::ZkCreateNode create node [path=%s] [value=%s]\n", path.c_str(), value.c_str());
86
87 ret_code = zoo_create(m_zk_handle, path.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, flag, tmp_name, kMaxBufferLen);
88 if (ZOK != ret_code)
89 {
90 printf("CZkHandle::ZkCreateNode create node fail. ret=%d\n", ret_code);
91 }
92 else
93 {
94 printf("CZkHandle::ZkCreateNode create node succ! path=%s\n", tmp_name);
95 }
96
97 raw_node_name = tmp_name;
98 return ret_code;
99 }
100
101 int CZkHandle::ZkDeleteNode(const string& path, const int version /= -1/)
102 {
103 int ret_code = zoo_delete(m_zk_handle, path.c_str(), version);
104 printf("CZkHandle::ZkDeleteNode delete node path=%s version=%d ret=%d\n", path.c_str(), version, ret_code);
105 return ret_code;
106 }
107
108 int CZkHandle::ZkGetChildren(const string& path, set<string>& node_list)
109 {
110 int ret_code = 0;
111 struct String_vector children_list;
112
113 printf("CZkHandle::ZkGetChildren get children for path=%s\n", path.c_str());
114
115 ret_code = zoo_get_children(m_zk_handle, path.c_str(), 0, &children_list);
116
117 if (ZOK != ret_code)
118 {
119 printf("CZkHandle::ZkGetChildren get children fail. ret=%d\n", ret_code);
120 return ret_code;
121 }
122
123 printf("CZkHandle::ZkGetChildren get children succ. children_num=%d\n", children_list.count);
124
125 for (unsigned int children_idx = 0; children_idx < children_list.count; ++children_idx)
126 {
127 printf("CZkHandle::ZkGetChildren children_idx=%u, children_name=%s\n", children_idx, children_list.data[children_idx]);
128 node_list.insert(children_list.data[children_idx]);
129 }
130
131 deallocate_String_vector(&children_list);
132 return ret_code;
133 }
134
135 int CZkHandle::ZkGetNodeInfo(const string& path, string& info, struct Stat& stat)
136 {
137 int ret_code = 0;
138
139 char buffer[kMaxBufferLen];
140 int buffer_len = kMaxBufferLen;
141
142 printf("CZkHandle::ZkGetNodeInfo get node info for path=%s\n", path.c_str());
143 ret_code = zoo_get(m_zk_handle, path.c_str(), 0, buffer, &buffer_len, &stat);
144
145 if (ZOK != ret_code)
146 {
147 printf("CZkHandle::ZkGetNodeInfo get node info for path=%s fail. ret=%d\n", path.c_str(), ret_code);
148 return ret_code;
149 }
150
151 buffer[buffer_len] = 0;
152 printf("CZkHandle::ZkGetNodeInfo get node info for path=%s succ. buffer=%s\n", path.c_str(), buffer);
153 printf("CZkHandle::ZkGetNodeInfo: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
154 path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
155
156 info = buffer;
157
158 return ret_code;
159 }
160
161 int CZkHandle::ZkWgetChildren(const string& path, watcher_fn watcher, set<string>& node_list)
162 {
163 int ret_code = 0;
164 struct String_vector children_list;
165
166 printf("CZkHandle::ZkWgetChildren get children for path=%s\n", path.c_str());
167
168 ret_code = zoo_wget_children(m_zk_handle, path.c_str(), watcher, NULL, &children_list);
169
170 if (ZOK != ret_code)
171 {
172 printf("CZkHandle::ZkWgetChildren get children fail. ret=%d\n", ret_code);
173 return ret_code;
174 }
175
176 printf("CZkHandle::ZkWgetChildren get children succ. children_num=%d\n", children_list.count);
177
178 for (unsigned int children_idx = 0; children_idx < children_list.count; ++children_idx)
179 {
180 printf("CZkHandle::ZkWgetChildren children_idx=%u, children_name=%s\n", children_idx, children_list.data[children_idx]);
181 node_list.insert(children_list.data[children_idx]);
182 }
183
184 deallocate_String_vector(&children_list);
185 return ret_code;
186 }
187
188 int CZkHandle::ZkWGetNodeInfo(const string& path, watcher_fn watcher, string& info, struct Stat& stat)
189 {
190 int ret_code = 0;
191
192 char buffer[kMaxBufferLen];
193 int buffer_len = kMaxBufferLen;
194
195 printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s\n", path.c_str());
196 ret_code = zoo_wget(m_zk_handle, path.c_str(), watcher, NULL, buffer, &buffer_len, &stat);
197
198 if (ZOK != ret_code)
199 {
200 printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s fail. ret=%d\n", path.c_str(), ret_code);
201 return ret_code;
202 }
203
204 buffer[buffer_len] = 0;
205 printf("CZkHandle::ZkWGetNodeInfo get node info for path=%s succ. buffer=%s\n", path.c_str(), buffer);
206 printf("CZkHandle::ZkWGetNodeInfo: [path=%s] [czxid=%ld] [mzxid=%ld] [version=%d] [cversion=%d] [child_num=%d]\n",
207 path.c_str(), stat.czxid, stat.mzxid, stat.version, stat.cversion, stat.numChildren);
208
209 info = buffer;
210
211 return ret_code;
212 }
213
214 int CZkHandle::ZkSetNodeInfo(const string& path, const string& value)
215 {
216 printf("CZkHandle::ZkSetNodeInfo set node info. path=%s value=%s\n", path.c_str(), value.c_str());
217 int ret_code = zoo_set(m_zk_handle, path.c_str(), value.c_str(), value.size(), -1);
218 if (ZOK != ret_code)
219 {
220 printf("CZkHandle::ZkSetNodeInfo set node fail.");
221 }
222 return ret_code;
223 }
224
225 int CZkHandle::AddResetHandleFn(string type, OnResetHandle_Fn func)
226 {
227 CAutoMutexLock auto_lock(m_mutex);
228 m_on_reset_handle_fn_list[type] = func;
229 return 0;
230 }
231
232 int CZkHandle::DelResetHandleFn(string type)
233 {
234 CAutoMutexLock auto_lock(m_mutex);
235 if (m_on_reset_handle_fn_list.count(type))
236 {
237 m_on_reset_handle_fn_list.erase(type);
238 }
239 return 0;
240 }
241
242 void CZkHandle::ZkInitWatcher(zhandle_t zh, int type, int state, const char path, void watcherCtx)
243 {
244 printf("CZkHandle::ZkInitWatchar: [type=%d] [state=%d] [path=%s] [watcher_ctx=%p]\n", type, state, path, watcherCtx);
245 }
246
247 int CZkHandle::ResetZkHandle()
248 {
249 CAutoMutexLock auto_lock(m_mutex);
250 zhandle_t
new_zk_handle = zookeeper_init(m_host_list.c_str(), &ZkInitWatcher, m_time_out, 0, nullptr, 0);
251 if (nullptr == new_zk_handle)
252 {
253 printf("CZkHandle::ResetZkHandle: connect to zk fail.\n");
254 return -1;
255 }
256 printf("CZkHandle::ResetZkHandle: connect to zk succ.\n");
257
258 zhandle_t old_zk_handle = m_zk_handle;
259 m_zk_handle = new_zk_handle;
260
261 for (const auto & func : m_on_reset_handle_fn_list)
262 {
263 func.second();
264 }
265
266 if (old_zk_handle)
267 {
268 zookeeper_close(old_zk_handle);
269 }
270 return 0;
271 }
272
273 void
CZkHandle::ZkHandleCheckThread(void* param)
274 {
275 prctl(PR_SET_NAME, "zk_handle_check");
276 while (true == CZkHandle::GetInstance()->IsRunning())
277 {
278 CZkHandle::GetInstance()->ZkHandleCheck();
279 usleep(kZkHandleIntervalTime);
280 }
281 return nullptr;
282 }
283
284 int CZkHandle::ZkHandleCheck()
285 {
286 struct Stat stat;
287 if (m_zk_handle == nullptr)
288 {
289 ResetZkHandle();
290 }
291 else if(ZOK != ZkExists("/", stat))
292 {
293 ResetZkHandle();
294 }
295 }
296
297 bool CZkHandle::IsRunning()
298 {
299 return m_is_running;
300 }


```

zk\_handle\_test main.cpp

1 #include "zk_handle.h"
2
3 #include <unistd.h>
4
5 //伪分布式部署 host list最好以配置文件形式,此处为测试程序,暂时写死
6 const char host_list = "xx.xx.xx.xx:port,xx.xx.xx.xx:port,xx.xx.xx.xx:port";
7 const int time_out = 50000;
8 int main()
9 {
10 CZkHandle
zk_handle = CZkHandle::GetInstance();
11 struct Stat stat;
12 set<string> node_list;
13 string node_info;
14 string raw_name;
15
16 zk_handle->ZkInit(host_list, time_out);
17 sleep(1);
18 zk_handle->ZkExists("/", stat);
19
20 zk_handle->ZkCreateNode("/test_1", "1", true, raw_name);
21 zk_handle->ZkCreateNode("/test_1", "1", true, raw_name);
22 zk_handle->ZkCreateNode("/test_2", "2", false, raw_name);
23 zk_handle->ZkCreateNode("/test_2", "2", false, raw_name);
24
25 zk_handle->ZkGetNodeInfo("/test_2", node_info, stat);
26 zk_handle->ZkSetNodeInfo("/test_2", "3");
27 zk_handle->ZkGetNodeInfo("/test_2", node_info, stat);
28
29
30 node_list.clear();
31 zk_handle->ZkGetChildren("/", node_list);
32
33 zk_handle->ZkDeleteNode("/test_2");
34 node_list.clear();
35 zk_handle->ZkGetChildren("/", node_list);
36
37 sleep(60);
38
39 zk_handle->ZkClose();
40 return 0;
41 }


Makefile

# Builds the zk_handle_test binary from the sources in this directory and in
# ../zk_util/.

# Header search paths; each entry is turned into a -I option below.
INC_DIR:= ./ ../zk_util/ /usr/local/include/zookeeper/ /usr/local/include/json/
# Glob patterns are required here: wildcard matches file names, not directories.
SRCS:= $(wildcard ./*.cpp ../zk_util/*.cpp)
OBJS:= $(patsubst %.cpp, %.o, $(SRCS))
LIBS:= -lpthread -lzookeeper_mt -ljsoncpp

CXX:= g++

# NOTE: -w silences every compiler warning.
CXXFLAGS:= -w -g -std=c++11 $(addprefix -I, $(INC_DIR)) $(LIBS) -Wl,-rpath="/usr/local/lib"

EXE:= ../../bin/zk_handle_test

# clean is not a file; keep it runnable even if a file named "clean" exists.
.PHONY: clean

# Recipe lines must begin with a tab character.
$(EXE):$(OBJS)
	$(CXX) -o $(EXE) $(OBJS) $(CXXFLAGS)

clean:
	rm -rf $(EXE)
	rm -rf $(OBJS)


执行结果:

ccx@ccx:~/self_test/zookeeper/services_register_and_discovery/bin$ ll
total 1068
drwxrwxrwx 1 ccx ccx 512 May 30 18:24 ./
drwxrwxrwx 1 ccx ccx 512 May 30 18:20 ../
-rwxrwxrwx 1 ccx ccx 1092352 May 30 18:24 zk_handle_test*
ccx@ccx:~/self_test/zookeeper/services_register_and_discovery/bin$ ./zk_handle_test
CZkHandle::ZkInit create register check thread succ.
CZkHandle::ResetZkHandle: connect to zk succ.
CZkHandle::ZkInitWatchar: [type=-1] [state=3] [path=] [watcher_ctx=(nil)]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=285] [child_num=5]
CZkHandle::ZkCreateNode create node [path=/test_1] [value=1]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=285] [child_num=5]
CZkHandle::ZkCreateNode create node succ! path=/test_10000000145
CZkHandle::ZkCreateNode create node [path=/test_1] [value=1]
CZkHandle::ZkCreateNode create node succ! path=/test_10000000146
CZkHandle::ZkCreateNode create node [path=/test_2] [value=2]
CZkHandle::ZkCreateNode create node succ! path=/test_2
CZkHandle::ZkCreateNode create node [path=/test_2] [value=2]
CZkHandle::ZkCreateNode create node fail. ret=-110
CZkHandle::ZkGetNodeInfo get node info for path=/test_2
CZkHandle::ZkGetNodeInfo get node info for path=/test_2 succ. buffer=2
CZkHandle::ZkGetNodeInfo: [path=/test_2] [czxid=17179873741] [mzxid=17179873741] [version=0] [cversion=0] [child_num=0]
CZkHandle::ZkSeeNodeInfo set node info. path=/test_2 value=3
CZkHandle::ZkGetNodeInfo get node info for path=/test_2
CZkHandle::ZkGetNodeInfo get node info for path=/test_2 succ. buffer=3
CZkHandle::ZkGetNodeInfo: [path=/test_2] [czxid=17179873741] [mzxid=17179873743] [version=1] [cversion=0] [child_num=0]
CZkHandle::ZkGetChildren get children for path=/
CZkHandle::ZkGetChildren get children succ. children_num=8
CZkHandle::ZkGetChildren children_idx=0, children_name=marrs
CZkHandle::ZkGetChildren children_idx=1, children_name=test_2
CZkHandle::ZkGetChildren children_idx=2, children_name=zookeeper
CZkHandle::ZkGetChildren children_idx=3, children_name=zk_test1
CZkHandle::ZkGetChildren children_idx=4, children_name=zk_test2
CZkHandle::ZkGetChildren children_idx=5, children_name=test_10000000146
CZkHandle::ZkGetChildren children_idx=6, children_name=test_10000000145
CZkHandle::ZkGetChildren children_idx=7, children_name=zk_test
CZkHandle::ZkDeleteNode delete node path=/test_2 version=-1 ret=0
CZkHandle::ZkGetChildren get children for path=/
CZkHandle::ZkGetChildren get children succ. children_num=7
CZkHandle::ZkGetChildren children_idx=0, children_name=marrs
CZkHandle::ZkGetChildren children_idx=1, children_name=zookeeper
CZkHandle::ZkGetChildren children_idx=2, children_name=zk_test1
CZkHandle::ZkGetChildren children_idx=3, children_name=zk_test2
CZkHandle::ZkGetChildren children_idx=4, children_name=test_10000000146
CZkHandle::ZkGetChildren children_idx=5, children_name=test_10000000145
CZkHandle::ZkGetChildren children_idx=6, children_name=zk_test
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=289] [child_num=7]
CZkHandle::ZkExists: [ret=0]
CZkHandle::ZkExists: [path=/] [czxid=0] [mzxid=0] [version=0] [cversion=289] [child_num=7]

声明:该文章系转载,转载该文章的目的在于更广泛的传递信息,并不代表本网站赞同其观点,文章内容仅供参考。

本站是一个个人学习和交流平台,网站上部分文章为网站管理员和网友从相关媒体转载而来,并不用于任何商业目的,内容为作者个人观点, 并不代表本网站赞同其观点和对其真实性负责。

我们已经尽可能的对作者和来源进行了通告,但是可能由于能力有限或疏忽,导致作者和来源有误,亦可能您并不期望您的作品在我们的网站上发布。我们为这些问题向您致歉,如果您在我站上发现此类问题,请及时联系我们,我们将根据您的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。