Using coroutines to stream output from multiple models: a worked example

Background topics:
Python coroutines
The role of Python generators

Further reading:
https://www.cnblogs.com/traditional/p/17398542.html
https://blog.****.net/m0_51180924/article/details/124612738

FastAPI concurrency

Tasks are handled through a FastAPI endpoint, and each request uses coroutines to drive several streaming outputs at once.
This approach also supports multiple concurrent FastAPI requests.

Server

import json
from fastapi import FastAPI
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse

import asyncio
from asyncio import Queue

app = FastAPI()
 
@app.get("/v1/models")
async def get_models():
    data = {
        "data": [
            {
                "id": "Qwen1.5-7B",                   # model id (OpenAI-compatible field)
                "object": "model",                    # object type (OpenAI-compatible field)
                "owned_by": "organization-owner",     # model owner (OpenAI-compatible field)
                "permission": []                      # model permissions; not supported yet
            },
            {
                "id": "chatglm3-6b",
                "object": "model",
                "owned_by": "organization-owner",
                "permission": []
            }
        ],
        "object": "list"                              # type of the "data" field
    }
    return data

async def output_data(text: str, model: str):
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,          # OpenAI-compatible field; unused here
                        "role": "assistant",            # OpenAI message role
                        "tool_calls": None              # OpenAI-compatible field; unused here
                    },
                    "finish_reason": "length",          # OpenAI finish reason
                    "index": 0,                         # choice index (OpenAI-compatible field)
                    "logprobs": None                    # OpenAI-compatible field; unused here
                }
            ],
            "created": 1715238637,                      # timestamp
            "model": model,                             # OpenAI model id
            "object": "chat.completion.chunk",          # OpenAI message type
            "system_fingerprint": None                  # OpenAI-compatible field; unused here
        }

        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(1)

@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    models = ["chatglm3", "qwen"]

    async def async_generate(index: int, model: str, queue: Queue):
        text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        if model == "chatglm3":
            text = "我是chatglm3的流式输出嘿嘿!!!"
        items_data = output_data(text=text, model=model)
        
        async for item_data in items_data:
            # print(f"generate### {model} {item_data}")
            # yield item_data
            queue.put_nowait((index, item_data))
        queue.put_nowait((index, None))

    async def async_consumer(queue: Queue, indices: list, timeout: float):
        indices = set(indices)
        finished = set()

        while indices != finished:
            try:
                index, response = await asyncio.wait_for(queue.get(), timeout)
                if response is None:
                    finished.add(index)
                    print("consumer queue indices finished", indices, finished)
                yield (index, response)
            except asyncio.TimeoutError:  # pre-3.11, wait_for raises asyncio.TimeoutError, not the builtin
                break

    async def async_process(models: list):
        ### 1. Stream output from multiple models
        queue = Queue()
        tasks = [
            asyncio.create_task(async_generate(index=index, model=model, queue=queue))
            for index, model in enumerate(models)
        ]
        print("###################", tasks)
        
        all_text = [dict() for _ in range(len(models))]   # one slot per model; {} until that model's first chunk arrives
        async for index, response in async_consumer(queue=queue, indices=list(range(len(tasks))), timeout=10):
            if response is not None:
                all_text[index] = response

                print(f"thread00000_master###{all_text}")
                res_text = json.dumps(all_text)+"\n"
                yield res_text
        print(f"thread00000_master ENDENDENDEND")
    
    # async def async_process(models: list):
    #     ### 2. Stream output from a single model
    #     text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
    #     items_data = output_data(text=text, model=models[0])
    #     async for item_data in items_data:
    #         yield item_data

    return EventSourceResponse(async_process(models), media_type="text/event-stream")
    # return EventSourceResponse(async_process(models), media_type="text/plain")
 
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8080)

Run:

> python.exe .\main.py
INFO:     Started server process [12872]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
INFO:     127.0.0.1:64346 - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
......
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END
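
The core of the endpoint above is a fan-in pattern: every model's generator is a producer writing into a shared asyncio.Queue, a None sentinel marks a producer as finished, and a single consumer drains the queue until every index has reported its sentinel. Below is a minimal standalone sketch of just this pattern (the word lists and delays are made up for illustration):

import asyncio

async def producer(index: int, words: list, queue: asyncio.Queue):
    for word in words:
        await asyncio.sleep(0.1)            # simulate per-token model latency
        queue.put_nowait((index, word))
    queue.put_nowait((index, None))         # sentinel: this producer is done

async def consumer(queue: asyncio.Queue, total: int):
    finished = set()
    while len(finished) < total:
        index, item = await queue.get()
        if item is None:
            finished.add(index)             # one more producer has finished
            continue
        print(f"model {index} -> {item}")

async def main():
    queue = asyncio.Queue()
    streams = [["你", "好", "吗"], ["he", "ll", "o"]]
    tasks = [asyncio.create_task(producer(i, s, queue)) for i, s in enumerate(streams)]
    await consumer(queue, total=len(tasks))
    await asyncio.gather(*tasks)            # producers are finished by now; just tidy up

asyncio.run(main())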

Client

from openai import OpenAI

client = OpenAI(
        api_key="EMPTY",
        base_url="http://127.0.0.1:8080/v1/"
    )
response = client.chat.completions.create(
        model="EMPTY",
        messages=[
            # {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "你好"}
            # {"role": "user", "content": "谁是特朗普"},
            # {"role": "assistant", "content": "特朗普是美国前总统"},
            # {"role": "user", "content": "特朗普多大年纪了"},
        ],
        functions=None,
        temperature=1,
        top_p=0,
        max_tokens=20,
        stream=True,
    )

print("#####", response)
for part in response:
    # each streamed part is a list holding the latest chunk for every model
    # print("\n33333", type(part), part)
    print("\n33333", type(part), part[0])
    print("33333", type(part), part[1])

Run:

> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None)        

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
......

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}

Running tasks in multiple threads

Start several threads, each executing a task that uses coroutines to handle multiple streaming outputs.

Server

import json
from fastapi import FastAPI
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse

import asyncio
from asyncio import Queue

import time
from threading import Thread

app = FastAPI()
 
@app.get("/v1/models")
async def get_models():
    ......

async def output_data(text: str, model: str):
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,          # OpenAI-compatible field; unused here
                        "role": "assistant",            # OpenAI message role
                        "tool_calls": None              # OpenAI-compatible field; unused here
                    },
                    "finish_reason": "length",          # OpenAI finish reason
                    "index": 0,                         # choice index (OpenAI-compatible field)
                    "logprobs": None                    # OpenAI-compatible field; unused here
                }
            ],
            "created": 1715238637,                      # timestamp
            "model": model,                             # OpenAI model id
            "object": "chat.completion.chunk",          # OpenAI message type
            "system_fingerprint": None                  # OpenAI-compatible field; unused here
        }

        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(1)

@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    ......

async def async_task(task_name: str, models: list):
    async def task_generate(task_name: str, model: str):
        text = f"这是{model}模型的流式输出他会将每个字挨个的输出哈哈!!!"
        # if model == "chatglm3":
        #     text = "我是chatglm3的流式输出嘿嘿!!!"
        items_data = output_data(text=text, model=model)
        
        output = ""
        async for item_data in items_data:
            print(f"{task_name}### {model} {item_data}")
            # yield item_data
            # queue.put_nowait((index, item_data))
            output = item_data
        # queue.put_nowait((index, None))
        return output

    # results = await asyncio.gather(task_generate(task_name, "chatglm3"), task_generate(task_name, "qwen1.5"))
    results = await asyncio.gather(*[
                                        task_generate(task_name=task_name, model=model)
                                        for model in models
                                    ]
                                    )
    print(f"{task_name} ENDENDENDEND {results}")

def task_thread(thread_name: str):
    print("worker thread started:", thread_name)
    models = ["model11111", "model22222"]
    # 1. run the task only once
    # asyncio.run(async_task(thread_name, models))

    # 2. run tasks repeatedly
    count = 0
    flag = True
    while True:
        # run the task
        if flag:
            task_name = f"{thread_name}_task"
            asyncio.run(async_task(task_name, models))
            flag = False

        # idle while there is no task
        time.sleep(1)
        print(f"{thread_name} count:{count}")
        count += 1

    print("worker thread finished:", thread_name)   # unreachable: the loop above never exits

if __name__ == '__main__':
    # 1. create threads in a batch
    thread_names = ["thread11111", "thread22222", "thread33333"]
    threads = [
        Thread(target=task_thread, args=(thread_name,))
        for thread_name in thread_names
    ]
    for t in threads:
        t.start()          # Thread.start() returns None, so collect the Thread objects first

    # 2. create threads one by one
    # t1 = Thread(target=task_thread, args=("thread11111",))
    # t1.start()
    # time.sleep(1)
    # t2 = Thread(target=task_thread, args=("thread22222",))
    # t2.start()

    uvicorn.run(app, host="0.0.0.0", port=8080)
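
Note that the while True loop in task_thread never exits, so its final print is unreachable. Below is a hedged variant using a hypothetical stop_event (a threading.Event shared with the main thread) that lets the workers shut down cleanly; async_task is the coroutine from the listing above:

import time
import asyncio
from threading import Event

stop_event = Event()                         # hypothetical shared shutdown flag

def task_thread(thread_name: str):
    print("worker thread started:", thread_name)
    models = ["model11111", "model22222"]
    asyncio.run(async_task(f"{thread_name}_task", models))   # run the task once
    while not stop_event.is_set():           # idle until asked to stop
        time.sleep(1)
    print("worker thread finished:", thread_name)

# in the main thread, e.g. after uvicorn.run() returns:
# stop_event.set()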

Run:

> python.exe .\main.py
worker thread started: thread11111
worker thread started: thread22222
worker thread started: thread33333
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
INFO:     Started server process [19308]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"con
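
The interleaved lines above show the three worker threads running independently: each call to asyncio.run creates a fresh event loop, so every thread drives its own coroutines in isolation from the loop uvicorn runs in the main thread. If a thread instead needs to hand a coroutine to a loop that is already running, asyncio provides run_coroutine_threadsafe; a minimal sketch, where loop is assumed to be a reference to the running server loop:

import asyncio

def submit_from_thread(coro, loop: asyncio.AbstractEventLoop):
    # schedule the coroutine on the given, already-running loop
    # and block this thread until it completes
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()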