httpx_send_file

# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: httpx_send_file.py
# Month: 九月
# time: 2021/9/16 15:37
""" Write an introduction to the module here """
import httpx
import os

__all__ = ["send_file", "async_send_file"]


def send_file(url, files, clint: httpx.Client = None, *args, **kwargs):
    """

    @param url:
    @param files:
            FileTypes = Union[
                # file (or text or path)
                FileContent,
                # (filename, file (or text))
                Tuple[str, FileContent],
                # (filename, file (or text), content_type)
                Tuple[str, FileContent, str],
            ]
        Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
        Files = {"remote_name": {"file_name": Union[path, bytes]}}
        Files = path
        Files = bytes
    @param args:
    param remote_name = None 远程接收名字
    @param kwargs:
    @param clint:
    @return:
    """
    files = __md5_check(__parse_file(files, kwargs))
    close_flag = False
    if not clint:
        clint = httpx.Client(
            verify=kwargs.pop("verify", True),
            cert=kwargs.pop("cert", None),
            http2=kwargs.pop("http2", False),
            event_hooks=kwargs.pop("event_hooks", None),
            proxies=kwargs.pop("proxies", None),
            mounts=kwargs.pop("mounts", None),
            base_url=kwargs.pop("base_url", ''),
            transport=kwargs.pop("transport", None),
            trust_env=kwargs.pop("trust_env", True),
        )
        close_flag = True
    try:
        timeout = kwargs.pop('timeout', 20)
        method = kwargs.pop('method', 'post')
        assert method.lower() in ['post', "put", "patch"]
        clint_function = getattr(clint, method.lower())
        response = clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    finally:
        if close_flag:
            clint.close()
    return response


async def async_send_file(url, files, clint: httpx.AsyncClient = None, *args, **kwargs):
    """

       @param url:
       @param files:
               FileTypes = Union[
                   # file (or text or path)
                   FileContent,
                   # (filename, file (or text))
                   Tuple[str, FileContent],
                   # (filename, file (or text), content_type)
                   Tuple[str, FileContent, str],
               ]
           Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
           Files = {"remote_name": {"file_name": Union[path, bytes]}}
           Files = path
           Files = bytes
       @param args:
       param remote_name = None 远程接收名字
       @param kwargs:
       @param clint:
       @return:
       """

    files = __md5_check(__parse_file(files, kwargs))
    timeout = kwargs.pop('timeout', 20)
    method = kwargs.pop('method', 'post')
    assert method.lower() in ['post', "put", "patch"]
    if clint:
        clint_function = getattr(clint, method.lower())
        response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    else:
        async with httpx.AsyncClient(
                verify=kwargs.pop("verify", True),
                cert=kwargs.pop("cert", None),
                http2=kwargs.pop("http2", False),
                event_hooks=kwargs.pop("event_hooks", None),
                proxies=kwargs.pop("proxies", None),
                mounts=kwargs.pop("mounts", None),
                base_url=kwargs.pop("base_url", ''),
                transport=kwargs.pop("transport", None),
                trust_env=kwargs.pop("trust_env", True),
        ) as clint:
            clint_function = getattr(clint, method.lower())
            response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    return response


def __parse_file(files, kwargs):
    """
    文件参数处理
    @param files:
    @param kwargs:
    @return:
    """
    remote_name = kwargs.pop("remote_name", "file")

    def get_file(path, return_name=False):
        if os.path.isfile(path):
            filename = os.path.split(path)[-1]
            with open(path, 'rb') as __f:
                path = __f.read()
            if return_name:
                return [filename, path]
        return path

    if isinstance(files, dict):
        for key, value in files.items():
            if not value:
                files = None
                break
            elif isinstance(value, bytes):
                continue
            elif isinstance(value, str):
                files[key] = tuple(get_file(value, return_name=True))
            elif isinstance(value, dict):
                for file_name, content in value.items():
                    if isinstance(content, bytes):
                        files[key] = (file_name, content)
                    elif isinstance(content, str):
                        if not file_name:
                            files[key] = tuple(get_file(content, return_name=True))
                        else:
                            files[key] = (file_name, get_file(content))
                    break
            else:
                if 2 <= len(value) <= 3:
                    files[key] = list(value)
                    if isinstance(files[key][1], str):
                        if not files[key][0]:
                            files[key][0], files[key][1] = get_file(
                                files[key][1], return_name=True)
                        else:
                            files[key][1] = get_file(files[key][1])
                        files[key] = tuple(files[key])
                else:
                    raise ValueError("value length must (2 or 3)")
    elif isinstance(files, (tuple, list)):
        files = list(files)

        for _, value in enumerate(files):
            if isinstance(value, dict):
                # {'doc': ('doc', 'C:\\Users\\yuhaipeng\\Desktop\\网神信息.txt', 'application/plan')}
                for k, v in value.items():
                    value = (k, v)
                    break
            value = list(value)
            files[_] = value
            if isinstance(value[1], str):
                value[1] = tuple(get_file(value[1], return_name=True))
                files[_] = tuple(value)
            elif isinstance(value[1], (tuple, list)):
                value[1] = list(value[1])
                if 2 <= len(value[1]) <= 3:
                    if isinstance(value[1][1], str):
                        if not value[1][0]:
                            value[1][0], value[1][1] = get_file(value[1][1], return_name=True)
                        else:
                            value[1][1] = get_file(value[1][1])
                        value[1] = tuple(value[1])
                    elif isinstance(value[1][1], bytes):
                        value[1] = tuple(value[1])
                    else:
                        if not value[1][0]:
                            value[1][0] = "name"
                        value[1] = tuple(value[1])
                    files[_] = tuple(value)
                else:
                    raise ValueError("value length must (2 or 3)")
            elif isinstance(value[1], bytes):
                files[_] = tuple(value)
    elif isinstance(files, str):
        files = ((remote_name, tuple(get_file(files, return_name=True))),)
    elif isinstance(files, bytes):
        files = ((remote_name, files),)
    else:
        raise ValueError('error')
    return files


def __md5_check(files):
    """
    文件重复去重
    @return:
    """
    md5_check_li = list()
    import hashlib
    if isinstance(files, (tuple, list)):
        file_len = len(files)
        if file_len > 1:
            files = list(files)
            row = 0
            while row < file_len:
                file = files[row]
                if not isinstance(file[1][1], bytes):
                    digest = hashlib.md5(file[1][:10000]).hexdigest()
                else:
                    digest = hashlib.md5(file[1][1][:10000]).hexdigest()
                if digest not in md5_check_li:
                    md5_check_li.append(digest)
                else:
                    files.remove(file)
                    file_len -= 1
                    continue
                row += 1

    elif isinstance(files, dict):
        file_list = list(files.keys())
        file_len = len(file_list)
        if file_len > 1:
            row = 0
            while row < file_len:
                digest = hashlib.md5(files[file_list[row]][1][:10000]).hexdigest()
                if digest not in md5_check_li:
                    md5_check_li.append(digest)
                else:
                    files.pop(file_list[row], None)
                row += 1

    return files


if __name__ == '__main__':
    r = r"C:\Users\yuhaipeng\Desktop\网神信息.txt"
    # with httpx.Client() as session:
    #     print(send_file(
    #         "http://192.168.1.247/api/doc/html",
    #         files=r,
    #         # clint=session,
    #         remote_name="doc",
    #         data={"a": 1},
    #         params={"b": 1},
    #         method='post'
    #     ).text)

    import asyncio


    #
    # loop = asyncio.get_event_loop()
    #
    # x, b = loop.run_until_complete(
    #     asyncio.wait(
    #         [
    #             async_send_file(
    #                 "http://192.168.1.247/api/doc/html",
    #                 files=r,
    #                 # clint=session,
    #                 remote_name="doc",
    #             )
    #         ]
    #     )
    # )
    # for i in x:
    #     print(i.result().json())

    async def send_main(**kwargs):
        async with httpx.AsyncClient() as cli:
            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files=[("doc", r)],
            #     clint=cli,
            #     remote_name="doc",
            # )
            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files=r,
            #     clint=cli,
            #     remote_name="doc",
            # )

            response = await async_send_file(
                "http://192.168.1.247/api/doc/html",
                files=r,
                clint=cli,
                remote_name="doc",
            )

            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files={"doc": ("file_name", r, "abcd")},
            #     clint=cli,
            #     remote_name="doc",
            # )
        print(response.json())


    loop = asyncio.get_event_loop()

    loop.run_until_complete(
        asyncio.wait(
            [
                send_main()
            ]
        )
    )
上一篇:python 装饰器计算函数耗时


下一篇:python 函数传递不确定多少个的参数方法