(P4)多线程,线程池

文章目录

1.Python 创建多线程的方法

  • 步骤:
1、准备一个函数
def my_func(a, b):
   do_craw(a,b)

2、怎样创建一个线程
import threading
t = threading.Thread(target=my_func, args=(100, 200)

3、启动线程
t.start()

4、等待结束
t.join()
  • eg:blog_spider.py
import requests
from bs4 import BeautifulSoup

urls = [
    f"https://www.cnblogs.com/sitehome/p/{page}"
    for page in range(1, 50 + 1)
]


def craw(url):
    #print("craw url: ", url)
    r = requests.get(url)
    ##len(r.text)表示该url网页的长度,r.text表示网页的内容
    # print(url, len(r.text))
    return r.text


def parse(html):
    # class="post-item-title"
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    ##解析出链接:(link["href"]
    ##解析出来标题:link.get_text()
    return [(link["href"], link.get_text()) for link in links]


if __name__ == "__main__":
    for result in parse(craw(urls[2])):
        print(result)


  • 1.multi_thread_craw.py
import blog_spider
import threading
import time


def single_thread():
    print("single_thread begin")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("single_thread end")


def multi_thread():
    print("multi_thread begin")
    threads = []
    ##开启了50个线程,因为blog_spider.urls有50个
    for url in blog_spider.urls:
        threads.append(
            ##(url,)表示其元组,(url)表示的是字符串
            threading.Thread(target=blog_spider.craw, args=(url,))
        )

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("multi_thread end")


if __name__ == "__main__":
    start = time.time()
    single_thread()
    end = time.time()
    print("single thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi thread cost:", end - start, "seconds")

2.Python实现生产者消费者爬虫

  • eg:02. producer_consumer_spider.py
import queue
import blog_spider
import time
import random
import threading

##输入队列url_queue,输出队列:html_queue
##queue.Queue仅仅做标明类型作用,不用也行
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        html_queue.put(html)

        ##打印当前线程的名字:threading.current_thread().name
        print(threading.current_thread().name, f"craw {url}",
              "url_queue.size=", url_queue.qsize())
        ##随机睡眠1-2s
        time.sleep(random.randint(1, 2))


def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = blog_spider.parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, f"results.size", len(results),
              "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(1, 2))


if __name__ == "__main__":
    url_queue = queue.Queue()
    html_queue = queue.Queue()

    ##主线程将数据扔到生产者
    for url in blog_spider.urls:
        url_queue.put(url)

    ##3个生产者线程,生产者产生中间的数据仍到html_queue
    for idx in range(3):
        ##name=f"craw{idx}"表示线程名字
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue),
                             name=f"craw{idx}")
        t.start()

    ##2个消费者线程对html_queue进行处理
    fout = open("02.data.txt", "w")
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout),
                             name=f"parse{idx}")
        t.start()

爬取标题
(P4)多线程,线程池

  • 测试:
    (P4)多线程,线程池

3.Python线程安全

  • 线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成

  • 由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全

  • Lock 用于解决线程安全问题

用法1:try-finally模式
import threading

lock = threading.Lock()

lock.acquire()
try:
    # do something
finally:
    lock.release()

用法2:with 模式
import threading

lock = threading.Lock()

with lock:
    # do something
  • eg:
import threading
import time

lock = threading.Lock()

class Account:
    def __init__(self, balance):
        self.balance = balance


def draw(account, amount):
    with lock:
        if account.balance >= amount:
            ##sleep一定会导致线程阻塞,进行线程切换
            time.sleep(0.1)
            print(threading.current_thread().name,
                  "取钱成功")
            account.balance -= amount
            print(threading.current_thread().name,
                  "余额", account.balance)
        else:
            print(threading.current_thread().name,
                  "取钱失败,余额不足")


if __name__ == "__main__":
    account = Account(1000)
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    tb = threading.Thread(name="tb", target=draw, args=(account, 800))

    ta.start()
    tb.start()

  • 测试:
    (P4)多线程,线程池

4.线程池

  • 线程:新建线程系统需要分配资源、终止线程系统需要回收资源
    如果可以重用线程,则可以减去新建/终止的开销
    (P4)多线程,线程池

  • 线程池:线程池(可重用线程)+任务队列实现了线程池的功能
    提前建好的线程,线程可以重复使用,好处是:减少了新建/终止线程的开销以及线程切换的开销。
    新任务到来时先放到任务队列中,线程池中的线程会挨个从任务队列中取出任务依次执行,没有任务了就回到线程池等待下一个任务的到来。
    (P4)多线程,线程池

  • 使用线程池的好处
    (1)提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;
    (2)适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
    (3)防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
    (4)代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

from concurrent.futures import ThreadPoolExecutor, as_completed
用法1:map函数,很简单
注意map的结果和入参是顺序对应的
with ThreadPoolExecutor() as pool:
	##urls是参数列表
    results = pool.map(craw, urls)

    for result in results:
        print(result)

用法2:future模式,更强大
注意如果用as_completed顺序是不定的

with ThreadPoolExecutor() as pool:
    
   futures = [ pool.submit(craw, url)
                     for url in urls ]
	##按照顺序进行打印
    for future in futures:
        print(future.result())
    ##谁先执行完,就打印谁
    for future in as_completed(futures):
        print(future.result())


  • eg:04. thread_pool.py

import concurrent.futures
import blog_spider

# craw
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    ##zip(blog_spider.urls, htmls)将每个url和html对应起来
    htmls = list(zip(blog_spider.urls, htmls))
    for url, html in htmls:
        print(url, len(html))

print("craw over")

# parse
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        ##参数是单个html
        future = pool.submit(blog_spider.parse, html)
        futures[future] = url

    ##输出方法1
    #for future, url in futures.items():
    #    print(url, future.result())

    ##输出方法2
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        print(url, future.result())
  • 测试:
    按照顺序获取
    (P4)多线程,线程池
    输出方法1的结果,按照顺序解析
    (P4)多线程,线程池
    输出方法2的结果,不是按照顺序解析
    (P4)多线程,线程池

5.使用线程池优化Web服务器

  • Web服务的架构以及特点
    (1)Web服务对响应时间要求非常高,比如要求200MS返回
    (2)Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API
    (3)Web服务经常需要处理几万人、几百万人的同时请求
    (P4)多线程,线程池
  • 使用线程池ThreadPoolExecutor加速
    使用线程池ThreadPoolExecutor的好处:
    (1)方便的将磁盘文件、数据库、远程API的IO调用并发执行
    (2)线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
    (P4)多线程,线程池
  • eg:05. flask_thread_pool.py
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

##起个名字
app = flask.Flask(__name__)

##初始化一个pool对象
pool = ThreadPoolExecutor()


def read_file():
    ##100毫秒,sleep模拟IO操作
    time.sleep(0.1)
    return "file result"


def read_db():
    time.sleep(0.2)
    return "db result"


def read_api():
    time.sleep(0.3)
    return "api result"


@app.route("/")
def index():
    # 不用pool
    ##模拟读取web的3个三个操作
    # result_file = read_file
    # result_db = read_db
    # result_api = read_api

    # return json.dumps({
    #     "result_file": result_file,
    #     "result_db": result_db,
    #     "result_api": result_api,
    # })


    # 使用pool
    ##模拟读取web的3个三个操作
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)

    return json.dumps({
        "result_file": result_file.result(),
        "result_db": result_db.result(),
        "result_api": result_api.result(),
    })


if __name__ == "__main__":
    ##启动flask方法
    app.run()

  • 测试:
    点开就可以看到浏览器访问的结果
    (P4)多线程,线程池
    各方法访问的时间的测试,win可以使用postman。下面的结果是不用pool
    (P4)多线程,线程池
    使用pool加速后的结果,运行的最长时间是300ms,另外2个是在300ms运行期间完成的,由原来三者方法加和时间,变成了耗费最长方法的时间
    (P4)多线程,线程池

  • 参考:链接

上一篇:P4编程环境搭建


下一篇:Oracle中用触发器实现自动记录表数据被修改的历史信息