Apache curator-client详解

Apache curator框架中curator-client组件可以作为zookeeper client来使用,它提供了zk实例创建/重连机制等,简单便捷.不过直接使用curator-client并不能减少太多的开发量,因为它相对比较底层,稍后我们继续了解curator-framework组件提供的更多的便捷特性.

一.核心API

1. CuratorZookeeperClient: zookeeper客户端,根据指定的配置信息创建zookeeper实例.

2. RetryPolicy接口: 重连策略,当zookeeper失去链接时使用的"重连策略":

<> RetryOneTime: 只重连一次.

<> RetryNTime: 指定重连的次数N.

<> RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功.

<> ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的.

  1. 时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))).

<> BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry(最大重试次数的控制),增加了最大睡眠时间.

3. RetryLoop: 操作重试,如果在执行一个操作时,遇到了zk链接异常,怎么办?RetryLoop可以不断重试,直到网络正常且操作执行成功为止.SessionFailRetryLoop类是一个特列,可以兼容当session失效时,如何进行操作重试.

4. EnsembleProvider: 配置提供者,创建zk客户端时,需要指定connectionString(例如:127.0.0.1:2181),在zookeeper API中只能显式的指定,curator在这个方面提供了更加灵活性的方式,你可以通过任何方式获取或者构建connectionString.

<> FixedEnsembleProvider: 使用固定的字符串作为connectionString.

<> ExhibitorEnsembleProvider: 动态的获取connectionString,可以指定一个URL用来提供connectionString的输出服务.此后此Provider将会间歇性的获取最新的connectionString字符串,并保存.事实上,ExhibitorEnsembleProvider只是一个样例,展示了一种动态获取connectionString的方式,如果在真正的开发中,你可能需要参考它,来定制自己的Provider.

二. 通用客户端代码示例

  1. public class ZooKeeperClient extends Thread{
  2. protected final CuratorZookeeperClient zkClient;
  3. protected String parent;
  4. public static final Charset charset = Charset.forName("utf-8");
  5. private ZNodeWatcher zNodeWatcher = new ZNodeWatcher();//自定义watcher
  6. public ZooKeeperClient(String connectString, int sessionTimeout, String parent) throws Exception {
  7. this.parent = parent;
  8. zkClient = new CuratorZookeeperClient(connectString, sessionTimeout, sessionTimeout, zNodeWatcher, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
  9. zkClient.start();//must,but anytime before zookeeper operation
  10. zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful
  11. }
  12. public boolean exist(String path,boolean watched) throws Exception{
  13. return zkClient.getZooKeeper().exists(path,watched) == null ? false : true;
  14. }
  15. /**
  16. * 此path必须存在,如果不存在则立即创建
  17. * @param path
  18. * @return
  19. */
  20. public boolean ensurePath(final String path) throws Exception{
  21. PathUtils.validatePath(path);
  22. return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>(){
  23. @Override
  24. public Boolean call() throws Exception {
  25. EnsurePath ensure = new EnsurePath(path);
  26. ensure.ensure(zkClient);
  27. return true;
  28. }
  29. });
  30. }
  31. /**
  32. *
  33. * @param path
  34. * @param data
  35. * @return   如果path已经存在或者创建成功,则返回true,否则返回false。
  36. * @throws Exception
  37. */
  38. public boolean create(final String path, final String data) throws Exception {
  39. PathUtils.validatePath(path);//if bad format,here will throw some Exception;
  40. return RetryLoop.callWithRetry(zkClient, new Callable<Boolean>() {
  41. @Override
  42. public Boolean call() throws Exception {
  43. int _current = 0;
  44. while (_current < 3) {
  45. _current++;
  46. try {
  47. //zkClient.blockUntilConnectedOrTimedOut();
  48. //确保父节点存在
  49. EnsurePath ensure = new EnsurePath(path).excludingLast();
  50. //parent path should be existed.
  51. //EnsurePath: retry + block
  52. ensure.ensure(zkClient); //ugly API
  53. zkClient.getZooKeeper().create(path, data.getBytes(charset), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  54. return true;
  55. } catch (KeeperException.NodeExistsException e) {
  56. return true;
  57. }
  58. //retry only for KeeperException,not for other runtimeException。
  59. //other exception will be thrown,and stop retry!!
  60. //if no Exception thrown,retry will be stopped and return successfully.
  61. }
  62. return false;
  63. }
  64. }) ;
  65. }
  66. public  class ZNodeWatcher implements Watcher{
  67. @Override
  68. public void process(WatchedEvent event) {
  69. Event.EventType eventType = event.getType();
  70. Event.KeeperState keeperState =  event.getState();
  71. String path = event.getPath();
  72. switch(event.getType()) {
  73. case None:
  74. //connection Error:会自动重连
  75. logger.info("[Watcher],Connecting...");
  76. if(keeperState == Event.KeeperState.SyncConnected){
  77. logger.info("[Watcher],Connected...");
  78. //检测临时节点是否失效等。
  79. }
  80. break;
  81. case NodeCreated:
  82. logger.info("[Watcher],NodeCreated:" + path);
  83. break;
  84. case NodeDeleted:
  85. logger.info("[Watcher],NodeDeleted:" + path);
  86. break;
  87. default:
  88. //
  89. }
  90. }
  91. }
  92. }

三. Provider代码实例

    本实例展示了如何使用curator-client开发简单的API,展示了RetryPolicy,RetryLoop的使用方式;实例中使用Curator自带的ExhibitorEnsembleProvider动态获取zookeeper服务器列表信息.

1. pom.xml

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>2.3.0</version>
  10. </dependency>

 2. curator-config.propeties 

  1. host.rest.servers=127.0.0.1,localhost
  2. host.rest.port=8080
  3. host.backup=127.0.0.1:2181
  4. host.rest.path=/servers/zk
  5. host.rest.period=180000

3. IZkClient.java 

  1. package com.test.demo.curator;
  2. import java.util.*;
  3. import java.util.concurrent.Callable;
  4. import java.util.concurrent.TimeUnit;
  5. import org.apache.curator.CuratorZookeeperClient;
  6. import org.apache.curator.RetryLoop;
  7. import org.apache.curator.RetryPolicy;
  8. import org.apache.curator.TimeTrace;
  9. import org.apache.curator.drivers.TracerDriver;
  10. import org.apache.curator.ensemble.EnsembleProvider;
  11. import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
  12. import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
  13. import org.apache.curator.ensemble.exhibitor.ExhibitorRestClient;
  14. import org.apache.curator.ensemble.exhibitor.Exhibitors;
  15. import org.apache.curator.retry.ExponentialBackoffRetry;
  16. import org.apache.curator.retry.RetryNTimes;
  17. import org.apache.curator.utils.EnsurePath;
  18. import org.apache.curator.utils.PathUtils;
  19. import org.apache.curator.utils.ZKPaths;
  20. import org.apache.zookeeper.CreateMode;
  21. import org.apache.zookeeper.KeeperException;
  22. import org.apache.zookeeper.Transaction;
  23. import org.apache.zookeeper.ZooDefs;
  24. import org.apache.zookeeper.data.ACL;
  25. public class IZkClient {
  26. private final CuratorZookeeperClient zkClient;
  27. public IZkClient(String configLocation) throws Exception {
  28. Properties properties = new Properties();
  29. properties.load(ClassLoader.getSystemResourceAsStream(configLocation));
  30. EnsembleProvider provider = buildProvider(properties);
  31. String pTimeout = properties.getProperty("zk.timeout");
  32. Integer timeout = 30000;
  33. if (pTimeout != null) {
  34. timeout = Integer.valueOf(pTimeout);
  35. }
  36. zkClient = new CuratorZookeeperClient(provider, timeout, timeout, null, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
  37. zkClient.setTracerDriver(new PrintTraceDrive());
  38. zkClient.start();//must,but anytime before zookeeper operation
  39. zkClient.blockUntilConnectedOrTimedOut(); //first connection should be successful
  40. }
  41. /**
  42. * build provider,all of params from config-file
  43. * @param properties
  44. * @return
  45. */
  46. private EnsembleProvider buildProvider(Properties properties) {
  47. String servers = properties.getProperty("host.rest.servers");   //hosts.servers = 127.0.0.1,127.0.0.2
  48. if (servers == null || servers.isEmpty()) {
  49. throw new IllegalArgumentException("host.servers cant be empty");
  50. }
  51. List<String> hostnames = Arrays.asList(servers.split(","));
  52. String port = properties.getProperty("host.rest.port");
  53. Integer restPort = 80;   //default
  54. if (port != null) {
  55. restPort = Integer.valueOf(port);
  56. }
  57. final String backupAddress = properties.getProperty("host.backup");//127.0.0.1:2181
  58. //if network is error,you should sepcify a backup zk-connectString
  59. Exhibitors exhibitors = new Exhibitors(hostnames, restPort, new Exhibitors.BackupConnectionStringProvider() {
  60. @Override
  61. public String getBackupConnectionString() throws Exception {
  62. return backupAddress;
  63. }
  64. });
  65. //rest,as meaning of getting fresh zk-connectString list.
  66. ExhibitorRestClient restClient = new DefaultExhibitorRestClient();
  67. String restUriPath = properties.getProperty("host.rest.path");
  68. String period = properties.getProperty("host.rest.period");
  69. Integer pollingMs = 180000; //3 min
  70. if (period != null) {
  71. pollingMs = Integer.valueOf(period);
  72. }
  73. return new ExhibitorEnsembleProvider(exhibitors, restClient, restUriPath, pollingMs, new RetryNTimes(10, 1000));
  74. }
  75. public CuratorZookeeperClient getZkClient() {
  76. return zkClient;
  77. }
  78. /**
  79. * how to use RtryLoop ,another style
  80. * if Znode has been existed,will delete it,and create it again.
  81. *
  82. */
  83. public boolean replace(final String path,final byte[] value){
  84. PathUtils.validatePath(path);
  85. boolean result = false;
  86. try{
  87. result = RetryLoop.callWithRetry(zkClient,new Callable<Boolean>() {
  88. @Override
  89. public Boolean call() throws Exception {
  90. int _current = 0;
  91. while(_current < 3){
  92. _current++;
  93. try{
  94. zkClient.blockUntilConnectedOrTimedOut();
  95. Transaction tx = zkClient.getZooKeeper().transaction();
  96. tx.delete(path, -1);
  97. tx.create(path,value,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  98. tx.commit();
  99. return true;
  100. } catch (KeeperException.NoNodeException e){
  101. //
  102. } catch (KeeperException.NodeExistsException e){
  103. //
  104. }
  105. }
  106. return false;  //To change body of implemented methods use File | Settings | File Templates.
  107. }
  108. }) ;
  109. }catch (Exception e){
  110. e.printStackTrace();
  111. }
  112. return result;
  113. }
  114. //API : on for test
  115. public String createPath(String path, byte[] value) throws Exception {
  116. PathUtils.validatePath(path);//if bad format,here will throw some Exception;
  117. EnsurePath ensure = new EnsurePath(path).excludingLast();
  118. //parent path should be existed.
  119. //EnsurePath: retry + block
  120. ensure.ensure(zkClient); //ugly API
  121. return zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  122. }
  123. //API: on for test
  124. public boolean createPath(String path, byte[] value,int blockTimes){
  125. if (!zkClient.isConnected() && blockTimes == 0) {
  126. return false;
  127. }
  128. TimeTrace trace = zkClient.startTracer("createPath:" + path);//log message
  129. try{
  130. EnsurePath ensure = new EnsurePath(path).excludingLast();
  131. ensure.ensure(zkClient);//only for persistent node
  132. RetryLoop loop = zkClient.newRetryLoop();
  133. int _current = 0;
  134. while(loop.shouldContinue()){
  135. try{
  136. if(_current >= blockTimes){
  137. loop.markComplete(); //stop here.
  138. continue;
  139. }
  140. //blocking
  141. boolean isConnected = zkClient.blockUntilConnectedOrTimedOut();
  142. if(!isConnected){
  143. _current++;
  144. continue;
  145. }
  146. zkClient.getZooKeeper().create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  147. loop.markComplete();
  148. } catch (KeeperException.NodeExistsException e){
  149. loop.markComplete();//exist ,stop here
  150. } catch (Exception e){
  151. loop.takeException(e);
  152. }
  153. }
  154. } catch (Exception e){
  155. e.printStackTrace();
  156. return false;  //cant create path
  157. } finally{
  158. trace.commit();
  159. }
  160. return true;
  161. }
  162. public byte[] getData(String path) throws Exception{
  163. PathUtils.validatePath(path);
  164. return zkClient.getZooKeeper().getData(path,false,null);
  165. }
  166. public void close(){
  167. zkClient.close();
  168. }
  169. class PrintTraceDrive implements TracerDriver {
  170. @Override
  171. public void addTrace(String name, long time, TimeUnit unit) {
  172. System.out.println("<Trace>" + name + ";time:" + TimeUnit.MILLISECONDS.convert(time, unit) + " ms");
  173. }
  174. @Override
  175. public void addCount(String name, int increment) {
  176. }
  177. }
  178. }

4. IZkClientMain.java(for testing)

  1. public class IZkClientMain {
  2. public static void main(String[] args) throws Exception {
  3. String configLocation = "curator-config.properties";
  4. IZkClient iZkClient = new IZkClient(configLocation);
  5. String value = "curator-demo";
  6. String path = "/curator/child/0";
  7. iZkClient.replace(path, value.getBytes("utf-8"));
  8. //simple method;
  9. String nodeName = ZKPaths.getNodeFromPath(path);
  10. System.out.print(nodeName);
  11. //value
  12. byte[] bytes = iZkClient.getData(path);
  13. System.out.println(new String(bytes, "utf-8"));
  14. Thread.sleep(180000 * 2);
  15. iZkClient.close();
  16. }
  17. }

5. ExhibitorEnsembleProvider需要使用远端的一个REST风格的Url来提供zookeeper服务器列表,如下为Spring方式:

  1. @Controller
  2. @RequestMapping("/servers")
  3. public class ServersController {
  4. @RequestMapping(value = "/zk",headers="Accept=application/x-www-form-urlencoded")
  5. public void zk(HttpServletResponse response) throws  Exception{
  6. FormHttpMessageConverter converter = new FormHttpMessageConverter();
  7. converter.setCharset(Charset.forName("utf-8"));
  8. HttpOutputMessage output = new ServletServerHttpResponse(response);
  9. converter.write(buildServers(), MediaType.APPLICATION_FORM_URLENCODED,output);
  10. //String servers = "count=2&port=2181&server0=127.0.0.1&server1=localhost";
  11. }
  12. private MultiValueMap<String,Object> buildServers(){
  13. MultiValueMap<String,Object> map = new LinkedMultiValueMap<String, Object>();
  14. map.add("count","2");
  15. map.add("port","2181");
  16. map.add("server0","127.0.0.1");
  17. map.add("server1","localhost");
  18. return map;
  19. }
  20. }

备注:Curator-client可以帮助我们进行链接重连操作,重连过程中,我们不需要关注太多,不过你仍然可以通过注册Watcher的手段来活的通知.如果在操作过程中,zk的链接有异常,你可以通过RetryLoop的方式实现阻塞.

上一篇:标签的rel属性全解析


下一篇:php里ezpdo orm框架初探