关于python协程中aiorwlock 使用问题

最近工作中多个项目都开始用asyncio aiohttp aiomysql aioredis ,其实也是更好的用python的协程,但是使用的过程中也是遇到了很多问题,最近遇到的就是

关于aiorwlock 的问题,在使用中碰到了当多个协程同时来请求锁的时候 在其中一个还没释放锁的时候,另外一个协程也获取到锁,这里进行整理,也希望知道问题你解决方法的,一起讨论一下,正好最近经常用到协程的东西,所以准备建一个群,也欢迎大家一起进来讨论python协程的内容,群号:692953542

关于场景的描述

关于python协程中aiorwlock 使用问题

数据库的要操作的表的信息为:

id name nickname count flag crdate
1 800100 aa 100 1 2018-11-18 10:07:22
2 800101 bb 200 1 2018-11-18 10:07:23

当多个请求都到数据库操作接口程序的时候,针对同一个name的count进行增加或者减少,就要保证操作的同一个时刻只有一个可以去获取count的值并进行update操作,所以我是在这一步增加了锁,因为使用aiohttp写的,所以想要在这里也用了aiorwlock,但是在我测试的过程中发现了,当一个协程获取锁还没释放锁的时候,另外一个协程也获取到锁,下面我是具体的代码

程序代码

核心的处理类:

class CntHandler(object):

    def __init__(self, db, loop):
self.db = db
self.loop = loop
self.company_lock = {} def response(self, request, msg):
peer = request.transport.get_extra_info('peername')
logging.info("request url[%s] from[%s]: %s", request.raw_path, peer, msg)
origin = request.headers.get("Origin")
if origin is not None:
headers = {"Access-Control-Allow-Origin": origin, "Access-Control-Allow-Credentials": "true"}
resp = web.Response(text=util.dictToJson(msg), content_type='application/json', headers=headers)
else:
resp = web.Response(text=util.dictToJson(msg), content_type='application/json')
return resp async def cnt_set(self, request):
"""
用于设置company表中的count值
:param request:
:return:
"""
post = await request.post()
logging.info('post %s', post)
company_name = post.get("company")
cnt = post.get("cnt")
sql = "update shield.company set count=%s where name=%s"
args_values = [cnt, company_name]
rwlock = self.company_lock.get(company_name, "")
if not rwlock:
rwlock = aiorwlock.RWLock(loop=self.loop)
self.company_lock[company_name] = rwlock
async with rwlock.writer:
msg = dict()
po_sql = "select * from shield.company where name=%s"
po = await self.db.get(po_sql, company_name)
if not po: # 找不到企业
logging.error("not found company name [%s]", company_name)
msg["code"] = 404
msg["code"] = "not found company"
return self.response(request, msg)
res = await self.db.execute(sql, args_values)
if not isinstance(res, int):
logging.error("sql update is err:", res)
msg["code"] = 403
msg["reason"] = "set fail"
return self.response(request, msg)
logging.info("company [%s] set cnt [%s] is success", company_name, cnt)
msg["code"] = 200
msg["reason"] = "ok"
return self.response(request, msg) async def cnt_inc(self, request):
"""
用于增加company表中的count值
:param request:
:return:
"""
post = await request.post()
logging.info('post %s', post)
company_name = post.get("company")
cnt = int(post.get("cnt", 0))
rwlock = self.company_lock.get(company_name, "")
if not rwlock:
rwlock = aiorwlock.RWLock(loop=self.loop)
self.company_lock[company_name] = rwlock
async with rwlock.writer:
uuid_s = uuid.uuid1().hex
logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
msg = dict()
sql = "select * from shield.company where name=%s"
po = await self.db.get(sql, company_name)
if not po: # 找不到企业
logging.error("not found company name [%s]", company_name)
msg["code"] = 404
msg["code"] = "not found company"
return self.response(request, msg)
old_cnt = po.get("count")
po_cnt = int(po.get("count"))
res = po_cnt + cnt
update_sql = "update shield.company set count=%s where name=%s"
args_values = [res, company_name]
update_res = await self.db.execute(update_sql, args_values)
if not isinstance(update_res, int): # 数据库update失败
logging.error("sql update is err:", update_res)
msg["code"] = 403
msg["reason"] = "inc fail"
return self.response(request, msg)
logging.info("uuid [%s] lock [%s] company [%s] inc cnt [%s] old cnt [%s] true will is [%s] success", uuid_s,id(rwlock), company_name, cnt, old_cnt, res)
msg["code"] = 200
msg["reason"] = "ok"
return self.response(request, msg) async def cnt_dec(self, request):
"""
用于减少company表中count的值
:param request:
:return:
"""
post = await request.post()
logging.info('post %s', post)
company_name = post.get("company")
cnt = int(post.get("cnt", 0))
rwlock = self.company_lock.get(company_name, "")
if not rwlock:
rwlock = aiorwlock.RWLock(loop=self.loop)
self.company_lock[company_name] = rwlock
async with rwlock.writer:
uuid_s = uuid.uuid1().hex
logging.debug("[%s]---[%s]", uuid_s, id(rwlock))
msg = dict()
sql = "select * from shield.company where name=%s"
po = await self.db.get(sql, company_name)
if not po: # 找不到企业
logging.error("not found company name [%s]", company_name)
msg["code"] = 404
msg["code"] = "not found company"
return self.response(request, msg)
po_cnt = int(po.get("count"))
old_cnt = po.get("count")
if po_cnt == 0:
logging.error("company [%s] cnt is 0", company_name)
msg["code"] = 400
msg["reason"] = "cnt is 0"
return self.response(request, msg)
if po_cnt < cnt: # 数据库余额不足
logging.error("company [%s] count is not enough", company_name)
msg["code"] = 405
msg["reason"] = "count is not enough"
return self.response(request, msg)
res = po_cnt - cnt
update_sql = "update shield.company set count=%s where name=%s"
args_values = [res, company_name]
update_res = await self.db.execute(update_sql, args_values)
if not isinstance(update_res, int): # 执行update 失败
logging.error("sql update is err:", update_res)
msg["code"] = 403
msg["reason"] = "inc fail"
return self.response(request, msg)
logging.info("uuid [%s] lock [%s] company [%s] dec cnt [%s] old cnt [%s] true will is [%s] success",uuid_s,id(rwlock), company_name, cnt, old_cnt, res) msg["code"] = 200
msg["reason"] = "ok"
return self.response(request, msg)

上面代码出问题的代码是在增加和减少的时候:

async with rwlock.writer:

在一个协程还没有释放锁的时候,另外一个操作也就进来了,到之后我在测试并发的时候,对同一个name的count进行操作导致最后的count值不符合的问题

可能是我本身代码的问题,或者我哪里处理的不对,欢迎大家一起讨论

这个完整的代码地址:https://github.com/pythonsite/test_aiorwlock

上一篇:Python协程与Go协程的区别二


下一篇:【算法】万字九大排序宝藏汇总 轻松拿下九大排序算法【带动画】 (包含超详细的解释和注释)