背景
- 在实际项目中,可能会通过前端框架使用 WebSocket 和后端进行通信
- 这里就来详细讲解下 FastAPI 是如何操作 WebSocket 的
模拟 WebSocket 客户端
#!usr/bin/env python # -*- coding:utf-8 _*- """ # author: 小菠萝测试笔记 # blog: https://www.cnblogs.com/poloyy/ # time: 2021/10/5 5:26 下午 # file: 46_websocket.py """ import uvicorn from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse app = FastAPI() html = """ <!DOCTYPE html> <html> <head> <title>小菠萝聊天室</title> </head> <body> <h1>小菠萝聊天室</h1> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> // 加载页面,自动创建一个 WebSocket 连接 var ws = new WebSocket("ws://localhost:8080/ws"); // 收到消息 ws.onmessage = function(event) { // 获取输入框的值 var messages = document.getElementById('messages') // 创建一个 li 元素 var message = document.createElement('li') // 接收 event 的 data var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; // 发送消息方法 function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ # 返回一段 HTML 代码给前端 @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): # 1、ws 连接 await websocket.accept() while True: # 2、接收客户端发送的内容 data = await websocket.receive_text() # 3、服务端发送内容 await websocket.send_text(f"小菠萝收到的消息是: {data}") if __name__ == '__main__': uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)
启动 uvicorn 服务器,访问 127.0.0.1:8080/
客户端、服务端建立 WebSocket 连接成功
发送聊天信息
每发一条消息,均会显示在列表中
可以在其他地方使用 WebSocket
- Depends
- Security
- Cookie
- Header
- Path
- Query
在依赖项中使用 WebSocket
from typing import Optional import uvicorn from fastapi import FastAPI, WebSocket, Cookie, Query, status, Depends from fastapi.responses import HTMLResponse app = FastAPI() html = """ <!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>小菠萝聊天室</h1> <form action="" onsubmit="sendMessage(event)"> <label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label> <label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label> <button onclick="connect(event)">Connect</button> <hr> <label>Message: <input type="text" id="messageText" autocomplete="off"/></label> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = null; function connect(event) { var itemId = document.getElementById("itemId") var token = document.getElementById("token") ws = new WebSocket("ws://localhost:8080/items/" + itemId.value + "/ws?token=" + token.value); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; event.preventDefault() } function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ @app.get("/") async def get(): return HTMLResponse(html) async def get_cookie_or_token( websocket: WebSocket, session: Optional[str] = Cookie(None), token: Optional[str] = Query(None) ): # 模拟:如果 session 和 token 都为空,则关闭 websocket if session or token: return session or token await websocket.close(code=status.WS_1008_POLICY_VIOLATION) @app.websocket("/items/{item_id}/ws") async def websocket_depends( websocket: WebSocket, item_id: str, q: Optional[str] = None, # 依赖项 cookie_or_token: str = Depends(get_cookie_or_token) ): # 1、创建 websocket 连接 await websocket.accept() while True: # 2、接收客户端发送的内容 data = await websocket.receive_text() # 3、服务端发送内容 await websocket.send_text(f"cookie or token value is:{cookie_or_token}") if q: # 4、如果有传查询参数 q,则再发一条 await websocket.send_text(f"query param value is:{q}") # 5、最后再发一条信息 await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}") if __name__ == '__main__': uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)
发送聊天信息
不带查询参数 q
带查询参数 q
当 WebSocket 连接关闭时
await websocket.receive_text() 将引发 WebSocketDisconnect 异常,这不是期望看到的结果
处理断开连接和多个客户端
from typing import List import uvicorn from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status from fastapi.responses import HTMLResponse app = FastAPI() html = """ <!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <h2>Your ID: <span id="ws-id"></span></h2> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var client_id = Date.now() document.querySelector("#ws-id").textContent = client_id; var ws = new WebSocket(`ws://localhost:8080/ws/${client_id}`); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ # 返回一段 HTML 代码给前端 @app.get("/") async def get(): return HTMLResponse(html) # 处理和广播消息到多个 WebSocket 连接 class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws/{client_id}") async def websocket_endpoint(client_id: str, websocket: WebSocket): # 1、客户端、服务端建立 ws 连接 await manager.connect(websocket) # 2、广播某个客户端进入聊天室 await manager.broadcast(f"{client_id} 进入了聊天室") try: while True: # 3、服务端接收客户端发送的内容 data = await websocket.receive_text() # 4、广播某个客户端发送的消息 await manager.broadcast(f"{client_id} 发送消息:{data}") # 5、服务端回复客户端 await manager.send_personal_message(f"服务端回复{client_id}:你发送的信息是:{data}", websocket) except WebSocketDisconnect: # 6、若有客户端断开连接,广播某个客户端离开了 manager.disconnect(websocket) await manager.broadcast(f"{client_id} 离开了聊天室") if __name__ == '__main__': uvicorn.run(app="48_websocket_handler:app", reload=True, host="127.0.0.1", port=8080)
- 模拟一个小型聊天室的场景
- 新的客户端进来,所有人都会收到新客户端进入聊天室的消息
- 某个客户端发送消息,所有人都能看到
- 某个客户端退出了(关闭浏览器),所有人都会收到该客户端退出聊天室的消息
浏览器打开三个 tab 均访问 127.0.0.1:8080
关掉其中一个客户端(tab)