我们使用curator建立连接,curator有session维护,重试机制,对递归创建节点和删除节点有较好的支持:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("10.10.210.123:2181") .sessionTimeoutMs(50000000) .connectionTimeoutMs(50000000) .retryPolicy(retryPolicy) .build(); client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘记释放调用close方法。
常规代码可以先判断是否存在后,创建消息通讯的根节点:
Stat s = client.checkExists().forPath("/msg_node_list"); if (s == null) { String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list"); }
针对我们的每一个netty服务,根据自己的服务id向msg_node_list注册临时节点:
client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");
而我们的监控中心,负责给客户端分发具体的netty链接服务器,则需要轮询监听每一个临时子节点的状态,并且通过watch监听异常下线节点,去redis中更新服务列表,从而给客户端提供消息服务:
List<String> list = client.getChildren().forPath("/msg_node_list"); //遍历list中的每一个临时子节点后调用如下代码,参数为list中取出的临时节点名称: client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); if (type.equals(Event.EventType.NodeDeleted)) { System.out.println(watchedEvent.getPath()); } } };
如果客户端连接的netty服务异常下线,则在次访问监控服务器获取可用的netty地址。