.net 分布式架构之分布式锁实现

分布式锁

经常用于在解决分布式环境下的业务一致性和协调分布式环境。

实际业务场景中,比如说解决并发一瞬间的重复下单,重复确认收货,重复发现金券等。

使用分布式锁的场景一般不能太多。

开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock

开源相关群: .net 开源基础服务 238543768

这里整理了C#.net关于redis分布式锁和zookeeper分布式锁的实现,仅用于研究。(可能有bug)

采用ServiceStack.Redis实现Redis分布式锁


 
/*
* Redis分布式锁
* 采用ServiceStack.Redis实现的Redis分布式锁
* 详情可阅读其开源代码
* 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同 xxf里面默认使用2.2版本
*/ public class RedisDistributedLock : BaseRedisDistributedLock
{
private ServiceStack.Redis.RedisLock _lock;
private RedisClient _client;
public RedisDistributedLock(string redisserver, string key)
: base(redisserver, key)
{ } public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
_client = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient();
/* * 阅读源码发现当其获取锁后,redis连接资源会一直占用,知道获取锁的资源释放后,连接才会跳出,可能会导致连接池资源的浪费。 */ try {
this._lock = new ServiceStack.Redis.RedisLock(_client, key, getlockTimeOut);
lockresult = LockResult.Success;
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
} public override void Dispose()
{
try {
if (this._lock != null)
this._lock.Dispose();
if (_client != null)
this._client.Dispose();
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}

来自网络的java实现Redis分布式锁(C#版)

/*
* Redis分布式锁
* 采用网络上java实现的Redis分布式锁
* 参考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html
* 详情可阅读其开源代码
*/ public class RedisDistributedLockFromJava : BaseRedisDistributedLock
{ public RedisDistributedLockFromJava(string redisserver, string key)
: base(redisserver, key)
{ } public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
try {
// 1. 通过SETNX试图获取一个lock
         string @lock = key;
long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime);
long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : );
long hassleepMilliseconds = ;
while (true)
{
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + ;
/*Java以前版本都是用SetNX,但是这种是无法设置超时时间的,不是很理解为什么,
              * 可能是因为原来的redis命令比较少导致的?现在用Add不知道效果如何.
                因对redis细节不了解,但个人怀疑若异常未释放锁经常发生,可能会导致内存逐步溢出*/               bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));
//SETNX成功,则成功获取一个锁
if (acquired == true)
{
lockresult = LockResult.Success;
}
//SETNX失败,说明锁仍然被其他对象保持,检查其是否已经超时
               else
               {
var oldValueBytes = redisclient.Get(@lock);
//超时
if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, ) < CurrentUnixTimeMillis())
{
/*此处虽然重设并获取锁,但是超时时间可能被覆盖,故重设超时时间;若有进程一直在尝试获取锁,那么锁存活时间应该被延迟*/                   var getValueBytes = redisclient.GetSet(@lock, BitConverter.GetBytes(value));
var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//这里如果程序异常终止,依然会有部分锁未释放的情况。 // 获取锁成功 if (getValueBytes == oldValueBytes)
{
lockresult = LockResult.Success;
}
// 已被其他进程捷足先登了
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
//未超时,则直接返回失败
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
} //成功拿到锁
if (lockresult == LockResult.Success)
break; //获取锁超时
if (hassleepMilliseconds >= getlockexpiredMilliseconds)
{
lockresult = LockResult.GetLockTimeOutFailure;
break;
} //继续等待
System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime);
hassleepMilliseconds += DistributedLockConfig.GetLockFailSleepTime;
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
} private long CurrentUnixTimeMillis()
{
return (long)(System.DateTime.UtcNow - new System.DateTime(, , , , , , System.DateTimeKind.Utc)).TotalMilliseconds;
} public override void Dispose()
{
if (lockresult == LockResult.Success || lockresult == LockResult.LockSystemExceptionFailure)
{
try {
long current = CurrentUnixTimeMillis();
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
var v = redisclient.Get(key);
if (v != null) {
// 避免删除非自己获取得到的锁
if (current < BitConverter.ToInt64(v, ))
{
redisclient.Del(key);
}
}
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}
}

ServiceStack.Redis内部实现版本(较旧)

 
/*
* Redis分布式锁
* 采用ServiceStack.Redis实现的Redis分布式锁
* 详情可阅读其开源代码
* 备注:不同版本的 ServiceStack.Redis 实现reidslock机制不同
* 拷贝自网络开源代码 较旧的实现版本
*/ public class RedisDistributedLockFromServiceStack : BaseRedisDistributedLock
{
public RedisDistributedLockFromServiceStack(string redisserver, string key)
: base(redisserver, key)
{ }
public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("检测到当前锁已获取");
try
{
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
ExecExtensions.RetryUntilTrue(
() =>
{
//This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
//Calculate a unix time for when the lock should expire
                  TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime); //new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year DateTime expireTime = DateTime.UtcNow.Add(realSpan);
string lockString = (expireTime.ToUnixTimeMs() + ).ToString(); //Try to set the lock, if it does not exist this will succeed and the lock is obtained
var nx = redisClient.SetEntryIfNotExists(key, lockString);
if (nx)
{
lockresult = LockResult.Success;
return true;
} //If we've gotten here then a key for the lock is present. This could be because the lock is
//correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
//Therefore we need to get the value of the lock to see when it should expire
redisClient.Watch(key);
string lockExpireString = redisClient.Get<string>(key);
long lockExpireTime;
if (!long.TryParse(lockExpireString, out lockExpireTime))
{
redisClient.UnWatch(); // since the client is scoped externally
                      lockresult = LockResult.GetLockTimeOutFailure;
return false;
} //If the expire time is greater than the current time then we can't let the lock go yet
                   if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
{
redisClient.UnWatch(); // since the client is scoped externally
                     lockresult = LockResult.GetLockTimeOutFailure;
return false;
} //If the expire time is less than the current time then it wasn't released properly and we can attempt to //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes //before we call Commit() below, the Commit will fail and return false, which means that another thread //was able to acquire the lock before we finished processing. using (var trans = redisClient.CreateTransaction()) // we started the "Watch" above; this tx will succeed if the value has not moved {
trans.QueueCommand(r => r.Set(key, lockString));
//return trans.Commit(); //returns false if Transaction failed var t = trans.Commit();
if (t == false)
lockresult = LockResult.GetLockTimeOutFailure;
else
lockresult = LockResult.Success;
return t;
}
},
getlockTimeOut
); }
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁系统级别严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
} public override void Dispose()
{
try {
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
redisClient.Remove(key);
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式尝试锁释放严重异常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}

Zookeeper 版本实现分布式锁

/*  * 来源java网络源码的zookeeper分布式锁实现(目前仅翻译并简单测试ok,未来集成入sdk)
* 备注: 共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,
然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,
如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,
从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
*/ public class ZooKeeprDistributedLockFromJava : IWatcher
{
private ZooKeeper zk;
private string root = "/locks"; //根
private string lockName; //竞争资源的标志
private string waitNode; //等待前一个锁
private string myZnode; //当前锁
//private CountDownLatch latch; //计数器
private AutoResetEvent autoevent;
private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds();
private IList<Exception> exception = new List<Exception>(); /// <summary>
/// 创建分布式锁,使用前请确认config配置的zookeeper服务可用 </summary>
/// <param name="config"> 127.0.0.1:2181 </param>
/// <param name="lockName"> 竞争资源标志,lockName中不能包含单词lock </param>
public ZooKeeprDistributedLockFromJava(string config, string lockName)
{
this.lockName = lockName;
// 创建一个与服务器的连接
try
       {
zk = new ZooKeeper(config, sessionTimeout, this);
var stat = zk.Exists(root, false);
if (stat == null)
{
// 创建根节点
zk.Create(root, new byte[], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
}
}
catch (KeeperException e)
{
throw e;
}
} /// <summary>
/// zookeeper节点的监视器
/// </summary>
public virtual void Process(WatchedEvent @event)
{
if (this.autoevent != null)
{
this.autoevent.Set();
}
} public virtual bool tryLock()
{
try {
string splitStr = "_lock_";
if (lockName.Contains(splitStr))
{
//throw new LockException("lockName can not contains \\u000B");
          }
//创建临时子节点
myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
Console.WriteLine(myZnode + " is created ");
//取出所有子节点
IList<string> subNodes = zk.GetChildren(root, false);
//取出所有lockName的锁
IList<string> lockObjNodes = new List<string>();
foreach (string node in subNodes)
{
if (node.StartsWith(lockName))
{
lockObjNodes.Add(node);
}
}
Array alockObjNodes = lockObjNodes.ToArray();
Array.Sort(alockObjNodes);
Console.WriteLine(myZnode + "==" + lockObjNodes[]);
if (myZnode.Equals(root + "/" + lockObjNodes[]))
{
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + );
waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - ];
}
catch (KeeperException e)
{
throw e;
}
return false;
} public virtual bool tryLock(TimeSpan time)
{
try {
if (this.tryLock())
{
return true;
}
return waitForLock(waitNode, time);
}
catch (KeeperException e)
{
throw e;
}
return false;
} private bool waitForLock(string lower, TimeSpan waitTime)
{
var stat = zk.Exists(root + "/" + lower, true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if (stat != null)
{
Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower);
autoevent = new AutoResetEvent(false);
bool r = autoevent.WaitOne(waitTime);
autoevent.Dispose();
autoevent = null;
return r;
}
else return true;
} public virtual void unlock()
{
try {
Console.WriteLine("unlock " + myZnode);
zk.Delete(myZnode, -);
myZnode = null;
zk.Dispose();
}
catch (KeeperException e)
{
throw e;
}
} }

以上代码仅做参考,未压测。

代码粘贴有些问题,详细请下载开源包运行研究。

上一篇:Servlet接收JSP参数乱码问题解决办法


下一篇:python 3.6 + numpy + matplotlib + opencv + scipy 安装