分布式改造剧集2---DIY分布式锁

前言:

​ 好了,终于又开始播放分布式改造剧集了。前面一集中(http://www.cnblogs.com/Kidezyq/p/8748961.html)我们DIY了一个Hessian转发实现,最后我们也留下了一个展望方向:可以实现一个管理界面管理节点,实现简单的服务治理的功能。这一集我们接着继续DIY分布式锁。

第二集:分布式锁DIY

探索之路

​ 由于业务互斥的需要,当前项目中实现了一个内存锁。锁的大致模型是分为锁类型和锁键值,只有当锁类型和键值都相同的时候,整个业务才互斥。但是必须提供一个方法,来判断某种类型的锁是否存在。大致代码如下:

/**
 * 内存锁
 *
 */
public class MemoryLock {
    /**
     * 同步锁
     */
    private final Object lock = new Object();
    
    /**
     * 内存锁模型
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lockMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>();

    /**
     * 尝试获取到锁
     * @param lockType 锁类型
     * @param key       锁键值
     * @return 如果当前获取到锁,则返回true。否则,返回false。
     */
    private boolean tryLock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentHashMap<String, String> map = this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, String>();
                this.lockMap.put(lockType, map);
            }
            return (map.putIfAbsent(key, key) == null);
        }
    }
    
    /**
     * 判断某种类型的锁是不是空的
     * @param lockType  锁类型
     * @return true,不存在某种类型的锁;false,存在某种类型的锁。
     */
    public boolean isLockTypeEmpty(String lockType) {
        if (null != this.lockMap.get(lockType)) {
            return this.lockMap.get(lockType).size() == 0;
        }
        return true;
    }

    /**
     * 获取锁
     * @param lockType  锁类型
     * @param key       锁键值
     * @param timeout   超时时间(毫秒)
     * @throws TimeoutException 如果超时之后还没有获得到锁,则抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));
        
        throw new TimeoutException();
    }

    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key      锁键值
     */
    public void unlock(String lockType, String key) {
        synchronized (this.lock) {
            ConcurrentMap<String, String> map = this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
    }
}

​ 可以看到,单机模式下的互斥锁是直接在内存中保存一个ConcurrentHashMap,然后利用putIfAbsent的原子特性。该锁的使用方式如下:

try {
    memoryLock.lock(lockType, lockKey, 0l);
} catch(TimeOutException e) {
    // TODO: Exception caught  
} finally {
    memoryLock.unlock(lockType, lockKey);
}

​ 当应用部署在分布式环境中的时候。显然,原来的内存锁已经不适用。那么在分布式情况下,如何实现锁服务呢?网上给出的分布式锁的实现方案一般有三种:

  1. 利用数据库的for update行锁
  2. 利用Redis的setnx
  3. 利用zookeeper的分布式一致性算法

​ 考虑到尽量不增加新的应用部署,那么先排除2、3,只剩下数据库的行级锁。但其实数据库的行级锁在并发量特别大的时候会对数据库性能造成较大影响,而且估计我想使用DBA都不会允许.....

​ 那么,有没有什么其他更好的办法呢?这次我们利用曲线救国的方式来实现,将分布式转变成非分布式。


实现Demo

​ 在分布式改造剧集第一集中,我们的实现方式中有一个主节点,主节点为配置文件中默认配置的Hessian服务的地址。只有加上了Distribute注解的服务,才会在客户端进行Hessian调用的时候进行路由,否则最终调用的Hessian服务地址即为配置文件中配置的主节点。依赖于这个特性,我们可以不给锁服务添加Distribute注解,使得所有分机部署的服务请求都落到主节点上。具体实现步骤如下:

定义一个内存锁Hessian服务

​ 其实简单来说我们直接将原来的MemoryLock发布成Hessian服务,并且不使用Distribute注解就可以实现将分布式锁转换成单机锁。但是还有以下两点需要特殊考虑:

  1. 分布式服务的多机特性: 内存锁的释放必须显示释放,如果一个服务调用unlock方法之前就挂掉,就可能导致某一个锁永远被锁住。所以我们还需要一个类似于Redis分布式锁实现中的锁超时移除机制。
  2. 远程RPC调用的可能超时: 最终锁的服务调用是需要通过Hessian来实现的,考虑到Hessian调用存在超时时间,如果将前面MemoryLocklock方法等待实现在Hessian服务中,那么等待时间超长的话会直接导致Hessian服务调用超时。所以改造后的MemoryLock不实现lock方法,只实现tryLock方法,调用该方法时立即返回当前是否可以获得到锁。
  3. 本地服务实现锁等待以及减少Hessian调用: 如第2点所说,我们的锁等待特性不能在内存锁的Hessian服务中实现,只能通过本地服务中实现。另外频繁的Hessian调用会影响应用程序的性能,也需要一个本地的锁服务来巧妙地减少远程服务调用

​ 改造后的MemoryLock代码如下:

@Service("moemoryLockServiceFacade")
public class MemoryLockServiceImpl implements MemoryLockService {
    
    /**
     * 自动超时时间:当前设置为10分钟 单位为纳秒
     */
    private final static long AUTO_EXPIRE_TIME = 1000000000l * 60 * 10;
    
    /**
     * 锁
     */
    private Object semaphore = new Object();

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁的加入时间
     */
    private ConcurrentMap<String, ConcurrentMap<Object, Long>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<Object, Long>>();
    
    /**
     * 守护线程: 用来清理过期内存缓存(如果加锁的客户端由于各种原因没有显示解锁,则可能出现其他服务无法获取锁的情况)
     */
    private Thread daemonThread;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLockServiceImpl.class);
    
    /**
     * 是否终止守护线程的标识
     */
    private volatile boolean stop = false;
    
    /**
     * 清理失效锁的线程
     *
     */
    private class ClearExpireLockThread extends Thread {
        
        @Override
        public void run() {
            Iterator<Entry<String, ConcurrentMap<Object, Long>>> outerIterator = null;
            Iterator<Entry<Object, Long>> innerIterator = null;
            
            // 清理超过超时时间的锁
            while (!stop) {
                synchronized (semaphore) {
                    long expireNanoTimes = System.nanoTime() - AUTO_EXPIRE_TIME;    // 算出超时时间,小于该时间的缓存都应该被移除
                    outerIterator = lockMap.entrySet().iterator();
                    while (outerIterator.hasNext()) {
                        Entry<String, ConcurrentMap<Object, Long>> entrySet = outerIterator.next();
                        innerIterator = entrySet.getValue().entrySet().iterator();
                        boolean allDeleted = true;  // 是否全部删除的标识,默认设为true
                        while (innerIterator.hasNext()) {
                            Entry<Object, Long> innerEntry = innerIterator.next();
                            if (expireNanoTimes > innerEntry.getValue()) {
                                innerIterator.remove();
                                LOGGER.info("守护线程移除类型为【{}】键值为【{}】的锁......", entrySet.getKey(), innerEntry.getKey());
                            } else {
                                allDeleted = false;
                            }
                        }
                        
                        // 如果所类型下的所有锁都被清除,则锁类型也该被移除
                        if (allDeleted) {
                            outerIterator.remove();
                            LOGGER.info("守护线程移除类型为【{}】的锁......", entrySet.getKey());
                        }
                    }
                }
                
                try {
                    // 如果超时时间为1秒,则等待千分之一秒
                    Thread.sleep(AUTO_EXPIRE_TIME / 1000000000l);
                } catch (InterruptedException e) {
                }
            }
        }
    }
    
    /**
     * 终止守护线程
     */
    @PreDestroy
    public void stopDeamonThread() {
        this.stop = true;
        this.daemonThread.interrupt();
    }
    
    /**
     * 初始化守护线程,用来扫描移除超时的内存锁
     */
    @PostConstruct
    public void initDeamonThread() {
        daemonThread = new ClearExpireLockThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }

    @Override
    public boolean tryLock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<Object, Long>();
                this.lockMap.put(lockType, map);
            }
            
            // 这里的value值设置为加锁的初始时间
            return (map.putIfAbsent(key, System.nanoTime()) == null);
        }
    }

    @Override
    public boolean isLockTypeEmpty(String lockType){
        return MapUtils.isEmpty(this.lockMap.get(lockType));
    }
    
    @Override
    public void unlock(String lockType, Object key) {
        synchronized (this.semaphore) {
            ConcurrentMap<Object, Long> map = (ConcurrentMap<Object, Long>) this.lockMap.get(lockType);
            if (map != null) {
                map.remove(key);
                LOGGER.info("手工释放类型为【{}】键值为【{}】的锁......", lockType, key);
            }
        }
    }
}

定义一个分布式锁管理服务实现

​ 定义一个DistributeLock服务,该服务作为本地服务,用来实现锁等待以及减少Hessian锁请求调用。在本地锁服务中注入原来的内存锁Hessian服务实现。具体代码如下:

/**
 * 分布式锁管理类
 *
 */
@Service
public class DistributeLock {
    /**
     * 注入hessian接口的实现类
     */
    @Resource(name="moemoryLockServiceFacade")
    private MemoryLockService memoryLockService;
    
    private Object semaphore = new Object(); 

    /**
     * 内存锁结构,双层Map 首层Map的Key存锁类型,value为内层Map。内层Map额key为锁键值,value为锁住的尝试远程hessian调用获取锁的线程
     */
    private ConcurrentMap<String, ConcurrentMap<String, Thread>> lockMap = new ConcurrentHashMap<String, ConcurrentMap<String, Thread>>();

    /**
     * 判断是否能够获得锁,不阻塞立即返回
     * @param lockType 锁类型
     * @param key  锁的键值
     * @return  true,能够获得锁.false,不能获得锁.
     */
    private boolean tryLock(String lockType, String key) {
        // 提升效率,先内部map判断是否存在锁,如果存在,则直接等待
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map == null) {
                map = new ConcurrentHashMap<String, Thread>();
                this.lockMap.put(lockType, map);
            }
            Thread t = map.putIfAbsent(key, Thread.currentThread());
            
             // 单个服务只有首先获得本机内存锁的线程才有机会去远程调用hessian服务判断是否有锁
            if (t != null && Thread.currentThread() != t) {
                return false;
            }
        }
        
        return memoryLockService.tryLock(lockType, key);
    }
    
    /**
     * 获得锁,在获得锁之前阻塞
     * @param lockType  锁类型
     * @param key   锁键值
     * @param timeout 超时时间
     * @throws TimeoutException 超时抛出超时异常
     */
    public void lock(String lockType, String key, long timeout) throws TimeoutException {
        // 是否没有超时设置,当传入的超时时间为负数或者为0时,表示没有超时时间
        boolean noTimeOutFlag = false;
        if (timeout <= 0L) {
            noTimeOutFlag = true;
        }

        long expireTime = System.currentTimeMillis() + timeout;
        do {
            if (tryLock(lockType, key))
                return;
            try {
                Thread.sleep(100L);
            } catch (InterruptedException localInterruptedException) {
            }
        } while ((noTimeOutFlag) || (System.currentTimeMillis() < expireTime));

        synchronized(this.semaphore) {
            // 需释放当前线程占用的本地内存锁
            this.lockMap.get(lockType).remove(key, Thread.currentThread());
        }
        
        throw new TimeoutException();
    }

    /**
     * 是否指定的锁类型,当前锁的数量为空
     * @param lockType 锁类型
     * @return true,当前锁类型的锁的数量为空;false,当前锁类型的锁锁的数量不为空
     */
    public boolean isLockTypeEmpty(String lockType){
        // 直接内部判断
        if (MapUtils.isNotEmpty(lockMap.get(lockType))) {
            return false;
        }
        
        // 内部判断成功还需远程调用判断
        return memoryLockService.isLockTypeEmpty(lockType);
    }
    
    /**
     * 释放锁
     * @param lockType 锁类型
     * @param key       锁的键值
     */
    public void unlock(String lockType, String key) {
        // 移除本机内存锁模型
        synchronized (this.semaphore) {
            ConcurrentMap<String, Thread> map = (ConcurrentMap<String, Thread>) this.lockMap.get(lockType);
            if (map != null)
                map.remove(key);
        }
        
        // 远程调用释放锁
        memoryLockService.unlock(lockType, key);
    }
}

​ 好了,分布式锁的Demo顺利完成。使用的时候只要将原来的MemoryLock替换成DistributeLock即可。


展望

​ 分布式锁的实现就到这里,其实现的本质在于将分布式转变成非分布式。这里也可以说我是钻了"分布式"的空子

​ 那么既然分布式锁的最终实现也是通过内存锁实现的,且利用了主节点的特性。那么其实我们在实现分布式锁之后,还有下面两个方向可以优化:

  1. 锁管理: 可以增加一个锁管理页面,来展示当前内存中存在的锁,以及移除需要马上移除的锁
  2. 主节点替换: 当前的分布式锁的实现还是依赖于主节点。考虑到主节点可能也挂掉,需要增加主节点可以动态切换的功能。严格上来讲这个是分布式改造剧集1应该实现的功能

后续

​ 好了,分布式锁的改造暂且到此。可以看到其实分布式其实并没有我们想象的这么复杂,分布式技术也没有特别地遥不可及。面对不断革新的技术,我们应该除了拿来主义之外,多思考,真正了解技术背后的实现原理。就像我一直认为的:相比于用*造*的能力要重要的多

黎明前最黑暗,成功前最绝望!
上一篇:非root用户安装redis


下一篇:ActiveMQ专题2: 持久化