首先搭建zookeeper集群docker-compose.yml
version: '2'networks: zk: services: zk1: image: zookeeper:3.4 container_name: zk1 networks: - zk ports: - "21811:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888 zk2: image: zookeeper:3.4 container_name: zk2 networks: - zk ports: - "21812:2181" environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk3:2888:3888 zk3: image: zookeeper:3.4 container_name: zk3 networks: - zk ports: - "21813:2181" environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=0.0.0.0:2888:3888
docker-compose up -d 创建并启动
检查状态
[root@localhost test]# docker exec -it zk1 bash ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /conf/zoo.cfg Mode: follower [root@localhost test]# docker exec -it zk2 bash ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /conf/zoo.cfg Mode: follower [root@localhost test]# docker exec -it zk3 bash ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /conf/zoo.cfg Mode: leader
下面我们来程序验证:
我们用现成的curator来操作zk分布式锁
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.GetDataBuilder;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Time;import java.util.Date;import java.util.concurrent.TimeUnit;public class DistributedLock { public static Logger log = LoggerFactory.getLogger(DistributedLock.class); private InterProcessMutex interProcessMutex; //可重入排它锁 private String lockName; //竞争资源标志 private String root = "/distributed/lock/";//根节点 private static CuratorFramework curatorFramework; private static String ZK_URL = "127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21813"; static{ curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3)); curatorFramework.start(); } /** * 实例化 * @param lockName */ public DistributedLock(String lockName){ try { this.lockName = lockName; interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName); }catch (Exception e){ log.error("initial InterProcessMutex exception="+e); } } /** * 获取锁 */ public void acquireLock(){ int flag = 0; try { //重试N次,每次最大等待1s while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)){ flag++; if(flag>5){ //重试次 break; } } } catch (Exception e) { log.error("distributed lock acquire exception="+e); } if(flag>5){ log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock busy"+ new Date().getTime()); }else{ log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock success"+ new Date().getTime()); } } /** * 释放锁 */ public void releaseLock(){ try { if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){ interProcessMutex.release(); curatorFramework.delete().inBackground().forPath(root+lockName); //byte[] data = curatorFramework.getData().forPath(root + lockName); log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock success"+ new Date().getTime()); } }catch (Exception e){ log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock exception="+e); } } }
接下来我们开2个线程来竞争分布式锁:
public class TestLock { public static void main(String[] args) throws InterruptedException { String lockName = "lock1"; DistributedLock lockFoo = new DistributedLock(lockName); //lockFoo.acquireLock(); //lockFoo.releaseLock(); // System.out.println("主线程ID是:" + Thread.currentThread().getId()); Thread thread1 = new MyThread("thread1",lockFoo); Thread thread2 = new MyThread("thread2",lockFoo); thread1.start(); Thread.sleep(1000); thread2.start(); } }/** * 自定义线程 */class MyThread extends Thread { /*线程名称*/ private String name; private DistributedLock lockFoo; public MyThread(String name,DistributedLock lockFoo) { this.name = name; this.lockFoo = lockFoo; } @Override public void run() { if(this.name.equals("thread1")) { this.lockFoo.acquireLock(); } System.out.println("名称" + name + "的线程ID是:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if(this.name.equals("thread2")) { this.lockFoo.releaseLock(); } } }
程序中thread1获取了锁后,thread2解锁失败。