# async_send

# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: async_send.py
# Month: May
# time: 2020/5/13 16:06
""" Asynchronous HTTP request helpers. """
import os
import six
import json
import binascii
from abc import ABC
from io import BytesIO
from urllib.parse import urlencode
from requests.utils import requote_uri
from tornado.httpclient import AsyncHTTPClient, HTTPError

__all__ = ["TornadoSend", "encode_multipart_form_data"]


class AsyncHttpClient(AsyncHTTPClient, ABC):
    """Add ``with`` / ``async with`` support on top of ``AsyncHTTPClient``.

    Entering the context yields a freshly requested ``AsyncHTTPClient``;
    leaving the context calls ``close()`` on this wrapper object.

    NOTE(review): the object handed to the ``with`` body is not ``self``,
    so the client that performs the requests is not the one being closed
    here -- confirm this is intended given how Tornado shares client
    instances.
    """

    def __enter__(self):
        """Return an ``AsyncHTTPClient`` built with default settings."""
        client = AsyncHTTPClient()
        return client

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close this wrapper; exceptions from the body are not suppressed."""
        self.close()

    async def __aenter__(self, max_clients=500):
        """Return an ``AsyncHTTPClient`` requested with ``max_clients``.

        ``async with`` invokes this method without arguments, so the
        default of 500 is the value used in that case.
        """
        client = AsyncHTTPClient(max_clients=max_clients)
        return client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close this wrapper; exceptions from the body are not suppressed."""
        self.close()


class TornadoSend:

    """Helpers for sending asynchronous HTTP requests through Tornado."""

    @staticmethod
    async def send(url, *, data=None, headers=None, params=None, user_agent=None, http_error_callback=None, **kwargs):
        """Send one HTTP request and return the response from ``fetch``.

        :param url: Target URL; it is re-quoted before use.
        :param data: Request body. Ignored for ``GET``. A ``dict`` is
            form-encoded when any header value names
            ``application/x-www-form-urlencoded`` and JSON-encoded
            otherwise; a ``list``/``tuple`` is JSON-encoded; any other
            value is passed through unchanged.
        :param headers: Request headers (mapping). When empty,
            ``{"Content-Type": "application/json"}`` is sent.
        :param params: Mapping of query-string parameters appended to ``url``.
        :param user_agent: ``User-Agent`` value. When both ``headers`` and
            ``user_agent`` are empty, a desktop Chrome string is used.
        :param http_error_callback: Optional
            ``http_error_callback(code: int, e: HTTPError) -> None``. When
            callable it receives the ``HTTPError`` and ``send`` returns
            ``None``; otherwise the error propagates.
        :param kwargs: Options consumed here:

            * ``method`` -- HTTP verb, default ``"GET"`` (upper-cased).
            * ``timeout`` -- request timeout in seconds, default 180.
            * ``request_timeout`` -- when truthy, overrides ``timeout``.
            * ``connect_timeout`` -- seconds, default 180.
            * ``validate_cert`` -- verify TLS certificates, default ``False``.

            Everything else is forwarded unchanged to ``fetch``. The
            original author lists these as accepted: ``auth_username``,
            ``auth_password``, ``auth_mode``, ``if_modified_since``,
            ``follow_redirects``, ``max_redirects``, ``use_gzip``,
            ``network_interface``, ``streaming_callback``,
            ``header_callback``, ``prepare_curl_callback``, ``proxy_host``,
            ``proxy_port``, ``proxy_username``, ``proxy_password``,
            ``proxy_auth_mode``, ``allow_nonstandard_methods``,
            ``ca_certs``, ``allow_ipv6``, ``client_key``, ``client_cert``,
            ``body_producer``, ``expect_100_continue``,
            ``decompress_response``, ``ssl_options``. Do not pass ``body``:
            it is already supplied from ``data``.
        :return: The response object returned by ``fetch``, or ``None``
            when an ``HTTPError`` was handed to ``http_error_callback``.
        """
        # Append the encoded query string, choosing the separator by
        # whether the URL already carries one, then drop a dangling
        # separator when there were no parameters.
        query = urlencode(params or {})
        uri = requote_uri(url)
        separator = "&" if "?" in uri else "?"
        request = f"{uri}{separator}{query}".rstrip(separator)

        method = kwargs.pop("method", "GET").upper()
        if method == "GET":
            data = None
        elif isinstance(data, dict):
            # Match on substring so values with parameters such as
            # "...; charset=utf-8" are recognised as form posts too.
            is_form = bool(headers) and any(
                isinstance(value, str)
                and "application/x-www-form-urlencoded" in value.lower()
                for value in headers.values()
            )
            # The HTTP client only accepts str/bytes bodies, so a dict must
            # always be serialised; JSON matches the default Content-Type.
            data = urlencode(data) if is_form else json.dumps(data)
        elif isinstance(data, (list, tuple)):
            data = json.dumps(data)

        # Only fall back to the browser-like User-Agent when the caller
        # supplied neither headers nor an explicit user_agent.
        if not headers and not user_agent:
            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " \
                         "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"

        # float() rather than int(): int() would truncate sub-second
        # timeouts (e.g. 0.5 -> 0).
        request_timeout = kwargs.pop("request_timeout", None)
        timeout = float(kwargs.pop("timeout", 180))
        if request_timeout:
            timeout = float(request_timeout)
        connect_timeout = float(kwargs.pop("connect_timeout", 180))

        # NOTE(security): certificate validation is OFF unless the caller
        # passes validate_cert=True. Kept for backward compatibility.
        validate_cert = kwargs.pop("validate_cert", False)

        async with AsyncHttpClient() as http_client:
            try:
                return await http_client.fetch(
                    request=request,
                    method=method,
                    headers=headers or {"Content-Type": "application/json"},
                    request_timeout=timeout,
                    connect_timeout=connect_timeout,
                    body=data,
                    user_agent=user_agent,
                    validate_cert=validate_cert,
                    **kwargs
                )
            except HTTPError as e:  # type: HTTPError
                if not callable(http_error_callback):
                    raise
                http_error_callback(e.code, e=e)
                return None

    @staticmethod
    async def get(url, params=None, **kwargs):
        """Send a ``GET`` request; see :meth:`send` for the options."""
        return await TornadoSend.send(url=url, params=params, **kwargs)

    @staticmethod
    async def post(url, params=None, data=None, **kwargs):
        """Send a ``POST`` request; see :meth:`send` for the options."""
        return await TornadoSend.send(url=url, params=params, data=data, method='post', **kwargs)


def encode_multipart_form_data(data=None, files=None):
    """Encode form fields and files as a ``multipart/form-data`` body.

    :param data: Mapping ``{field_name: value}`` of regular form fields.
        ``bytes`` values are written as-is; any other value is written as
        its ``str()`` form encoded as UTF-8.
    :param files: One mapping, or a list of mappings, describing uploads.
        Each mapping entry has the remote field name as its key and either

        * a ``{file_name: content}`` mapping (only its first entry is
          used; ``content`` may be ``bytes`` or ``str``), e.g.
          ``{"remote_name": {"a.txt": b"..."}}``; or
        * a filesystem path whose contents are read, e.g.
          ``{"remote_name": "/path/to/a.txt"}``.

        Several files: ``[{"remote_name": ...}, {"remote_name2": ...}]``.
    :return: ``(content_type, body, content_length)`` where ``body`` is
        ``bytes`` and ``content_length`` is its length as a string.
    :raises FileNotFoundError: a path entry does not name an existing file.
    :raises ValueError: a file entry is neither a path nor a non-empty
        ``{file_name: content}`` mapping.
    """
    fields = data or {}
    if not files:
        file_specs = []
    elif isinstance(files, dict):
        file_specs = [files]
    else:
        file_specs = files

    boundary = choose_boundary()
    delimiter = f'--{boundary}\r\n'.encode("utf-8")

    with BytesIO() as body:
        for key, value in fields.items():
            body.write(delimiter)
            body.write(
                f'Content-Disposition:form-data;name="{key}"\r\n\r\n'.encode("utf-8"))
            if isinstance(value, (bytes, bytearray)):
                # Formatting bytes through an f-string would emit "b'...'".
                body.write(bytes(value))
            else:
                body.write(f'{value}'.encode("utf-8"))
            body.write(b"\r\n")

        for spec in file_specs:
            for remote_name, value in spec.items():
                if isinstance(value, str):
                    if not os.path.isfile(value):
                        raise FileNotFoundError("文件不存在")
                    file_name = os.path.basename(value)
                    with open(value, 'rb') as fd:
                        content = fd.read()
                elif isinstance(value, dict) and value:
                    # Only the first {file_name: content} pair is used.
                    file_name, content = next(iter(value.items()))
                else:
                    # Also covers an empty dict, which would otherwise
                    # leave file_name/content unset or stale from the
                    # previous file.
                    raise ValueError("文件格式不支持")
                if isinstance(content, str):
                    # Encode first so the length is in bytes and the
                    # binary buffer accepts the write.
                    content = content.encode("utf-8")
                body.write(delimiter)
                body.write(f'Content-Disposition: form-data; '
                           f'name="{remote_name}"; '
                           f'filename="{file_name}"'
                           f';filelength="{len(content)}"'
                           f'\r\n\r\n'.encode('utf8'))
                body.write(content)
                body.write(b"\r\n")

        body.write(f'--{boundary}--\r\n'.encode("utf-8"))
        # Must be read before the buffer is closed by the ``with`` block.
        payload = body.getvalue()

    content_type = 'multipart/form-data;boundary=%s' % boundary
    return content_type, payload, str(len(payload))


def choose_boundary():
    """Return a random multipart boundary.

    Simple replacement for ``mimetools.choose_boundary``: 16 bytes from
    ``os.urandom`` rendered as a 32-character lowercase hex ``str``.
    """
    # This module already requires Python 3 syntax, so the hex digest can
    # be decoded unconditionally instead of branching on the interpreter.
    return binascii.hexlify(os.urandom(16)).decode('ascii')

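# Previous article: shell编程之免交互 (non-interactive shell programming)


# Next article: Shell 编程-免交互 (non-interactive shell programming)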