一. Gevent实例
# Patch the standard library BEFORE importing anything that pulls in
# socket/ssl (requests does). gevent's documentation asks for patching to
# happen as early as possible; patching after ssl has been imported emits a
# MonkeyPatchWarning and can lead to errors such as RecursionError.
from gevent import monkey

# A plain socket blocks after sending a request until the server answers.
# patch_all() swaps the built-in socket (and friends) for gevent's
# cooperative versions -- conceptually socket.setblocking(False) -- so a
# greenlet waiting on I/O yields to the others instead of blocking.
monkey.patch_all()

import gevent
import requests
def fetch_async(method, url, req_kwargs):
    """Perform one HTTP request and print what came back.

    Args:
        method: HTTP verb handed to ``requests.request`` (e.g. ``'get'``).
        url: Target URL.
        req_kwargs: Dict of extra keyword arguments forwarded unchanged to
            ``requests.request``.

    Prints the call arguments first, then the response URL together with the
    raw response body. Nothing is returned.
    """
    print(method, url, req_kwargs)
    resp = requests.request(method, url, **req_kwargs)
    print(resp.url, resp.content)
# ##### Send the requests #####
# One greenlet (coroutine) is spawned per URL -- three tasks in total -- and
# every one of them runs fetch_async; joinall waits for all of them.
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url=target, req_kwargs={})
    for target in (
        'https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/',
    )
])
二. grequests实例
grequests实际上就是封装了gevent里面的方法,然后配合requests实现异步的IO
grequests = gevent + requests
grequests.map() 内部实现
import grequests # 实际上就是requests + gevent
# Build the unsent GET requests; only the first one carries a timeout.
request_list = [grequests.get('https://www.baidu.com/', timeout=10.001)]
request_list.extend(
    grequests.get(url)
    for url in ('https://www.taobao.com/', 'https://hao.360.cn/')
)

# ##### Execute and collect the list of responses #####
# map() sends the prepared requests concurrently on top of gevent
# (internally it relies on gevent's joinall()).
response_list = grequests.map(request_list)
print(response_list)

# ##### Execute and collect the list of responses (with error handling) #####
# def exception_handler(request, exception):
#     print(request, exception)
#     print("Request failed")
# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)
转载自:https://www.cnblogs.com/ftl1012/p/9424822.html
三. 项目中的应用
def user_batch_import(self, token, file, data):
    """Bulk-create users from an uploaded Excel workbook.

    Every non-empty sheet must start with the exact header row
    ``['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']``. Each following
    row is turned into a user-creation payload and queued; once a whole
    sheet has been validated, the queue is handed to
    ``self._user_batch_import``.

    Args:
        token: Passed to ``self._user_role`` to resolve the caller's role
            information.
        file: Uploaded workbook, forwarded to ``Graphics.upload_image``.
        data: Mapping that must provide ``"context_type"`` and
            ``"content_type"`` (both forwarded to ``Graphics.upload_image``).

    Returns:
        dict: The ``_user_role`` result itself when it contains ``"errcode"``;
        ``{"errcode": 2, ...}`` when it lacks ``'user[college_id]'``;
        ``{"errcode": 400, ...}`` for a wrong header row or an unknown role;
        otherwise ``{"errcode": 0, "errmsg": str(res)}`` where ``res`` is the
        value returned by ``_user_batch_import`` for the last non-empty
        sheet (``None`` if every sheet was empty).
    """
    v = self._user_role(token, {})
    if "errcode" in v:
        return v
    if 'user[college_id]' not in v:
        return {"errcode": 2, "errmsg": str(v)}

    # Imported lazily so the permission checks above do not depend on xlrd.
    import xlrd

    expected_header = ['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']
    # Spreadsheet role label -> enrollment type sent to the API.
    role_names = {
        "老师": "TeacherEnrollment",
        "学生": "StudentEnrollment",
        "助教": "TaEnrollment",
        "设计者": "DesignerEnrollment",
        "观察者": "ObserverEnrollment",
    }

    show_url = Graphics.upload_image(file, data["context_type"], data["content_type"])
    data_path = Graphics.format_nginx_data_url(show_url)
    book = xlrd.open_workbook(data_path)

    res = None
    for name in book.sheet_names():
        sheet = book.sheet_by_name(name)
        nrows = sheet.nrows  # number of rows, header included
        if nrows == 0:
            continue

        # Compare the header itself. The previous
        # "!= expected and len(...) != 7" test only ever checked the column
        # count, so misnamed or reordered columns slipped through.
        if sheet.row_values(0) != expected_header:
            return {"errcode": 400, "errmsg": u"不支持此格式顺序的输入;正确格式:['编号', '姓名', '邮箱', '密码', '电话', '地址', '角色']"}

        q = queue.Queue(maxsize=nrows)
        for row_index in range(1, nrows):
            rows = sheet.row_values(row_index)
            role_name = role_names.get(rows[6])
            if role_name is None:
                return {"errcode": 400, "errmsg": u"所写角色不存在,可选:【老师, 学生, 助教, 观察者, 设计者】"}
            q.put({
                "user[name]": rows[1],
                "pseudonym[unique_id]": rows[2],
                "pseudonym[password]": rows[3],
                "user[tel]": rows[4],
                "user[company]": rows[5],
                "user[time_zone]": "Beijing",
                "user[locale]": "zh-Hans",
                "user[terms_of_use]": True,
                "user[role_name]": role_name,
                "user[college_id]": v["user[college_id]"],
            })

        # All rows of this sheet passed validation; send them as one batch.
        res = self._user_batch_import(q)
    return {"errcode": 0, "errmsg": str(res)}
def _user_batch_import(self, q):
    """Send one user-creation POST per valid record in ``q``, concurrently.

    Args:
        q: Queue-like object (``qsize()`` / ``get()``) holding payload dicts
            with at least ``"pseudonym[unique_id]"``, ``"user[tel]"`` and
            ``"pseudonym[password]"``. The queue is drained by this call.

    Returns:
        The list produced by ``grequests.map`` for the requests that were
        built (one entry per record that passed validation, in queue order).

    Records rejected by ``very_email`` / ``very_phone`` / ``very_password``
    are skipped and produce no entry in the result.
    """
    from app.api.apis import very_email, very_password, very_phone

    header = {"Authorization": "Bearer %s" % AccessToken}
    # Loop-invariant; account id 1 is hard-coded as in the original code.
    url = API_CANVAS_HOST + '/api/v1/accounts/{account_id}/users'.format(account_id=1)

    request_res = []
    while q.qsize() > 0:
        query = q.get()
        is_valid = (
            very_email(query["pseudonym[unique_id]"])
            and very_phone(query["user[tel]"], "tel")
            and very_password(query["pseudonym[password]"], "password")
        )
        if not is_valid:
            # Previously the record was `del`-eted and then still used to
            # build the request, which raised UnboundLocalError. Skip it.
            continue
        request_res.append(grequests.post(url, headers=header, data=query))
    return grequests.map(request_res)