zk分布式任务管理

在我们的系统开发过程 中不可避免的会使用到定时任务的功能,而当我们在生产环境部署的服务超过1台时,就需要考虑任务调度的问题,防止两台或多台服务器上执行同一个任务,这个问题今天咱们就用zookeeper来解决。

zookeeper的存储模型

Zookeeper的数据存储采用的是结构化存储,结构化存储是没有文件和目录的概念,里边的目录和文件被抽象成了节点(node),zookeeper里可以称为znode。Znode的层次结构如下图:

zk分布式任务管理

每个子目录项如 NameService 都被称作为 znode(目录节点),和文件系统一样,我们能够*的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。

znode类型

  • PERSISTENT-持久化目录节点

    客户端与zookeeper断开连接后,该节点依旧存在

  • PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点

    客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号

  • EPHEMERAL-临时目录节点

    客户端与zookeeper断开连接后,该节点被删除

  • EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点

    客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

监听通知机制

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。基于这种监听,可以实现注册中心、分布式同步等功能。

zk分布式任务管理机制

使用zookeeper的临时顺序节点,来实现分布式任务的调度功能,每一台服务启动的时候都向zookeepe指定的目录下注册一下临时顺序节点,并把该节点记录的系统里,每一次任务执行的时候,获取所有的有序节点,跟当前系统创爱你的节点对比,如果当前服务创建的节点是所有节点中最小的,则执行任务,否则不执行任务,如下如所示:

zk分布式任务管理

代码实现

1、pom引用

 <zookeeper.version>3.4.8</zookeeper.version>
<curator.version>2.11.1</curator.version> <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>

2、ZkClient类

该类封装了zookeeper的操作类,服务启动的时候回向zk上注册有序临时节点,目录为:/demo1/task/n,例如:/demo1/task/n00000001,/demo1/task/n00000002,创建的节点路径保存到变量:curTaskNodeId

package com.blogs.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.stereotype.Component; import lombok.Data;
import lombok.extern.slf4j.Slf4j; @Component
@Slf4j
@Data
public class ZkClient {
private CuratorFramework client;
public TreeCache cache;
//记录当前服务在zk上创建的nodeId
public String curTaskNodeId="";
//private ZookeeperProperties zookeeperProperties; public ZkClient(){
init();
} /**
* 初始化zookeeper
*/
public void init(){
try {
//初始sleep时间 ,毫秒,
int baseSleepTimeMs=1000;
//最大重试次数
int maxRetries=3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs,maxRetries);
Builder builder = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181").retryPolicy(retryPolicy)
.sessionTimeoutMs( 1000) //会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间
.connectionTimeoutMs( 6000)//连接创建超时时间,单位为毫秒
.namespace( "demo1");//zk的根节点
//以下注释的为创建节点的用户名密码
//builder.authorization("digest", "rt:rt".getBytes("UTF-8"));
/*
builder.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
} @Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});*/
client = builder.build();
client.start(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
//连接丢失
log.info("lost session with zookeeper");
} else if (state == ConnectionState.CONNECTED) {
//连接新建
log.info("connected with zookeeper");
} else if (state == ConnectionState.RECONNECTED) {
log.info("reconnected with zookeeper");
}
}
});
System.out.println("zk初始化完成");
//获取当前服务启动时创建的节点,临时有序节点,用作定时任务的执行
curTaskNodeId=createNode(CreateMode.EPHEMERAL_SEQUENTIAL,"/task/n",""); } catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
} public void stop() {
client.close();
} public CuratorFramework getClient() {
return client;
}
/**
* 创建节点
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
*4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
* @param nodeData 节点数据
*/
public String createNode(CreateMode mode, String path , String nodeData) {
String nodepath="";
try {
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
nodepath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
System.out.println(nodepath);
} catch (Exception e) {
log.error("注册出错", e);
}
return nodepath;
} /**
* 创建节点
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
*/
public void createNode(CreateMode mode,String path ) {
try {
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
log.error("注册出错", e);
}
} /**
* 删除节点数据
*
* @param path
*/
public void deleteNode(final String path) {
try {
deleteNode(path,true);
} catch (Exception ex) {
log.error("{}",ex);
}
} /**
* 删除节点数据
* @param path
* @param deleteChildre 是否删除子节点
*/
public void deleteNode(final String path,Boolean deleteChildre){
try {
if(deleteChildre){
//guaranteed()删除一个节点,强制保证删除,
// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}else{
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 设置指定节点的数据
* @param path
* @param datas
*/
public void setNodeData(String path, byte[] datas){
try {
client.setData().forPath(path, datas);
}catch (Exception ex) {
log.error("{}",ex);
}
} /**
* 获取指定节点的数据
* @param path
* @return
*/
public byte[] getNodeData(String path){
Byte[] bytes = null;
try {
if(cache != null){
ChildData data = cache.getCurrentData(path);
if(data != null){
return data.getData();
}
}
client.getData().forPath(path);
return client.getData().forPath(path);
}catch (Exception ex) {
log.error("{}",ex);
}
return null;
} /**
* 获取数据时先同步
* @param path
* @return
*/
public byte[] synNodeData(String path){
client.sync();
return getNodeData( path);
} /**
* 判断路径是否存在
*
* @param path
* @return
*/
public boolean isExistNode(final String path) {
client.sync();
try {
return null != client.checkExists().forPath(path);
} catch (Exception ex) {
return false;
}
} /**
* 获取节点的子节点
* @param path
* @return
*/
public List<String> getChildren(String path) {
List<String> childrenList = new ArrayList<>();
try {
childrenList = client.getChildren().forPath(path);
} catch (Exception e) {
log.error("获取子节点出错", e);
}
return childrenList;
} /**
* 随机读取一个path子路径, "/"为根节点对应该namespace
* 先从cache中读取,如果没有,再从zookeeper中查询
* @param path
* @return
* @throws Exception
*/
public String getRandomData(String path) {
try{
Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
if(cacheMap != null && cacheMap.size() > 0) {
log.debug("get random value from cache,path="+path);
Collection<ChildData> values = cacheMap.values();
List<ChildData> list = new ArrayList<>(values);
Random rand = new Random();
byte[] b = list.get(rand.nextInt(list.size())).getData();
return new String(b,"utf-8");
}
if(isExistNode(path)) {
log.debug("path [{}] is not exists,return null",path);
return null;
} else {
log.debug("read random from zookeeper,path="+path);
List<String> list = client.getChildren().forPath(path);
if(list == null || list.size() == 0) {
log.debug("path [{}] has no children return null",path);
return null;
}
Random rand = new Random();
String child = list.get(rand.nextInt(list.size()));
path = path + "/" + child;
byte[] b = client.getData().forPath(path);
String value = new String(b,"utf-8");
return value;
}
}catch(Exception e){
log.error("{}",e);
}
return null; } /**
* 获取读写锁
* @param path
* @return
*/
public InterProcessReadWriteLock getReadWriteLock(String path){
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock;
} /**
* 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
*/
ExecutorService pool = Executors.newFixedThreadPool(2); /**
* 监听数据节点的变化情况
* @param watchPath
* @param listener
*/
public void watchPath(String watchPath,TreeCacheListener listener){
// NodeCache nodeCache = new NodeCache(client, watchPath, false);
TreeCache cache = new TreeCache(client, watchPath);
cache.getListenable().addListener(listener,pool);
try {
cache.start();
} catch (Exception e) {
e.printStackTrace();
}
} }

3、定时任务调用

package com.blogs.client;

import java.time.LocalDateTime;
import java.util.List; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; @Component
@EnableScheduling
public class ScheduleTask { @Autowired
private ZkClient zkClient; //添加定时任务
@Scheduled(cron = "0/5 * * * * ?")
private void configureTasks() {
System.out.println("开始执行任务");
//获取所有节点
List<String> taskNodes=zkClient.getChildren("/task");
//查找最小节点
int minNodeNum=Integer.MAX_VALUE;
for (int i = 0; i < taskNodes.size(); i++) {
//节点前面有一个n,把n替换掉,剩下的转换为数字
int nodeNum=Integer.valueOf(taskNodes.get(i).replace("n", ""));
if(nodeNum < minNodeNum){
minNodeNum = nodeNum;
}
System.out.println("节点:"+taskNodes.get(i));
}
System.out.println("当前节点:"+zkClient.getCurTaskNodeId());
//如果最小节点 等于该服务创建的节点,则执行任务
int curNodeNum=Integer.valueOf(zkClient.getCurTaskNodeId().substring(zkClient.getCurTaskNodeId().lastIndexOf('/') + 2));
if(minNodeNum - curNodeNum == 0){
System.out.println("执行任务");
}else {
System.out.println("不执行任务");
} System.err.println("执行静态定时任务时间: " + LocalDateTime.now());
}
}

当前服务创建的服务为节点最小的,则执行服务,否则不执行服务

执行结果

把服务的端口分别修改为:8080,8081,模拟启动两个服务,查看定时任务的执行情况

zk分布式任务管理

zk分布式任务管理

当把两个服务的任何一个服务关闭,定时任务还可以正常执行。

zkCli查看查创建的目录结构

zk分布式任务管理

作者:Eric.Chen
出处:https://www.cnblogs.com/lc-chenlong

如果喜欢作者的文章,请关注“写代码的猿”订阅号以便第一时间获得最新内容。本文版权归作者所有,欢迎转载

zk分布式任务管理
上一篇:【HDOJ】4587 TWO NODES


下一篇:kettle的下载、安装和初步使用(windows平台下)(图文详解)