42. 使用线程池

前面实现了一个多线程的web视频监控服务器,由于服务器资源有限(CPU,内存,带宽),需要对请求连接数(线程数)做限制,避免因资源耗尽而瘫痪。

要求:使用线程池,替代原来的每次请求创建线程。

解决方案:

使用标准库汇总concurrent.futures下的ThreadPoolExecutor类,对象的submit()map()方法可以用来启动线程池中的线程执行任务。


  • 对于ThreadPoolExecutor类:
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Executor的一个子类,使用最多max_workers个线程的线程池来异步执行调用。

initializer是在每个工作者线程开始处调用的一个可选可调用对象。initargs是传递给初始化器的元组参数。任何向池提交更多工作的尝试,initializer都将引发一个异常,当前所有等待的工作都会引发一个BrokenThreadPool。

submit(fn, *args, **kwargs)

调度可调用对象fn,以fn(*args **kwargs)方式执行并返回Future对象代表可调用对象的执行。

map(func, *iterables, timeout=None, chunksize=1)

类似于map(func, *iterables),不过立即收集iterables而不是延迟再收集,另外func是异步执行的且对func的调用可以并发执行。

>>> import threading, time, random>>> def f(a, b):...     print(threading.current_thread().name, ':', a, b)...     time.sleep(random.randint(5, 10))...     return a * b...>>> from concurrent.futures import ThreadPoolExecutor>>> executor = ThreadPoolExecutor(3)                #创建3个线程的线程池>>> executor.submit(f, 2, 3)ThreadPoolExecutor-0_0 : 2 3<Future at 0x7f831190a4e0 state=running>>>> future = executor.submit(f, 2, 3)ThreadPoolExecutor-0_0 : 2 3>>> future.result()6

>>> executor.map(f, range(1, 6), range(2, 7))ThreadPoolExecutor-0_1 : 1 2<generator object Executor.map.<locals>.result_iterator at 0x7f830ef736d8>>>> ThreadPoolExecutor-0_0 : 2 3ThreadPoolExecutor-0_2 : 3 4ThreadPoolExecutor-0_1 : 4 5ThreadPoolExecutor-0_0 : 5 6>>> list(executor.map(f, range(1, 6), range(2, 7)))ThreadPoolExecutor-0_2 : 1 2ThreadPoolExecutor-0_1 : 2 3ThreadPoolExecutor-0_0 : 3 4ThreadPoolExecutor-0_2 : 4 5ThreadPoolExecutor-0_1 : 5 6[2, 6, 12, 20, 30]

这里执行map()方法时,首先3个线程执行任务,执行完毕后返回线程池,然后再次得到2个线程执行任务,直到所有任务全部执行完毕。

调用summit()是执行一个任务,而调用map()是对所有任务依次执行。


  • 方案示例:
import os, cv2, time, struct, threadingfrom http.server import HTTPServer, BaseHTTPRequestHandlerfrom socketserver import TCPServer, ThreadingTCPServerfrom threading import Thread, RLockfrom select import selectfrom concurrent.futures import ThreadPoolExecutorclass JpegStreamer(Thread):
    def __init__(self, camrea):
        super().__init__()
        self.cap = cv2.VideoCapture(camrea)
        self.lock = RLock()
        self.pipes = {}

    def register(self):
        pr, pw = os.pipe()
        self.lock.acquire()
        self.pipes[pr] = pw
        self.lock.release()
        return pr    def unregister(self, pr):
        self.lock.acquire()
        pw = self.pipes.pop(pr)
        self.lock.release()
        os.close(pr)
        os.close(pw)

    def capture(self):
        cap = self.cap        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send_frame(self, frame):
        n = struct.pack('1', len(frame))
        self.lock.acquire()
        if len(self.pipes):
            _, pipes, _ = select([], self.pipes.values(), [], 1)
            for pipe in pipes:
                os.write(pipe, n)
                os.write(pipe, frame)
        self.lock.release()

    def run(self):
        for frame in self.capture():
            self.send_frame(frame)class JpegRetriever:
    def __init__(self, streamer):
        self.streamer = streamer
        self.local = threading.local()

    def retrieve(self):
        while True:
            ns = os.read(self.local.pipe, 8)
            n = struct.unpack('1', ns)[0]
            data = os.read(self.local.pipe, n)
            yield data    def __enter__(self):
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()

        self.local.pipe = streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe        return Trueclass WebHandler(BaseHTTPRequestHandler):
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        WebHandler.retriever = retriever    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriever')

        if self.path != '/':
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        sh = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)class ThreadingPoolTCPServer(ThreadingTCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
        self.executor = ThreadPoolExecutor(thread_n)
    
    def process_request(self, request, client_address):
        self.executor.submit(self.process_request_thread, request, client_address)if __name__ == '__main__':
    # 创建Streamer,开启摄像头采集
    streamer = JpegStreamer(0)
    streamer.start()

    # http服务器创建Retriever
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # 开启http服务器
    HOST = '192.168.30.128'
    POST = 9000
    print('Start server...(http://%s:%d)' % (HOST, POST))
    httpd = ThreadingPoolTCPServer((HOST, POST), WebHandler, thread_n=3)                #线程池线程数量为3
    httpd.serve_forever()

此时,通过浏览器访问会发现,至多3个窗口可以同时访问,即至多产生3条访问记录,更多访问无法得到响应,因为线程池中的线程数量只为3。


上一篇:力扣42题(接雨水)


下一篇:mysql 开发进阶篇系列 42 逻辑备份与恢复(mysqldump 的完全恢复)