# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: httpx_send_file.py
# Month: 九月
# time: 2021/9/16 15:37
""" Write an introduction to the module here """
import httpx
import os
__all__ = ["send_file", "async_send_file"]
def send_file(url, files, clint: httpx.Client = None, *args, **kwargs):
"""
@param url:
@param files:
FileTypes = Union[
# file (or text or path)
FileContent,
# (filename, file (or text))
Tuple[str, FileContent],
# (filename, file (or text), content_type)
Tuple[str, FileContent, str],
]
Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
Files = {"remote_name": {"file_name": Union[path, bytes]}}
Files = path
Files = bytes
@param args:
param remote_name = None 远程接收名字
@param kwargs:
@param clint:
@return:
"""
files = __md5_check(__parse_file(files, kwargs))
close_flag = False
if not clint:
clint = httpx.Client(
verify=kwargs.pop("verify", True),
cert=kwargs.pop("cert", None),
http2=kwargs.pop("http2", False),
event_hooks=kwargs.pop("event_hooks", None),
proxies=kwargs.pop("proxies", None),
mounts=kwargs.pop("mounts", None),
base_url=kwargs.pop("base_url", ''),
transport=kwargs.pop("transport", None),
trust_env=kwargs.pop("trust_env", True),
)
close_flag = True
try:
timeout = kwargs.pop('timeout', 20)
method = kwargs.pop('method', 'post')
assert method.lower() in ['post', "put", "patch"]
clint_function = getattr(clint, method.lower())
response = clint_function(url, files=files, timeout=timeout, *args, **kwargs)
finally:
if close_flag:
clint.close()
return response
async def async_send_file(url, files, clint: httpx.AsyncClient = None, *args, **kwargs):
"""
@param url:
@param files:
FileTypes = Union[
# file (or text or path)
FileContent,
# (filename, file (or text))
Tuple[str, FileContent],
# (filename, file (or text), content_type)
Tuple[str, FileContent, str],
]
Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
Files = {"remote_name": {"file_name": Union[path, bytes]}}
Files = path
Files = bytes
@param args:
param remote_name = None 远程接收名字
@param kwargs:
@param clint:
@return:
"""
files = __md5_check(__parse_file(files, kwargs))
timeout = kwargs.pop('timeout', 20)
method = kwargs.pop('method', 'post')
assert method.lower() in ['post', "put", "patch"]
if clint:
clint_function = getattr(clint, method.lower())
response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
else:
async with httpx.AsyncClient(
verify=kwargs.pop("verify", True),
cert=kwargs.pop("cert", None),
http2=kwargs.pop("http2", False),
event_hooks=kwargs.pop("event_hooks", None),
proxies=kwargs.pop("proxies", None),
mounts=kwargs.pop("mounts", None),
base_url=kwargs.pop("base_url", ''),
transport=kwargs.pop("transport", None),
trust_env=kwargs.pop("trust_env", True),
) as clint:
clint_function = getattr(clint, method.lower())
response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
return response
def __parse_file(files, kwargs):
"""
文件参数处理
@param files:
@param kwargs:
@return:
"""
remote_name = kwargs.pop("remote_name", "file")
def get_file(path, return_name=False):
if os.path.isfile(path):
filename = os.path.split(path)[-1]
with open(path, 'rb') as __f:
path = __f.read()
if return_name:
return [filename, path]
return path
if isinstance(files, dict):
for key, value in files.items():
if not value:
files = None
break
elif isinstance(value, bytes):
continue
elif isinstance(value, str):
files[key] = tuple(get_file(value, return_name=True))
elif isinstance(value, dict):
for file_name, content in value.items():
if isinstance(content, bytes):
files[key] = (file_name, content)
elif isinstance(content, str):
if not file_name:
files[key] = tuple(get_file(content, return_name=True))
else:
files[key] = (file_name, get_file(content))
break
else:
if 2 <= len(value) <= 3:
files[key] = list(value)
if isinstance(files[key][1], str):
if not files[key][0]:
files[key][0], files[key][1] = get_file(
files[key][1], return_name=True)
else:
files[key][1] = get_file(files[key][1])
files[key] = tuple(files[key])
else:
raise ValueError("value length must (2 or 3)")
elif isinstance(files, (tuple, list)):
files = list(files)
for _, value in enumerate(files):
if isinstance(value, dict):
# {'doc': ('doc', 'C:\\Users\\yuhaipeng\\Desktop\\网神信息.txt', 'application/plan')}
for k, v in value.items():
value = (k, v)
break
value = list(value)
files[_] = value
if isinstance(value[1], str):
value[1] = tuple(get_file(value[1], return_name=True))
files[_] = tuple(value)
elif isinstance(value[1], (tuple, list)):
value[1] = list(value[1])
if 2 <= len(value[1]) <= 3:
if isinstance(value[1][1], str):
if not value[1][0]:
value[1][0], value[1][1] = get_file(value[1][1], return_name=True)
else:
value[1][1] = get_file(value[1][1])
value[1] = tuple(value[1])
elif isinstance(value[1][1], bytes):
value[1] = tuple(value[1])
else:
if not value[1][0]:
value[1][0] = "name"
value[1] = tuple(value[1])
files[_] = tuple(value)
else:
raise ValueError("value length must (2 or 3)")
elif isinstance(value[1], bytes):
files[_] = tuple(value)
elif isinstance(files, str):
files = ((remote_name, tuple(get_file(files, return_name=True))),)
elif isinstance(files, bytes):
files = ((remote_name, files),)
else:
raise ValueError('error')
return files
def __md5_check(files):
"""
文件重复去重
@return:
"""
md5_check_li = list()
import hashlib
if isinstance(files, (tuple, list)):
file_len = len(files)
if file_len > 1:
files = list(files)
row = 0
while row < file_len:
file = files[row]
if not isinstance(file[1][1], bytes):
digest = hashlib.md5(file[1][:10000]).hexdigest()
else:
digest = hashlib.md5(file[1][1][:10000]).hexdigest()
if digest not in md5_check_li:
md5_check_li.append(digest)
else:
files.remove(file)
file_len -= 1
continue
row += 1
elif isinstance(files, dict):
file_list = list(files.keys())
file_len = len(file_list)
if file_len > 1:
row = 0
while row < file_len:
digest = hashlib.md5(files[file_list[row]][1][:10000]).hexdigest()
if digest not in md5_check_li:
md5_check_li.append(digest)
else:
files.pop(file_list[row], None)
row += 1
return files
if __name__ == '__main__':
r = r"C:\Users\yuhaipeng\Desktop\网神信息.txt"
# with httpx.Client() as session:
# print(send_file(
# "http://192.168.1.247/api/doc/html",
# files=r,
# # clint=session,
# remote_name="doc",
# data={"a": 1},
# params={"b": 1},
# method='post'
# ).text)
import asyncio
#
# loop = asyncio.get_event_loop()
#
# x, b = loop.run_until_complete(
# asyncio.wait(
# [
# async_send_file(
# "http://192.168.1.247/api/doc/html",
# files=r,
# # clint=session,
# remote_name="doc",
# )
# ]
# )
# )
# for i in x:
# print(i.result().json())
async def send_main(**kwargs):
async with httpx.AsyncClient() as cli:
# response = await async_send_file(
# "http://192.168.1.247/api/doc/html",
# files=[("doc", r)],
# clint=cli,
# remote_name="doc",
# )
# response = await async_send_file(
# "http://192.168.1.247/api/doc/html",
# files=r,
# clint=cli,
# remote_name="doc",
# )
response = await async_send_file(
"http://192.168.1.247/api/doc/html",
files=r,
clint=cli,
remote_name="doc",
)
# response = await async_send_file(
# "http://192.168.1.247/api/doc/html",
# files={"doc": ("file_name", r, "abcd")},
# clint=cli,
# remote_name="doc",
# )
print(response.json())
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.wait(
[
send_main()
]
)
)