深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

二、如何通过zookeeper实现分布式锁

(1)发现的问题

深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

(2)解决方法

深入理解Zookeeper(二)如何通过zookeeper实现分布式锁
深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

(3)代码实现

(1)用户支付订单代码

package com.yyds.quartzstudy.zk;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OrderPay {

    private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";
    public void pay(Long orderId){
        try {
            String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
            // 只要执行一次countDown(),等待的线程就会继续执行
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {// 异步创建
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            System.out.println("orderId-" + orderId +  "  连接zk成功...");

            String status = startPay(zooKeeper, orderId);
            System.out.println("orderId-" + orderId+ ",支付状态为:" + status);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    private String startPay(ZooKeeper zooKeeper, Long orderId) {
        /**
         * 支付前,需要先加锁 /timeOutOrderLock/{orderId}
         *   如果创建成功,就进行付款操作
         *   如果创建失败,就稍后再次付款
         */
        String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + orderId;
        try {
            zooKeeper.create(timeOutOrderLockPath,
                    "order".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            // 模拟去支付订单
            System.out.println("订单-" + orderId + " 准备支付...");
            TimeUnit.SECONDS.sleep(3);

            zooKeeper.delete(timeOutOrderLockPath,-1);
            System.out.println("订单-" + orderId + " 支付成功...");
            return "success";
        }catch (KeeperException.NodeExistsException e){
            System.out.println("订单-" + orderId + " 创建失败...请稍后重试");
            return "failed";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

(2)超时处理逻辑

package com.yyds.quartzstudy.zk;

import org.apache.zookeeper.*;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OrderTimedOut {

    private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";

    public void timedOut(){
        try {
            String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
            // 只要执行一次countDown(),等待的线程就会继续执行
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {// 异步创建
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            System.out.println("OrderTimedOut-" +   "  连接zk成功...");

            startUpdateOrderTimedOut(zooKeeper);

        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void startUpdateOrderTimedOut(ZooKeeper zooKeeper) {
        /**
         * 1、首先找出30min没有付款的订单
         * 2、处理每条订单,加上分布式锁,尝试创建/timeOutOrderLock/{orderId}
         *    如果创建成功,进行业务处理
         *    如果创建失败,不作任何处理
         * 3、修改需要修改状态的订单
         */
        ArrayList<Order> orders = new ArrayList<>();
        orders.add(new Order(1L,"NOT_PAY"));
        orders.add(new Order(2L,"NOT_PAY"));
        Iterator<Order> iterator = orders.iterator();
        while (iterator.hasNext()){
            Order order = iterator.next();
            String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + order.id;
            zooKeeper.create(timeOutOrderLockPath,
                    "".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
                        @Override
                        public void processResult(int i, String s, Object o, String s1) {
                            if(i == KeeperException.Code.OK.intValue()){
                                System.out.println("OrderTimedOut开始执行业务处理order.id-" + order.id);
                                // 模拟处理
                                try {
                                    TimeUnit.SECONDS.sleep(3);
                                    // 删除锁
                                    zooKeeper.delete(timeOutOrderLockPath,-1);
                                    System.out.println("OrderTimedOut执行业务处理完毕order.id-" + order.id);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }else if(i == KeeperException.Code.NODEEXISTS.intValue()){
                                System.out.println("OrderTimedOut不执行order.id-" + order.id);
                                iterator.remove();
                            }else {
                                System.out.println("OrderTimedOut异常,order.id-" + order.id);
                            }
                        }
                    },"call_back");
        }
    }

    private class Order{
       private Long id;
       private String status;

        public Order(Long id, String status) {
            this.id = id;
            this.status = status;
        }
        public Long getId() {
            return id;
        }
        public void setId(Long id) {
            this.id = id;
        }

        public String getStatus() {
            return status;
        }
        public void setStatus(String status) {
            this.status = status;
        }
    }
}

(3)测试

package com.yyds.quartzstudy.zk;

import java.util.concurrent.TimeUnit;

public class OrderTest {
    public static void main(String[] args) throws InterruptedException {
        Thread pay = new Thread(() -> new OrderPay().pay(1L));
        Thread timeOut = new Thread(() -> new OrderTimedOut().timedOut());

        //用户先拿到锁
        pay.start();
        TimeUnit.SECONDS.sleep(1);
        timeOut.start();
        TimeUnit.SECONDS.sleep(6);
    }
}

(4)测试结果

OrderTimedOut-  连接zk成功...
orderId-1  连接zk成功...
订单-1 准备支付...
OrderTimedOut不执行order.id-1
OrderTimedOut开始执行业务处理order.id-2
订单-1 支付成功...
orderId-1,支付状态为:success

(4)分布式读写锁的逻辑

深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

上一篇:sync.Once concurrent map iteration and map write map并发读写


下一篇:vue使用高德地图