zookeeper提供了两个库,zookeeper_st和 zookeeper_mt。
前者是单线程库,仅仅提供了异步API和集成在应用程序实现循环中的回调函数,这个库是为了支持pthread库不支持或者不稳定的系统而存在的。使用过程中要自己通过zoo_interest和zoo_process实现事件处理及通知机制。
其他情况下应该使用后者多线程库。因为它同时支持同步和异步API。使用起来也方便很多,可以看我的例子。
The package includes two shared libraries: zookeeper_st and zookeeper_mt. The former only provides the asynchronous APIs and callbacks for integrating into the application's event loop. The only reason this library exists is to support the platforms were a pthread library is not available or is unstable (i.e. FreeBSD 4.x). In all other cases, application developers should link with zookeeper_mt, as it includes support for both Sync and Async API.
OK,talk is cheap。
单线程库使用示例: 在zookeeper上先创建/xyz才能执行成功,因为st库仅支持异步,而异步create没有时序保证。 该程序可以监控/xyz节点及子节点的变化。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>
#include <errno.h> #define TRUE 1 static const char* state2String(int state){
if (state == )
return "CLOSED_STATE";
if (state == ZOO_CONNECTING_STATE)
return "CONNECTING_STATE";
if (state == ZOO_ASSOCIATING_STATE)
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
return "AUTH_FAILED_STATE"; return "INVALID_STATE";
} static const char* type2String(int state){
if (state == ZOO_CREATED_EVENT)
return "CREATED_EVENT";
if (state == ZOO_DELETED_EVENT)
return "DELETED_EVENT";
if (state == ZOO_CHANGED_EVENT)
return "CHANGED_EVENT";
if (state == ZOO_CHILD_EVENT)
return "CHILD_EVENT";
if (state == ZOO_SESSION_EVENT)
return "SESSION_EVENT";
if (state == ZOO_NOTWATCHING_EVENT)
return "NOTWATCHING_EVENT"; return "UNKNOWN_EVENT_TYPE";
}
void zktest_dump_stat(const struct Stat *stat) {
char tctimes[];
char tmtimes[];
time_t tctime;
time_t tmtime; if (!stat) {
fprintf(stderr,"null\n");
return;
}
tctime = stat->ctime/;
tmtime = stat->mtime/; ctime_r(&tmtime, tmtimes);
ctime_r(&tctime, tctimes); fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
"\tmtime=%s\tmzxid=%llx\n"
"\tversion=%x\taversion=%x\n"
"\tephemeralOwner = %llx\n",
tctimes, stat->czxid,
tmtimes, stat->mzxid,
(unsigned int)stat->version, (unsigned int)stat->aversion,
stat->ephemeralOwner);
}
void zktest_data_completion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data) {
fprintf(stderr, "in data completion [%s]: rc = %d\n", value, rc);
zktest_dump_stat(stat);
}
/*process a list of string and stat*/
void zktest_strings_stat_completion(int rc, const struct String_vector *strings, const struct Stat *stat, const void *data) {
int i=;
fprintf(stderr, "in strings state completion [%s]: rc = %d, string count %d\n", (char *)data, rc, strings ->count);
for (i=; i<strings->count; i++) {
printf("%d: %s\n", i, strings->data[i]);
}
deallocate_String_vector(strings);
zktest_dump_stat(stat);
} void zktest_string_completion(int rc, const char *name, const void *data) {
fprintf(stderr, "in string completion [%s]: rc = %d\n", (char*)(data==?"null":data), rc);
if (!rc) {
fprintf(stderr, "\tname = %s\n", name);
}
} void zktest_stat_completion(int rc, const struct Stat *stat, const void *data) { if (rc == ZNONODE)
{
printf("node not exists\n");
}
fprintf(stderr, "in state completion rc = %d data=%s stat:\n", rc, (char *)data);
zktest_dump_stat(stat); } void zktest_void_completion(int rc, const void *data) {
fprintf(stderr, "in void completion [%s]: rc = %d\n", (char*)(data==?"null":data), rc);
} void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) {
printf("Something happened.\n");
printf("type: %s\n", type2String(type));
printf("state: %s\n", state2String(state));
printf("path: %s\n", path);
printf("watcherCtx: %s\n", (char *)watcherCtx);
int ret;
const clientid_t *zk_clientid;
if (path == NULL || strlen(path) == )
{
path="/xyz";
}
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_EXPIRED_SESSION_STATE) {
printf("[%s %d]zookeeper session expired\n", __FUNCTION__, __LINE__);
} else if (state == ZOO_CONNECTED_STATE) {
zk_clientid = zoo_client_id(zh);
printf("[%s %d] connected to zookeeper server with clientid=%lu\n", __FUNCTION__, __LINE__, zk_clientid ->client_id);
if ((ret=zoo_aexists(zh, path, TRUE, zktest_stat_completion, path)) != ZOK)
{
printf("[%s %d] zoo_aexists judge error, msg = %s\n", __FUNCTION__, __LINE__, zerror(ret)); }
if ((ret = zoo_aget(zh, (char *)path, TRUE, zktest_data_completion, "get param")) !=ZOK)
{
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK)
{
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
}
} else if (type == ZOO_CREATED_EVENT) {
printf("create a child node\n"); if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
} } else if (type == ZOO_DELETED_EVENT) {
printf("path %s deleted\n", path);
if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
} else if (type == ZOO_CHANGED_EVENT) {
printf("path %s changed\n", path);
if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
} } else if (type == ZOO_CHILD_EVENT) {
printf("path %s child event\n");
if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) {
printf("[%s %d] fail to get znode %s, err=%d, msg= %s\n", __FUNCTION__, __LINE__, path, ret, zerror(ret));
}
}
else if (type == ZOO_NOTWATCHING_EVENT){
}
else {
printf("unkown zoo event type\n");
} } int main(int argc, const char *argv[])
{
static const char* host = "localhost:2181";
int timeout = ;
zhandle_t* zkhandle;
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
zoo_deterministic_conn_order();
zkhandle = zookeeper_init(host, zktest_watcher_g, , , "hello world", );
if (zkhandle == NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
exit(EXIT_FAILURE);
} /* Wait for asynchronous zookeeper call done.*/
int fd, interest, events;
int rc;
fd_set rfds, wfds, efds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while(){
struct timeval tv;
zookeeper_interest(zkhandle, &fd, &interest, &tv);
if (fd != -) {
if(interest&ZOOKEEPER_READ)
{
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
} if(interest&ZOOKEEPER_WRITE)
{
FD_SET(fd, &wfds);
} else{
FD_CLR(fd, &wfds);
}
} else{
fd = ;
} if (select(fd+, &rfds, &wfds, &efds, &tv) < )
{
printf("[%s %d]select failed, err=%d, msg=%s\n", __FUNCTION__, __LINE__, errno, strerror(errno));
}
events = ;
if (FD_ISSET(fd, &rfds))
{
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds))
{
events |= ZOOKEEPER_WRITE;
}
zookeeper_process(zkhandle, events);
}
zookeeper_close(zkhandle);
}
gcc -o zookeeper_s_test zookeeper_s_test.c -lzookeeper_st -I /usr/local/include/zookeeper/
执行结果:
2014-10-21 22:05:41,856:49845:ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=30000 watcher=0x401444 sessionId=0 sessionPasswd=<null> context=0x402213 flags=0
2014-10-21 22:05:41,859:49845:ZOO_INFO@check_events@1703: initiated connection to server [::1:2181]
2014-10-21 22:05:41,860:49845:ZOO_INFO@check_events@1750: session establishment complete on server [::1:2181], sessionId=0x14929440e3d0027, negotiated timeout=30000
2014-10-21 22:05:41,860:49845:ZOO_DEBUG@check_events@1756: Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE
2014-10-21 22:05:41,860:49845:ZOO_DEBUG@process_completions@2107: Calling a watcher for node [], type = -1 event=ZOO_SESSION_EVENT
Something happened.
type: SESSION_EVENT
state: CONNECTED_STATE
path:
watcherCtx: hello world
[zktest_watcher_g 124] connected to zookeeper server with clientid=92650639611199527
2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awexists@2835: Sending request xid=0x54473b25 for path [/xyz] to ::1:2181
2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awget@2655: Sending request xid=0x54473b26 for path [/xyz] to ::1:2181
2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awget_children2_@2916: Sending request xid=0x54473b27 for path [/xyz] to ::1:2181
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@2005: Calling COMPLETION_STAT for xid=0x54473b25 failed=0 rc=0
in state completion rc = 0 data=/xyz stat:
ctime = Tue Oct 21 21:48:42 2014
czxid=400000052
mtime=Tue Oct 21 21:56:06 2014
mzxid=40000005a
version=2 aversion=0
ephemeralOwner = 0
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@1992: Calling COMPLETION_DATA for xid=0x54473b26 failed=0 rc=0
in data completion [123]: rc = 0
ctime = Tue Oct 21 21:48:42 2014
czxid=400000052
mtime=Tue Oct 21 21:56:06 2014
mzxid=40000005a
version=2 aversion=0
ephemeralOwner = 0
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response
2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@2029: Calling COMPLETION_STRINGLIST_STAT for xid=0x54473b27 failed=0 rc=0
in strings state completion [get children param]: rc = 0, string count 0
ctime = Tue Oct 21 21:48:42 2014
czxid=400000052
mtime=Tue Oct 21 21:56:06 2014
mzxid=40000005a
version=2 aversion=0
ephemeralOwner = 0
建议初学者使用时直接开debug,可以看清楚zookeeper的处理机制。
多线程库使用示例。该程序自己创建,读取,并删除一个节点,并能够触发watcher。
#include<stdio.h>
#include<string.h>
#include<zookeeper.h>
#include<zookeeper_log.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
static const char* state2String(int state){
if (state == 0)
return "CLOSED_STATE";
if (state == ZOO_CONNECTING_STATE)
return "CONNECTING_STATE";
if (state == ZOO_ASSOCIATING_STATE)
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
return "AUTH_FAILED_STATE";
return "INVALID_STATE";
}
static const char* type2String(int state){
if (state == ZOO_CREATED_EVENT)
return "CREATED_EVENT";
if (state == ZOO_DELETED_EVENT)
return "DELETED_EVENT";
if (state == ZOO_CHANGED_EVENT)
return "CHANGED_EVENT";
if (state == ZOO_CHILD_EVENT)
return "CHILD_EVENT";
if (state == ZOO_SESSION_EVENT)
return "SESSION_EVENT";
if (state == ZOO_NOTWATCHING_EVENT)
return "NOTWATCHING_EVENT";
return "UNKNOWN_EVENT_TYPE";
}
void zookeeper_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) {
printf("\nSomething happened.\n");
printf("type: %s\n", type2String(type));
printf("state: %s\n", state2String(state));
printf("path: %s\n", path);
printf("watcherCtx: %s\n", (char *)watcherCtx);
const clientid_t *zk_clientid;
int rc;
if (type == ZOO_CREATED_EVENT)
{
printf("[%s %d] znode %s created.\n", __FUNCTION__, __LINE__, path);
} else if (type == ZOO_DELETED_EVENT){
printf("[%s %d] znode %s deleted.\n", __FUNCTION__, __LINE__, path);
} else if (type == ZOO_CHANGED_EVENT){
printf("[%s %d] znode %s changed.\n", __FUNCTION__, __LINE__, path);
} else if(type == ZOO_CHILD_EVENT) {
printf("[%s %d] znode %s children changed.\n", __FUNCTION__, __LINE__, path);
} else if(type == ZOO_SESSION_EVENT) {
if(state == ZOO_EXPIRED_SESSION_STATE){
printf("[%s %d] zookeeper session expired\n", __FUNCTION__, __LINE__);
} else if (state == ZOO_AUTH_FAILED_STATE) {
printf("[%s %d] zookeeper session auth failed\n", __FUNCTION__, __LINE__);
} else if (state == ZOO_CONNECTING_STATE) {
printf("[%s %d] zookeeper session is connecting\n", __FUNCTION__, __LINE__);
} else if (state == ZOO_ASSOCIATING_STATE) {
printf("[%s %d] zookeeper session is associating state\n", __FUNCTION__, __LINE__);
} else if (state == ZOO_CONNECTED_STATE){
zk_clientid = zoo_client_id(zh);
printf("[%s %d] connected to zookeeper server with clientid=%lu\n", __FUNCTION__, __LINE__, zk_clientid ->client_id);
} else if (state == ZOO_NOTWATCHING_EVENT) {
printf("[%s %d] zookeeper session remove watch\n", __FUNCTION__, __LINE__);
} else {
printf("unknown session event state = %s, path = %s, ctxt=%s\n", state2String(state), path, (char *)watcherCtx);
}
}
}
void create(zhandle_t *zkhandle,char *str) {
char path_buffer[64]="abc";
int bufferlen=sizeof(path_buffer);
printf("create node in synchronous mode-----------------------\n");
int flag = zoo_create(zkhandle, str,"syn-node", 9,
&ZOO_OPEN_ACL_UNSAFE,0,
path_buffer,bufferlen);
if (flag!=ZOK) {
printf("create syn-node failed\n");
exit(EXIT_FAILURE);
} else {
printf("created node is %s\n",path_buffer);
}
}
void exists(zhandle_t *zkhandle,char *path)
{
int flag = zoo_exists(zkhandle, path, 1, NULL);
if (flag) {
printf("%s node already exist\n", path);
} else {
printf("%s node not exist\n", path);
}
}
void getACL(zhandle_t *zkhandle,char *str)
{
struct ACL_vector acl;
struct Stat stat;
int flag = zoo_get_acl(zkhandle,str,&acl,&stat);
if (flag==ZOK && acl.count > 0) {
printf("-----------------the ACL of %s:\n------------",str);
printf("%d\n",acl.count);
printf("%d\n",acl.data->perms);
printf("%s\n",acl.data->id.scheme);
printf("%s\n",acl.data->id.id);
}
}
void delete(zhandle_t *zkhandle,char *str)
{
int flag = zoo_delete(zkhandle,str,-1);
if (flag==ZOK) {
printf("delete node %s success\n", str);
}
}
int main(int argc, const char *argv[])
{
if (argc != 2) {
printf("usage: zookeeper_m_test server_address\n");
exit(-1);
}
const char* host = argv[1];
int timeout = 30000;
char buffer[512];
int *bufferlen;
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
zhandle_t* zkhandle = zookeeper_init(host, zookeeper_watcher_g, timeout, 0, NULL, 0);
if (zkhandle ==NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
exit(EXIT_FAILURE);
}
create(zkhandle, "/abc");
exists(zkhandle, "/abc");
getACL(zkhandle, "/abc");
delete(zkhandle, "/abc");
while(1);
zookeeper_close(zkhandle);
}
cc -o zookeeper_m_test zookeeper_m_test.o -I/usr/local/include/zookeeper/ -L/usr/local/lib/ -L/usr/local/lib/x86_64-linux-gnu/ -lzookeeper_mt
执行:./zookeeper_m_test localhost:2181
2014-10-21 22:10:14,598:49903(0x7fc80aa16740):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=30000 watcher=0x400dfc sessionId=0 sessionPasswd=<null> context=(nil) flags=0
2014-10-21 22:10:14,598:49903(0x7fc80aa16740):ZOO_DEBUG@start_threads@221: starting threads...
2014-10-21 22:10:14,599:49903(0x7fc8094c2700):ZOO_DEBUG@do_io@367: started IO thread
2014-10-21 22:10:14,599:49903(0x7fc8094c2700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
2014-10-21 22:10:14,599:49903(0x7fc808cc1700):ZOO_DEBUG@do_completion@459: started completion thread
create node in synchronous mode-----------------------
2014-10-21 22:10:14,599:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_acreate@2756: Sending request xid=0x54473c37 for path [/abc] to 127.0.0.1:2181
2014-10-21 22:10:14,604:49903(0x7fc8094c2700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x14929440e3d0028, negotiated timeout=30000
2014-10-21 22:10:14,604:49903(0x7fc8094c2700):ZOO_DEBUG@check_events@1756: Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE
2014-10-21 22:10:14,604:49903(0x7fc808cc1700):ZOO_DEBUG@process_completions@2107: Calling a watcher for node [], type = -1 event=ZOO_SESSION_EVENT Something happened.
type: SESSION_EVENT
state: CONNECTED_STATE
path:
watcherCtx: (null)
[zookeeper_watcher_g 81] connected to zookeeper server with clientid=92650639611199528
2014-10-21 22:10:14,608:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=6 xid=0x54473c37 rc=0
created node is /abc
2014-10-21 22:10:14,608:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_awexists@2835: Sending request xid=0x54473c38 for path [/abc] to 127.0.0.1:2181
2014-10-21 22:10:14,609:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=1 xid=0x54473c38 rc=0
/abc node not exist
2014-10-21 22:10:14,609:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_aget_acl@2989: Sending request xid=0x54473c39 for path [/abc] to 127.0.0.1:2181
2014-10-21 22:10:14,611:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=5 xid=0x54473c39 rc=0
-----------------the ACL of /abc:
------------1
31
world
anyone
2014-10-21 22:10:14,611:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_adelete@2796: Sending request xid=0x54473c3a for path [/abc] to 127.0.0.1:2181
2014-10-21 22:10:14,613:49903(0x7fc8094c2700):ZOO_DEBUG@zookeeper_process@2193: Processing WATCHER_EVENT
2014-10-21 22:10:14,613:49903(0x7fc808cc1700):ZOO_DEBUG@process_completions@2107: Calling a watcher for node [/abc], type = -1 event=ZOO_DELETED_EVENT Something happened.
type: DELETED_EVENT
state: CONNECTED_STATE
path: /abc
watcherCtx: (null)
[zookeeper_watcher_g 58] znode /abc deleted.
2014-10-21 22:10:14,613:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=0 xid=0x54473c3a rc=0
delete node /abc success
很明显,多线程库使用起来更方便。
更多API使用详细可参考以下:
参考:
http://www.cnblogs.com/haippy/archive/2013/02/21/2920426.html
http://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html#C+Binding