# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: async_send.py
# Month: 五月
# time: 2020/5/13 16:06
""" 异步请求方法 """
import os
import six
import json
import binascii
from abc import ABC
from io import BytesIO
from urllib.parse import urlencode
from requests.utils import requote_uri
from tornado.httpclient import AsyncHTTPClient, HTTPError
__all__ = ["TornadoSend", "encode_multipart_form_data"]
class AsyncHttpClient(AsyncHTTPClient, ABC):
    """Give ``AsyncHTTPClient`` ``with`` / ``async with`` support.

    Entering the context hands back the object produced by calling
    ``AsyncHTTPClient(...)``; leaving the context calls ``close()`` on this
    wrapper instance (``self``), not on the object returned by enter.
    """

    async def __aenter__(self, max_clients=500):
        # ``async with`` never passes arguments, so ``max_clients`` is always
        # its default unless ``__aenter__`` is invoked by hand.
        client = AsyncHTTPClient(max_clients=max_clients)
        return client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        self.close()

    def __enter__(self):
        client = AsyncHTTPClient()
        return client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        self.close()
class TornadoSend:
    """Static helpers for sending HTTP requests through Tornado's async client."""

    # Browser-like User-Agent applied only when the caller supplies neither
    # headers nor an explicit user_agent.
    _DEFAULT_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
    )
    _FORM_CONTENT_TYPE = "application/x-www-form-urlencoded"

    @staticmethod
    def _append_query(uri, params):
        """Return ``uri`` with ``params`` urlencoded and appended as a query string."""
        query = urlencode(params or {})
        if "?" in uri:
            # The URI already carries a query string: extend it with "&".
            return f"{uri}&{query}".rstrip("&")
        return f"{uri}?{query}".rstrip("?")

    @staticmethod
    def _encode_body(method, data, headers):
        """Turn ``data`` into the value passed to ``fetch`` as ``body``.

        GET requests never carry a body.  A dict is form-encoded when any
        header value names ``application/x-www-form-urlencoded`` (extra
        parameters such as ``; charset=UTF-8`` are tolerated) and JSON-encoded
        otherwise; lists and tuples are JSON-encoded; every other value
        (``str``, ``bytes``, ``None`` ...) is returned unchanged.
        """
        if method == "GET":
            return None
        if isinstance(data, dict):
            wants_form = bool(headers) and any(
                TornadoSend._FORM_CONTENT_TYPE in str(value).lower()
                for value in headers.values()
            )
            # A raw dict is not a valid request body, so it must always be
            # serialized one way or the other.
            return urlencode(data) if wants_form else json.dumps(data)
        if isinstance(data, (list, tuple)):
            return json.dumps(data)
        return data

    @staticmethod
    async def send(url, *, data=None, headers=None, params=None, user_agent=None,
                   http_error_callback=None, **kwargs):
        """
        Send one HTTP request and return the result of ``http_client.fetch``.

        :param url: target URL; passed through ``requote_uri`` before ``params``
            are appended as a query string.
        :param data: request body; ignored for GET.  See ``_encode_body`` for
            how dict / list / tuple values are serialized.
        :param headers: ``Optional[Union[Dict[str, str], httputil.HTTPHeaders]]``;
            ``{"Content-Type": "application/json"}`` is sent when empty.
        :param params: query parameters accepted by ``urllib.parse.urlencode``.
        :param user_agent: User-Agent string.  A browser-like default is used
            only when neither ``headers`` nor ``user_agent`` is supplied.
        :param http_error_callback: ``http_error_callback(code: int, e: HTTPError) -> None``.
            When callable it is invoked as ``http_error_callback(e.code, e=e)``
            on ``HTTPError`` and ``send`` then returns ``None``.
        :param kwargs:
            method: HTTP verb, default ``"GET"``; upper-cased before use.
            timeout / request_timeout: seconds allowed for the whole request,
                default 180; a truthy ``request_timeout`` takes precedence.
            connect_timeout: seconds allowed for connecting, default 180.
            validate_cert: verify the server certificate, default ``False``.
            Any remaining keyword is forwarded unchanged to ``fetch``, e.g.
            auth_username, auth_password, auth_mode, if_modified_since,
            follow_redirects, max_redirects, use_gzip, network_interface,
            streaming_callback, header_callback, prepare_curl_callback,
            proxy_host, proxy_port, proxy_username, proxy_password,
            proxy_auth_mode, allow_nonstandard_methods, ca_certs, allow_ipv6,
            client_key, client_cert, body_producer, expect_100_continue,
            decompress_response, ssl_options.
        :return: whatever ``fetch`` returns, or ``None`` when an ``HTTPError``
            was handed to ``http_error_callback``.
        :raises HTTPError: when the request fails and no callable
            ``http_error_callback`` was given.
        """
        request = TornadoSend._append_query(requote_uri(url), params)
        method = kwargs.pop("method", "GET").upper()
        body = TornadoSend._encode_body(method, data, headers)

        # Keep a caller-supplied user_agent; only fall back to the default
        # when nothing at all was provided.
        if not headers and not user_agent:
            user_agent = TornadoSend._DEFAULT_USER_AGENT

        # float() rather than int(): truncating a sub-second timeout to 0
        # would be read by Tornado as "no timeout".
        request_timeout = kwargs.pop("request_timeout", None)
        timeout = float(kwargs.pop("timeout", 180))
        if request_timeout:
            timeout = float(request_timeout)
        connect_timeout = float(kwargs.pop("connect_timeout", 180))
        validate_cert = kwargs.pop("validate_cert", False)

        async with AsyncHttpClient() as http_client:
            try:
                return await http_client.fetch(
                    request=request,
                    method=method,
                    headers=headers or {"Content-Type": "application/json"},
                    request_timeout=timeout,
                    connect_timeout=connect_timeout,
                    body=body,
                    user_agent=user_agent,
                    validate_cert=validate_cert,
                    **kwargs
                )
            except HTTPError as e:
                if not callable(http_error_callback):
                    # Bare raise keeps the original traceback intact.
                    raise
                http_error_callback(e.code, e=e)
        return None

    @staticmethod
    async def get(url, params=None, **kwargs):
        """
        Send a GET request; a thin wrapper around ``send``.
        """
        return await TornadoSend.send(url=url, params=params, **kwargs)

    @staticmethod
    async def post(url, params=None, data=None, **kwargs):
        """
        Send a POST request with ``data`` as the body; a thin wrapper around ``send``.
        """
        return await TornadoSend.send(url=url, params=params, data=data, method='post', **kwargs)
def _read_file_part(value):
    """Resolve one file spec to ``(file_name, content_bytes)``.

    ``value`` is either a filesystem path (``str``) or a ``{file_name: content}``
    mapping, of which only the first entry is used.
    """
    if isinstance(value, str):
        if not os.path.isfile(value):
            raise FileNotFoundError("文件不存在")
        with open(value, 'rb') as fd:
            return os.path.basename(value), fd.read()
    if isinstance(value, dict):
        if not value:
            # An empty mapping names no file; reject it instead of leaving the
            # name/content unbound or reusing those of a previous file.
            raise ValueError("文件格式不支持")
        file_name, content = next(iter(value.items()))
        if isinstance(content, str):
            # The body is binary, so text content is encoded before writing.
            content = content.encode("utf-8")
        return file_name, content
    raise ValueError("文件格式不支持")


def encode_multipart_form_data(data=None, files=None):
    """
    Build a ``multipart/form-data`` request body.

    :param data: ``{name: value}`` mapping of regular form fields.  ``bytes``
        values are written as-is; every other value is written as its ``str()``
        form encoded as UTF-8.
    :param files: files to upload, as one mapping or a list of mappings.
        Each mapping takes one of two shapes:

        inline content::

            [
                # remote field name   file name   file content
                {remote_accept_name:  {file_name:  content}},
                {remote_accept_name2: {file_name2: content2}},
            ]
            # a single file may be given without the list
            {remote_accept_name: {file_name: content}}

        path on disk (the file is read and its basename is used as file name)::

            [
                # remote field name   path
                {remote_accept_name:  file_path},
                {remote_accept_name2: file_path2},
            ]
            # a single file may be given without the list
            {remote_accept_name: file_path}

    :return: ``(content_type, body, content_length)`` where ``content_type`` is
        the header value including the boundary, ``body`` is ``bytes`` and
        ``content_length`` is ``len(body)`` as a string.
    :raises FileNotFoundError: a path spec does not point to an existing file.
    :raises ValueError: a file spec is neither a path nor a non-empty mapping.
    """
    data = data or {}
    if not files:
        files = []
    elif isinstance(files, dict):
        files = [files]

    boundary = choose_boundary()
    delimiter = f'--{boundary}\r\n'.encode("utf-8")
    with BytesIO() as body:
        for key, value in data.items():
            body.write(delimiter)
            body.write(
                f'Content-Disposition:form-data;name="{key}"\r\n\r\n'.encode("utf-8"))
            if isinstance(value, (bytes, bytearray)):
                # Formatting bytes through an f-string would emit "b'...'".
                body.write(value)
                body.write(b"\r\n")
            else:
                body.write(f'{value}\r\n'.encode("utf-8"))
        for file in files:
            for remote_name, value in file.items():
                file_name, content = _read_file_part(value)
                body.write(delimiter)
                body.write(f'Content-Disposition: form-data; '
                           f'name="{remote_name}"; '
                           f'filename="{file_name}"'
                           f';filelength="{len(content)}"'
                           f'\r\n\r\n'.encode('utf8'))
                body.write(content)
                body.write(b"\r\n")
        body.write(f'--{boundary}--\r\n'.encode("utf-8"))
        payload = body.getvalue()
    content_type = 'multipart/form-data;boundary=%s' % boundary
    return content_type, payload, str(len(payload))


def choose_boundary():
    """
    Return a random 32-character hexadecimal string for use as a multipart boundary.

    A simple replacement for ``mimetools.choose_boundary``.
    """
    return binascii.hexlify(os.urandom(16)).decode('ascii')