Python coroutines
What Python generators are for
A worked example: coroutines driving streaming output from multiple models
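As a refresher before the FastAPI examples, here is a minimal sketch of the two building blocks used throughout: a plain generator and an async generator. The function names are illustrative, not from the original code.

import asyncio

def char_stream(text):
    # a plain generator: lazily yields one character at a time
    for ch in text:
        yield ch

async def async_char_stream(text):
    # an async generator: it can await between yields, so the event loop
    # is free to run other coroutines while this one is sleeping
    for ch in text:
        await asyncio.sleep(0.1)
        yield ch

print(list(char_stream("abc")))          # ['a', 'b', 'c']

async def main():
    async for ch in async_char_stream("abc"):
        print(ch, end="")

asyncio.run(main())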
Further reading:
https://www.cnblogs.com/traditional/p/17398542.html
https://blog.****.net/m0_51180924/article/details/124612738
FastAPI concurrency
Expose the work as a FastAPI endpoint; inside each request, coroutines handle the streaming output of several models at once.
The same approach supports multiple concurrent FastAPI requests. The core fan-in pattern is sketched below, followed by the full server.
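Stripped of FastAPI, the core idea is fan-in: several async generators push their chunks into one asyncio.Queue, and a single consumer drains the queue until every producer has sent a None sentinel. A minimal standalone sketch (fake_model_stream is a made-up stand-in for a real model stream):

import asyncio

async def fake_model_stream(model: str):
    # stand-in for a real model's streaming output
    for i in range(3):
        await asyncio.sleep(0.2)
        yield f"{model}-chunk-{i}"

async def produce(index: int, model: str, queue: asyncio.Queue):
    async for chunk in fake_model_stream(model):
        queue.put_nowait((index, chunk))
    queue.put_nowait((index, None))  # sentinel: this producer is finished

async def merged(models):
    queue = asyncio.Queue()
    tasks = [asyncio.create_task(produce(i, m, queue)) for i, m in enumerate(models)]
    pending = set(range(len(models)))
    while pending:
        index, chunk = await queue.get()
        if chunk is None:
            pending.discard(index)
        else:
            yield index, chunk
    await asyncio.gather(*tasks)

async def main():
    async for index, chunk in merged(["chatglm3", "qwen"]):
        print(index, chunk)

asyncio.run(main())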
Server
import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse
import asyncio
from asyncio import Queue
app = FastAPI()
@app.get("/v1/models")
async def get_models():
    data = {
        "data": [
            {
                "id": "Qwen1.5-7B",                 # model id (OpenAI-compatible)
                "object": "model",                  # object type
                "owned_by": "organization-owner",   # model owner
                "permission": []                    # model permissions (not supported yet)
            },
            {
                "id": "chatglm3-6b",
                "object": "model",
                "owned_by": "organization-owner",
                "permission": []
            }
        ],
        "object": "list"  # type of the "data" field
    }
    return data
async def output_data(text: str, model: str):
    # Async generator: emits one OpenAI-style chat.completion.chunk per character.
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,     # OpenAI field, unused here
                        "role": "assistant",       # OpenAI message role
                        "tool_calls": None         # OpenAI field, unused here
                    },
                    "finish_reason": "length",     # OpenAI finish reason
                    "index": 0,                    # OpenAI choice index
                    "logprobs": None               # OpenAI field, unused here
                }
            ],
            "created": 1715238637,                 # timestamp
            "model": model,                        # OpenAI model id
            "object": "chat.completion.chunk",     # OpenAI message type
            "system_fingerprint": None             # OpenAI field, unused here
        }
        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(1)
@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    models = ["chatglm3", "qwen"]

    async def async_generate(index: int, model: str, queue: Queue):
        # Producer: push every streamed chunk of one model into the shared queue,
        # then push a None sentinel to signal that this model is done.
        text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        if model == "chatglm3":
            text = "我是chatglm3的流式输出嘿嘿!!!"
        items_data = output_data(text=text, model=model)
        async for item_data in items_data:
            # print(f"generate### {model} {item_data}")
            # yield item_data
            queue.put_nowait((index, item_data))
        queue.put_nowait((index, None))

    async def async_consumer(queue: Queue, indices: list, timeout: float):
        # Consumer: drain the shared queue until every producer has sent its sentinel.
        indices = set(indices)
        finished = set()
        while indices != finished:
            try:
                index, response = await asyncio.wait_for(queue.get(), timeout)
                if response is None:
                    finished.add(index)
                    print("consumer queue indices finished", indices, finished)
                yield (index, response)
            except asyncio.TimeoutError:
                break

    async def async_process(models: list):
        # 1. Streaming output of multiple models
        queue = Queue()
        tasks = [
            asyncio.create_task(async_generate(index=index, model=model, queue=queue))
            for index, model in enumerate(models)
        ]
        print("###################", tasks)
        all_text = [dict() for _ in range(len(models))]
        async for index, response in async_consumer(queue=queue, indices=list(range(len(tasks))), timeout=10):
            if response is not None:
                all_text[index] = response
                print(f"thread00000_master###{all_text}")
                res_text = json.dumps(all_text) + "\n"
                yield res_text
        print(f"thread00000_master ENDENDENDEND")

    # async def async_process(models: list):
    #     # 2. Streaming output of a single model
    #     text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
    #     items_data = output_data(text=text, model=models[0])
    #     async for item_data in items_data:
    #         yield item_data

    return EventSourceResponse(async_process(models), media_type="text/event-stream")
    # return EventSourceResponse(async_process(models), media_type="text/plain")
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8080)
Run:
> python.exe .\main.py
INFO: Started server process [12872]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
INFO: 127.0.0.1:64346 - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
......
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant",
"tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END
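The endpoint is plain server-sent events, so it can also be consumed without the OpenAI SDK. A minimal sketch with httpx (not part of the original code), assuming each "data:" line carries a JSON array with one JSON-encoded chunk string per model:

import json
import httpx

# Stream the raw SSE events from the endpoint started above.
with httpx.stream("POST", "http://127.0.0.1:8080/v1/chat/completions", timeout=None) as resp:
    for line in resp.iter_lines():
        if not line.startswith("data:"):
            continue  # skip blank keep-alive lines and SSE comments
        payload = json.loads(line[len("data:"):].strip())
        print(payload)  # e.g. ['{"id": null, ...}', '{"id": null, ...}']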
Client
from openai import OpenAI

client = OpenAI(
    api_key="EMPTY",
    base_url="http://127.0.0.1:8080/v1/"
)
response = client.chat.completions.create(
    model="EMPTY",
    messages=[
        # {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "你好"}
        # {"role": "user", "content": "谁是特朗普"},
        # {"role": "assistant", "content": "特朗普是美国前总统"},
        # {"role": "user", "content": "特朗普多大年纪了"},
    ],
    functions=None,
    temperature=1,
    top_p=0,
    max_tokens=20,
    stream=True,
)
print("#####", response)

ret_text = ""
for part in response:
    # print("\n33333", type(part), part)
    print("\n33333", type(part), part[0])
    print("33333", type(part), part[1])
Run:
> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None)
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
......
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
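Because each SSE event carries a JSON array rather than a single chat.completion.chunk object, the SDK hands back list objects whose entries are JSON strings (or a placeholder before a model has produced anything). A hedged post-processing sketch, assuming `response` is a freshly created stream (the printing loop above would already have consumed the original one):

import json

latest = {}  # model id -> latest accumulated content
for part in response:
    for entry in part:
        if not isinstance(entry, str):
            continue  # e.g. the empty {} placeholder before a model's first chunk
        chunk = json.loads(entry)
        latest[chunk["model"]] = chunk["choices"][0]["delta"]["content"]
print(latest)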
Running tasks in multiple threads
Start several worker threads; each one runs its task with coroutines and handles the streaming output of multiple models. The distilled thread-plus-event-loop pattern is sketched below, followed by the full server.
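The distilled pattern: each thread owns its own event loop via asyncio.run(), and inside that loop asyncio.gather() advances all model streams concurrently. A minimal standalone sketch (fake_stream, run_models and worker are illustrative names, not from the original):

import asyncio
from threading import Thread

async def fake_stream(model: str):
    # stand-in for output_data() in the server code below
    for i in range(3):
        await asyncio.sleep(0.5)
        print(f"{model} chunk {i}")
    return f"{model} done"

async def run_models(models):
    # both model streams advance concurrently inside this thread's event loop
    return await asyncio.gather(*(fake_stream(m) for m in models))

def worker(name: str):
    # each thread gets its own, independent event loop
    results = asyncio.run(run_models(["model-a", "model-b"]))
    print(name, results)

threads = [Thread(target=worker, args=(f"thread-{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()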
Server
import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse
import asyncio
from asyncio import Queue
import time
from threading import Thread
app = FastAPI()
@app.get("/v1/models")
async def get_models():
    ......
async def output_data(text: str, model: str):
    # Async generator: emits one OpenAI-style chat.completion.chunk per character.
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,     # OpenAI field, unused here
                        "role": "assistant",       # OpenAI message role
                        "tool_calls": None         # OpenAI field, unused here
                    },
                    "finish_reason": "length",     # OpenAI finish reason
                    "index": 0,                    # OpenAI choice index
                    "logprobs": None               # OpenAI field, unused here
                }
            ],
            "created": 1715238637,                 # timestamp
            "model": model,                        # OpenAI model id
            "object": "chat.completion.chunk",     # OpenAI message type
            "system_fingerprint": None             # OpenAI field, unused here
        }
        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(1)
@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    ......
async def async_task(task_name: str, models: list):
    async def task_generate(task_name: str, model: str):
        text = f"这是{model}模型的流式输出他会将每个字挨个的输出哈哈!!!"
        # if model == "chatglm3":
        #     text = "我是chatglm3的流式输出嘿嘿!!!"
        items_data = output_data(text=text, model=model)
        output = ""
        async for item_data in items_data:
            print(f"{task_name}### {model} {item_data}")
            # yield item_data
            # queue.put_nowait((index, item_data))
            output = item_data
        # queue.put_nowait((index, None))
        return output

    # results = await asyncio.gather(task_generate("chatglm3"), task_generate("qwen1.5"))
    results = await asyncio.gather(*[
        task_generate(task_name=task_name, model=model)
        for model in models
    ])
    print(f"{task_name} ENDENDENDEND {results}")


def task_thread(thread_name: str):
    print("另外开始一个子线程做任务啦", thread_name)
    models = ["model11111", "model22222"]
    # 1. Run the task only once
    # asyncio.run(async_task(thread_name, models))
    # 2. Run tasks repeatedly
    count = 0
    flag = True
    while True:
        # run the task once, then keep the thread alive
        if flag:
            task_name = f"{thread_name}_task"
            asyncio.run(async_task(task_name, models))
            flag = False
        # no task pending: wait
        time.sleep(1)
        print(f"{thread_name} count:{count}")
        count += 1
    print("子线程任务结束啦", thread_name)  # unreachable while the loop above never breaks
if __name__ == '__main__':
    # 1. Create the worker threads in a batch
    thread_names = ["thread11111", "thread22222", "thread33333"]
    threads = [
        Thread(target=task_thread, args=(thread_name,)).start()
        for thread_name in thread_names
    ]  # note: .start() returns None, so this list only triggers the threads
    # 2. Create the threads one by one
    # t1 = Thread(target=task, args=("task11111",))
    # t1.start()
    # time.sleep(1)
    # t2 = Thread(target=task, args=("task22222",))
    # t2.start()
    uvicorn.run(app, host="0.0.0.0", port=8080)
Run:
> python.exe .\main.py
另外开始一个子线程做任务啦 thread11111
另外开始一个子线程做任务啦 thread22222
另外开始一个子线程做任务啦 thread33333
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
INFO: Started server process [19308]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"con