一、单进程多线程的锁--线程锁
锁住线程的锁叫线程锁,像C#中的lock,Monitor,让线程排队,同一时刻只能有一个线程进来,让线程同步排队。
二、多进程的锁--分布式锁
锁住进程的锁就叫分布式锁,是锁住进程的一种机制,让进程排队。
三、电商秒杀场景
1、单体架构
并发量不够,秒杀服务只能并发1000,而客户端同时发送3000个请求。
2、集群架构
这时候就需要多两个角色,一个角色是网关,一个角色是秒杀集群,网关把用户请求转发到3个秒杀服务,这样每个秒杀服务并发1000个请求,就能够满足客户端同时发送3000个请求。
四、秒杀服务集群带来新的问题
第1个请求进入到秒杀服务1里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。
第2个请求进入到秒杀服务2里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。
第3个请求进入到秒杀服务3里面,查询数据库商品库存是10,判断有库存,扣减库存,更新数据库,当前库存是9。
实际库存只减少了1个,但是同1个商品被3个人秒杀到了,这就是超卖问题。
五、分布式锁解决什么问题?
分布式系统中,涉及到多个进程共享资源的时候,就需要使用分布式锁。
谁持有了锁,谁才能操作数据库扣减库存。
六、运行效果
1、单进程发起20个线程模拟20个用户并发请求,秒杀商品,会发现20个线程,20个请求秒杀到同1个商品。
2、对于单进程可以加lock锁解决超卖问题
商品库存有10个,开启20个线程秒杀商品,有10个请求分别秒杀到不同的商品,另外10个线程没有秒杀到商品,因为库存只有10个。
3、我现在把相同的代码Copy一份,新建个工程MyRedis.SecKill.MultiProcess.Other,也同样使用了lock锁,快速的启动2个进程,每个进程中开启20个线程就发现lock锁不住了,lock锁失效了,同一个商品编号10被2个不同的进程中的线程秒杀到了。
我们看到单进程通过加lock锁可以保证不发生超卖问题,10个线程秒杀到商品,商品编号不同,另外10个线程没有秒杀到商品。
但是因为为了提高并发量,现在是秒杀服务集群提供秒杀服务了,我们在两个秒杀服务进程中都开启20个线程去秒杀商品,就会发现如图所示控制不住了,两个进程中的线程都秒杀到同一个商品了(这里用商品库存当做商品编号),那么如何解决跨进程并发引起的商品超卖问题?这就需要分布式锁了。
七、封装Redis分布式锁--解决跨进程并发秒杀超卖问题
1、秒杀服务端
namespace MyRedis.SecKill.MultiProcess.SecKill { /// <summary> /// 商品秒杀服务 /// </summary> public class ProductSecKill { /// <summary> /// 秒杀方法 /// </summary> public void SecKillProduct() { RedisLock redisLock = new RedisLock(); redisLock.Lock(); //lock (this)//只是适合单进程 //{ // 1、获取商品库存 var productStock = GetPorductStocks(); // 2、判断商品库存是否为空 if (productStock.Conut == 0) { // 2.1 秒杀失败消息 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:不好意思,秒杀已结束,商品编号:{productStock.Conut}"); redisLock.UnLock(); return; } // 3、秒杀成功消息 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:恭喜你,秒杀成功,商品编号:{productStock.Conut}"); // 4、扣减商品库存 SubtracPorductStocks(productStock); //} redisLock.UnLock(); } /// <summary> /// 获取商品库存 /// </summary> /// <returns></returns> private Product_Stock GetPorductStocks() { using (ShoppingEntities shoppingEntities = new ShoppingEntities()) { // 1、查询数据库获取库存,获取第一个商品的库存数 Product_Stock productStock = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == 1); // 2、返回库存 return productStock; } } /// <summary> /// 扣减商品库存 /// </summary> private void SubtracPorductStocks(Product_Stock stocks) { using (ShoppingEntities shoppingEntities = new ShoppingEntities()) { // 1、扣减商品库存 Product_Stock updateStocks = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == stocks.Id); updateStocks.Conut = stocks.Conut - 1; // 2、更新数据库 shoppingEntities.SaveChanges(); } } } }
2、秒杀客户端
namespace MyRedis.SecKill.MultiProcess { class Program { static void Main(string[] args) { // 1、开始秒杀 ClientRequest.SendRequest(20); Console.ReadKey(); } } }
namespace MyRedis.SecKill.MultiProcess.SecKill { class ClientRequest { /// <summary> /// 客户端请求 /// </summary> /// <param name="threadCount">线程数</param> public static void SendRequest(int threadCount) { // 1、商品秒杀服务 ProductSecKill productSecKill = new ProductSecKill(); // 2、创建20个请求来秒杀 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(() => { productSecKill.SecKillProduct(); }); thread.Start(); } } } }
3、Redis分布式锁
封装分布式锁4要素
3.1、锁名
3.2、加锁操作
锁对象,也就是谁持有这把锁,持有锁的才能解锁
3.3、解锁操作
3.4、锁超时时间
namespace MyRedis.SecKill.MultiProcess.Locks { /// <summary> /// redis分布式锁 /// 分布式锁四要素 /// 1、锁名 /// 2、加锁操作 /// 3、解锁操作 /// 4、锁超时时间 /// </summary> class RedisLock { // 1、redis连接管理类 private ConnectionMultiplexer connectionMultiplexer = null; // 2、redis数据操作类 private IDatabase database = null; public RedisLock() { connectionMultiplexer = ConnectionMultiplexer.Connect("localhost:6379"); database = connectionMultiplexer.GetDatabase(0); } /// <summary> /// 1、加锁 /// </summary> public void Lock() { // 1、redis加锁api--LockTake // key--锁名--redis_lock // value--锁对象(谁持有这把锁)--进程Id+线程Id // expiry--锁超时时间,为什么?解锁死锁问题! // 2、如果加锁失败?循环加锁,对于未知的事情用循环 while (true) { bool flag = database.LockTake("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(10)); // 3、如果加锁成功,则退出循环 if (flag) { break; } // 3.1 加锁失败,线程休眠下,走循环,再尝试加锁 Thread.Sleep(200); } } /// <summary> /// 2、解锁 /// </summary> public void UnLock() { //1、redis解锁api--LockRelease // key--锁名--redis_lock // value--锁对象(谁持有这把锁)--进程Id+线程Id--使加锁和解锁是同一个对象 // 2、如果解锁失败?循环解锁,对于未知的事情用循环 bool flag = database.LockRelease("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId); while (true) { // 3、如果解锁成功,则退出循环 if (flag) { break; } // 3.1 解锁失败,线程休眠下,走循环,再尝试解锁 Thread.Sleep(200); } // 4、关闭资源 connectionMultiplexer.Dispose(); } } }
八、再次运行效果
最后我们发现库存36个商品,2个进程,每个进程开启20个线程,都是不同的商品编号没有秒杀到同一件商品。
九、项目结构
十、思考Redis集群环境死锁
如果在Redis Master上持有了锁,但是Redis Master宕机了,需要把Redis Slave提成Redis Master,但是原来的Redis Master的锁没有释放,造成死锁了怎么办?