
python 协程
python 生成器的作用


fastapi 并发

以 FastAPI 接口的形式处理任务:每个任务内部使用协程,并发地处理多个模型的流式输出。
该方式同时支持多个 FastAPI 请求并发执行。


import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse

import asyncio
from asyncio import Queue

app = FastAPI()
# NOTE(review): the route decorator for this handler appears to have been lost
# from the listing (presumably a GET route such as "/v1/models") — confirm and
# restore it, otherwise the function is never registered on `app`.
async def get_models():
    """Return the list of available models in an OpenAI-style payload.

    Returns:
        dict: ``{"data": [<model>, ...], "object": "list"}`` where every model
        entry carries ``id``, ``object``, ``owned_by`` and ``permission``.
    """
    data = {
        "data": [
            {
                "id": "Qwen1.5-7B",                    # model id
                "object": "model",                     # object category
                "owned_by": "organization-owner",      # model owner
                "permission": [],                      # permissions (not supported yet)
            },
            {
                "id": "chatglm3-6b",
                "object": "model",
                "owned_by": "organization-owner",
                "permission": [],
            },
        ],
        "object": "list",                              # type of the "data" field
    }
    return data

async def output_data(text: str, model: str, delay: float = 1):
    """Yield ``text`` one character at a time as chat-completion-chunk JSON.

    Each yielded item is a JSON string whose ``delta.content`` is
    ``"<model> <idx> <text so far>"``, i.e. the content is cumulative, not
    incremental.

    Args:
        text: The full text to stream.
        model: Model id echoed in the chunk and in the content prefix.
        delay: Seconds to sleep after each chunk (defaults to the original
            hard-coded one second); pass ``0`` to stream without pausing.

    Yields:
        str: One JSON-encoded chunk per character of ``text``.
    """
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,      # returned by OpenAI, meaning unknown
                        "role": "assistant",        # message role
                        "tool_calls": None,         # returned by OpenAI, meaning unknown
                    },
                    "finish_reason": "length",      # stop code
                    "index": 0,                     # returned by OpenAI, meaning unknown
                    "logprobs": None,               # returned by OpenAI, meaning unknown
                }
            ],
            "created": 1715238637,                  # timestamp
            "model": model,                         # model id
            "object": "chat.completion.chunk",      # message type
            "system_fingerprint": None,             # returned by OpenAI, meaning unknown
        }

        # ensure_ascii=False keeps non-ASCII text readable in the stream.
        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(delay)

# The path matches the "POST /v1/chat/completions" request shown in the server
# log of this article; the decorator line itself was missing from the listing.
@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    """Stream the output of several models to one client as server-sent events.

    One producer task per model pushes ``(index, chunk)`` tuples onto a shared
    queue. A consumer drains the queue, and after every chunk the latest chunk
    of each model is sent to the client as a JSON list followed by a newline.

    Args:
        request: The incoming request (currently unused).

    Returns:
        EventSourceResponse: An SSE response fed by the merged stream.
    """
    models = ["chatglm3", "qwen"]

    async def async_generate(index: int, model: str, queue: Queue):
        """Produce the chunks of one model and push them onto ``queue``."""
        text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        if model == "chatglm3":
            text = "我是chatglm3的流式输出嘿嘿!!!"
        try:
            async for item_data in output_data(text=text, model=model):
                queue.put_nowait((index, item_data))
        finally:
            # Always send the end-of-stream sentinel, even when the producer
            # fails, so the consumer does not have to wait for its timeout.
            queue.put_nowait((index, None))

    async def async_consumer(queue: Queue, indices: list, timeout: float):
        """Yield ``(index, response)`` until every producer sent its sentinel.

        Stops early when no item arrives within ``timeout`` seconds.
        """
        indices = set(indices)
        finished = set()

        while indices != finished:
            try:
                index, response = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on, so name it explicitly.
                # NOTE(review): the handler body was missing from the listing;
                # stopping the stream is assumed — confirm.
                break
            if response is None:
                # A ``None`` payload marks the end of that producer's stream.
                finished.add(index)
                print("consumer queue indices finished", indices, finished)
            yield (index, response)

    async def async_process(models: list):
        """Merge the streams of all ``models`` into one stream of JSON lines."""
        # Variant 1: stream several models at once. For a single model it is
        # enough to re-yield the items of output_data(text, models[0]).
        queue = Queue()
        tasks = [
            asyncio.create_task(async_generate(index=index, model=model, queue=queue))
            for index, model in enumerate(models)
        ]
        print("###################", tasks)
        # Slot i holds the most recent chunk of model i ({} until one arrives).
        all_text = [dict() for _ in range(len(models))]
        try:
            async for index, response in async_consumer(
                queue=queue, indices=list(range(len(tasks))), timeout=10
            ):
                if response is not None:
                    all_text[index] = response

                    res_text = json.dumps(all_text) + "\n"
                    yield res_text
        finally:
            # Do not leave producers running when the client disconnects or
            # the consumer gives up after a timeout.
            for task in tasks:
                if not task.done():
                    task.cancel()
        print("thread00000_master ENDENDENDEND")

    # media_type="text/plain" is a possible alternative for non-SSE clients.
    return EventSourceResponse(async_process(models), media_type="text/event-stream")
if __name__ == '__main__':
    # Serve the FastAPI app with uvicorn on port 8080 when run as a script.
    # NOTE(review): the host is an empty string here; the bind address may have
    # been stripped from the listing — confirm the intended value.
    uvicorn.run(app, host="", port=8080)


> python.exe .\main.py
INFO:     Started server process [12872]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on (Press CTRL+C to quit)
INFO: - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", 
"tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END


from openai import OpenAI

# NOTE(review): the arguments of both calls below were missing from the
# listing. api_key, base_url and model are placeholders; port 8080 and the
# /v1 prefix follow the server listing and its request log — confirm them.
client = OpenAI(
    api_key="EMPTY",
    base_url="http://127.0.0.1:8080/v1",
)
response = client.chat.completions.create(
    model="chatglm3",
    messages=[
        # A system prompt or earlier user/assistant turns can be added here
        # for a multi-turn conversation.
        {"role": "user", "content": "你好"},
    ],
    # Streaming is required: the output below shows an openai.Stream object.
    stream=True,
)

print("#####", response)
ret_text = ""
for part in response:
    # Each streamed part is a list with one entry per model (see the sample
    # output), so index 0 and 1 are the two models served by the demo server.
    print("\n33333", type(part), part[0])
    print("33333", type(part), part[1])


> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None)        

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}




import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse

import asyncio
from asyncio import Queue

import time
from threading import Thread

app = FastAPI()
async def get_models():
    """Placeholder for the model-list handler.

    NOTE(review): the body is missing from this listing; it is presumably the
    same as ``get_models`` in the first listing of this article — confirm and
    paste it here.
    """
    raise NotImplementedError("get_models body is omitted in this listing")

async def output_data(text: str, model: str, delay: float = 1):
    """Yield ``text`` one character at a time as chat-completion-chunk JSON.

    Each yielded item is a JSON string whose ``delta.content`` is
    ``"<model> <idx> <text so far>"``, i.e. the content is cumulative, not
    incremental.

    Args:
        text: The full text to stream.
        model: Model id echoed in the chunk and in the content prefix.
        delay: Seconds to sleep after each chunk (defaults to the original
            hard-coded one second); pass ``0`` to stream without pausing.

    Yields:
        str: One JSON-encoded chunk per character of ``text``.
    """
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,      # returned by OpenAI, meaning unknown
                        "role": "assistant",        # message role
                        "tool_calls": None,         # returned by OpenAI, meaning unknown
                    },
                    "finish_reason": "length",      # stop code
                    "index": 0,                     # returned by OpenAI, meaning unknown
                    "logprobs": None,               # returned by OpenAI, meaning unknown
                }
            ],
            "created": 1715238637,                  # timestamp
            "model": model,                         # model id
            "object": "chat.completion.chunk",      # message type
            "system_fingerprint": None,             # returned by OpenAI, meaning unknown
        }

        # ensure_ascii=False keeps non-ASCII text readable in the stream.
        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(delay)

async def flush_stream(request: Request):
    """Placeholder for the streaming chat-completions handler.

    NOTE(review): the body is missing from this listing; it is presumably the
    same as ``flush_stream`` in the first listing of this article — confirm
    and paste it here.
    """
    raise NotImplementedError("flush_stream body is omitted in this listing")

async def async_task(task_name: str, models: list):
    """Run the streaming generator of every model concurrently and log it.

    Args:
        task_name: Label used as a prefix in the printed progress lines.
        models: Model ids; one coroutine is started per entry.

    Returns:
        list: The last chunk produced for each model, in the order of
        ``models`` ("" for a model that produced no chunk).
    """

    async def task_generate(task_name: str, model: str):
        """Consume one model's stream, printing each chunk; return the last."""
        text = f"这是{model}模型的流式输出他会将每个字挨个的输出哈哈!!!"
        output = ""
        async for item_data in output_data(text=text, model=model):
            print(f"{task_name}### {model} {item_data}")
            # Chunks are cumulative, so the latest one supersedes the others.
            output = item_data
        return output

    # gather() runs all per-model coroutines concurrently on this event loop
    # and returns their results in the order of `models`.
    results = await asyncio.gather(*[
        task_generate(task_name=task_name, model=model)
        for model in models
    ])
    print(f"{task_name} ENDENDENDEND {results}")
    return results

def task_thread(thread_name: str, max_polls=None, poll_interval: float = 1.0):
    """Worker-thread entry point: run the async task once, then idle-poll.

    Each thread runs its own event loop through ``asyncio.run``, so several
    threads can execute ``async_task`` independently of the web server.

    Args:
        thread_name: Label used in log output and to derive the task name.
        max_polls: Stop after this many loop iterations; ``None`` (default)
            keeps the worker polling forever, as in the original listing.
        poll_interval: Seconds to sleep between iterations so the idle loop
            does not spin at full CPU.
    """
    print("另外开始一个子线程做任务啦", thread_name)
    models = ["model11111", "model22222"]
    # Variant 1 (single run): asyncio.run(async_task(thread_name, models))

    # Variant 2: loop so that more than one task could be executed.
    count = 0
    flag = True
    while max_polls is None or count < max_polls:
        # Run the task (only on the first iteration in this demo).
        if flag:
            task_name = f"{thread_name}_task"
            asyncio.run(async_task(task_name, models))
            flag = False

        # Nothing to do: report the idle iteration and wait.
        # NOTE(review): the lines after this print were missing from the
        # listing; the counter increment and the sleep are assumed.
        print(f"{thread_name} count:{count}")
        count += 1
        time.sleep(poll_interval)
    print("子线程任务结束啦", thread_name)

if __name__ == '__main__':
    # 1. Create the worker threads in one batch.
    thread_names = ["thread11111", "thread22222", "thread33333"]
    # Thread.start() returns None, so build the Thread objects first and start
    # them afterwards; otherwise `threads` would only hold None values.
    # daemon=True lets the process exit when uvicorn stops, even though the
    # workers loop forever by default.
    threads = [
        Thread(target=task_thread, args=(thread_name,), daemon=True)
        for thread_name in thread_names
    ]
    for thread in threads:
        thread.start()

    # 2. Alternatively, create and start the threads one by one, optionally
    #    sleeping between the starts.

    # Serve the FastAPI app in the main thread while the workers run.
    uvicorn.run(app, host="", port=8080)


> python.exe .\main.py
另外开始一个子线程做任务啦 thread11111
另外开始一个子线程做任务啦 thread22222
另外开始一个子线程做任务啦 thread33333
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}
INFO:     Started server process [19308]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on (Press CTRL+C to quit)
thread33333_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread22222_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread33333_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread22222_task### model22222 {"id": null, "choices": [{"delta": {"content": "model22222 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model22222", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread11111_task### model11111 {"id": null, "choices": [{"delta": {"content": "model11111 1 这是", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "model11111", "object": "chat.completion.chunk", "system_fingerprint": null}       
thread11111_task### model22222 {"id": null, "choices": [{"delta": {"con
上一篇:Macos m系列芯片环境下安装python3以及mysqlclient流程以及遇到的一系列问题

下一篇:当遇到 502 错误(Bad Gateway)怎么办