例如,我们实现了一个Web视频监控服务器:服务器端采集摄像头数据,客户端使用浏览器通过HTTP请求接收数据。服务器使用推送的方式(multipart/x-mixed-replace),一直通过同一个TCP连接向客户端传递数据。这种方式会持续占用一个线程,导致单线程服务器无法处理多个客户端的请求。
要求:改写程序,在每个线程中处理一个客户端请求,支持多客户端访问。
解决方案:
threading.local() 函数可以创建线程本地数据空间,其下的属性对每个线程独立存在。
- 对于线程本地数据:
线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个local(或者一个子类型)的实例并在实例中储存属性:
mydata = threading.local()
mydata.x = 1
在不同的线程中,实例的值会不同。
class threading.local:一个代表线程本地数据的类。
- 方案示例:
yum install -y numpy opencv* python-qt4
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple opencv-contrib-python==3.4.2.17
import os
import struct
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from select import select
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock

import cv2

# Every frame travels through the pipe as an 8-byte little-endian length
# followed by the JPEG bytes.  A fixed-width format keeps the header size
# identical on every platform (the reader always consumes exactly 8 bytes).
_FRAME_HEADER = struct.Struct('<Q')


def _read_exact(fd, size):
    """Read exactly *size* bytes from file descriptor *fd*.

    A single os.read() may return fewer bytes than requested, so keep
    reading until the whole chunk has arrived.

    Raises:
        EOFError: if the pipe is closed before *size* bytes were read.
    """
    chunks = []
    remaining = size
    while remaining:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('frame pipe closed')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _write_all(fd, data):
    """Write all of *data* to file descriptor *fd*, retrying on short writes."""
    view = memoryview(data)
    while len(view):
        written = os.write(fd, view)
        view = view[written:]


class JpegStreamer(Thread):
    """Capture thread that JPEG-encodes camera frames and fans them out.

    Each consumer registers to obtain the read end of a private pipe; every
    captured frame is written to the write end of all registered pipes.
    """

    def __init__(self, camrea):
        """Open the capture device identified by *camrea* (passed to cv2.VideoCapture)."""
        super().__init__()
        self.cap = cv2.VideoCapture(camrea)
        self.lock = RLock()
        # Maps read-end fd -> write-end fd for every registered consumer.
        self.pipes = {}

    def register(self):
        """Create a pipe for a new consumer and return its read end."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the consumer that owns read end *pr* and close both pipe ends."""
        with self.lock:
            pw = self.pipes.pop(pr)
        os.close(pr)
        os.close(pw)

    def capture(self):
        """Yield JPEG-encoded frames (bytes) for as long as the device is open."""
        cap = self.cap
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                continue
            ok, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
            if ok:
                # tobytes() replaces the deprecated ndarray.tostring().
                yield data.tobytes()

    def send_frame(self, frame):
        """Write one length-prefixed frame to every pipe that is currently writable."""
        packet = _FRAME_HEADER.pack(len(frame)) + frame
        # `with` guarantees the lock is released even if a write fails.
        # NOTE(review): the lock stays held while writing, so a consumer that
        # stops draining its pipe can delay register()/unregister() - confirm
        # this is acceptable for the expected number of clients.
        with self.lock:
            if not self.pipes:
                return
            # Wait at most 1 second; pipes that are not writable skip this frame.
            _, writable, _ = select([], list(self.pipes.values()), [], 1)
            for pipe in writable:
                _write_all(pipe, packet)

    def run(self):
        """Thread body: forward every captured frame to the registered pipes."""
        for frame in self.capture():
            self.send_frame(frame)


class JpegRetriever:
    """Context manager that yields the frames published by a JpegStreamer.

    This version keeps the pipe on the instance itself, so a single retriever
    can serve only one `with` block at a time (single-threaded server).
    """

    def __init__(self, streamer):
        self.streamer = streamer

    def retrieve(self):
        """Yield frames read from the registered pipe, one bytes object each."""
        while True:
            header = _read_exact(self.pipe, _FRAME_HEADER.size)
            (size,) = _FRAME_HEADER.unpack(header)
            yield _read_exact(self.pipe, size)

    def __enter__(self):
        """Register with the streamer and return the frame generator.

        Raises:
            RuntimeError: if this retriever is already inside a `with` block.
        """
        if hasattr(self, 'pipe'):
            raise RuntimeError()
        # Use the streamer given to the constructor, not a module-level global.
        self.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, exc_type, exc, tb):
        """Unregister from the streamer.

        Only OSError (e.g. the browser closing the connection) is suppressed;
        any other exception propagates so real bugs are not hidden.
        """
        self.streamer.unregister(self.pipe)
        del self.pipe
        return exc_type is not None and issubclass(exc_type, OSError)


class WebHandler(BaseHTTPRequestHandler):
    """HTTP handler that pushes JPEG frames as multipart/x-mixed-replace."""

    # Shared retriever, installed once via set_retriever() before serving.
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        """Install the retriever used by every request handler."""
        WebHandler.retriever = retriever

    def do_GET(self):
        """Stream frames to the client for '/', answer 404 for any other path.

        Raises:
            RuntimeError: if no retriever has been installed.
        """
        if self.retriever is None:
            raise RuntimeError('no retriever')
        if self.path != '/':
            # Answer explicitly instead of dropping the connection silently.
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()
        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part (boundary, headers, JPEG data) to the client."""
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)


if __name__ == '__main__':
    # Create the streamer and start capturing from the camera.
    streamer = JpegStreamer(0)
    streamer.start()

    # Create the retriever used by the HTTP server.
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # Start the HTTP server.  TCPServer handles one request at a time, so the
    # never-ending stream to the first client blocks every later client.
    HOST = '192.168.30.128'  # IP address of this machine
    PORT = 9000
    print('Start server...(http://%s:%d)' % (HOST, PORT))
    httpd = TCPServer((HOST, PORT), WebHandler)
    httpd.serve_forever()
我这里的宿主机是Windows系统,在Linux虚拟机中以root身份运行时提示:
VIDEOIO ERROR: V4L: can't open camera by index 0
Start server...(http://192.168.30.128:9000)
不过无所谓,当通过浏览器访问 192.168.30.128:9000 时,即使多个窗口同时访问,也只有一条访问记录产生,说明只有一个线程建立。
改进:通过 threading.local() 函数创建线程本地数据空间,数据对每个线程独立。
import os
import struct
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from select import select
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock

import cv2

# Every frame travels through the pipe as an 8-byte little-endian length
# followed by the JPEG bytes.  A fixed-width format keeps the header size
# identical on every platform (the reader always consumes exactly 8 bytes).
_FRAME_HEADER = struct.Struct('<Q')


def _read_exact(fd, size):
    """Read exactly *size* bytes from file descriptor *fd*.

    A single os.read() may return fewer bytes than requested, so keep
    reading until the whole chunk has arrived.

    Raises:
        EOFError: if the pipe is closed before *size* bytes were read.
    """
    chunks = []
    remaining = size
    while remaining:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('frame pipe closed')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _write_all(fd, data):
    """Write all of *data* to file descriptor *fd*, retrying on short writes."""
    view = memoryview(data)
    while len(view):
        written = os.write(fd, view)
        view = view[written:]


class JpegStreamer(Thread):
    """Capture thread that JPEG-encodes camera frames and fans them out.

    Each consumer registers to obtain the read end of a private pipe; every
    captured frame is written to the write end of all registered pipes.
    """

    def __init__(self, camrea):
        """Open the capture device identified by *camrea* (passed to cv2.VideoCapture)."""
        super().__init__()
        self.cap = cv2.VideoCapture(camrea)
        self.lock = RLock()
        # Maps read-end fd -> write-end fd for every registered consumer.
        self.pipes = {}

    def register(self):
        """Create a pipe for a new consumer and return its read end."""
        pr, pw = os.pipe()
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Forget the consumer that owns read end *pr* and close both pipe ends."""
        with self.lock:
            pw = self.pipes.pop(pr)
        os.close(pr)
        os.close(pw)

    def capture(self):
        """Yield JPEG-encoded frames (bytes) for as long as the device is open."""
        cap = self.cap
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                continue
            ok, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
            if ok:
                # tobytes() replaces the deprecated ndarray.tostring().
                yield data.tobytes()

    def send_frame(self, frame):
        """Write one length-prefixed frame to every pipe that is currently writable."""
        packet = _FRAME_HEADER.pack(len(frame)) + frame
        # `with` guarantees the lock is released even if a write fails.
        # NOTE(review): the lock stays held while writing, so a consumer that
        # stops draining its pipe can delay register()/unregister() - confirm
        # this is acceptable for the expected number of clients.
        with self.lock:
            if not self.pipes:
                return
            # Wait at most 1 second; pipes that are not writable skip this frame.
            _, writable, _ = select([], list(self.pipes.values()), [], 1)
            for pipe in writable:
                _write_all(pipe, packet)

    def run(self):
        """Thread body: forward every captured frame to the registered pipes."""
        for frame in self.capture():
            self.send_frame(frame)


class JpegRetriever:
    """Context manager that yields the frames published by a JpegStreamer.

    The pipe is stored in a threading.local() object, so every handler
    thread that enters the retriever gets its own independent pipe.
    """

    def __init__(self, streamer):
        self.streamer = streamer
        # Thread-local storage: attributes set here are visible only to the
        # thread that set them.
        self.local = threading.local()

    def retrieve(self):
        """Yield frames read from the calling thread's pipe, one bytes object each."""
        while True:
            header = _read_exact(self.local.pipe, _FRAME_HEADER.size)
            (size,) = _FRAME_HEADER.unpack(header)
            yield _read_exact(self.local.pipe, size)

    def __enter__(self):
        """Register the calling thread with the streamer and return its frame generator.

        Raises:
            RuntimeError: if the calling thread is already inside a `with` block
                on this retriever.
        """
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()
        # Use the streamer given to the constructor, not a module-level global.
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, exc_type, exc, tb):
        """Unregister the calling thread's pipe.

        Only OSError (e.g. the browser closing the connection) is suppressed;
        any other exception propagates so real bugs are not hidden.
        """
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        return exc_type is not None and issubclass(exc_type, OSError)


class WebHandler(BaseHTTPRequestHandler):
    """HTTP handler that pushes JPEG frames as multipart/x-mixed-replace."""

    # Shared retriever, installed once via set_retriever() before serving.
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        """Install the retriever used by every request handler."""
        WebHandler.retriever = retriever

    def do_GET(self):
        """Stream frames to the client for '/', answer 404 for any other path.

        Raises:
            RuntimeError: if no retriever has been installed.
        """
        if self.retriever is None:
            raise RuntimeError('no retriever')
        if self.path != '/':
            # Answer explicitly instead of dropping the connection silently.
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()
        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part (boundary, headers, JPEG data) to the client."""
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)


if __name__ == '__main__':
    # Create the streamer and start capturing from the camera.
    streamer = JpegStreamer(0)
    streamer.start()

    # Create the retriever used by the HTTP server.
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # Start the HTTP server.  ThreadingTCPServer handles each request in its
    # own thread, so several clients can be streamed to at the same time.
    HOST = '192.168.30.128'
    PORT = 9000
    print('Start server...(http://%s:%d)' % (HOST, PORT))
    httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
    httpd.serve_forever()
接下来继续多个浏览器多个窗口访问,可以同时进行多个访问,且每个访问都会新产生一条访问记录。