1. Queue + threads: multi-threaded MySQL row updates
import MySQLdb
import MySQLdb.cursors
import queue
import threading


def update_line_thread():
    """Worker thread: drain (stroke, id) pairs from the shared queue into MySQL.

    Each worker owns its own connection and cursor — MySQLdb connections are
    not safe to share between threads.  Runs forever; the process exits when
    the main thread finishes because the workers are daemon threads.
    """
    connection = MySQLdb.connect(
        host='xx.xx.xx.xx',
        port=3306,
        user='',
        passwd='',
        db='db_name',
        charset='utf8mb4',
        connect_timeout=100000,
        cursorclass=MySQLdb.cursors.SSCursor,
        autocommit=1,  # commit each UPDATE immediately
    )
    cursor = connection.cursor()
    # FIX: original was 'SET col_name=%s,where id=%s' — the stray comma
    # before WHERE is invalid SQL and made every UPDATE raise.
    sql = 'UPDATE table SET col_name=%s WHERE id=%s'
    while True:
        stroke, Id = q.get()
        try:
            cursor.execute(sql, (stroke, Id))
        except Exception as e:
            # FIX: original printed undefined name `fid` (NameError on the
            # first failure); report the failing row id instead.
            print(e, Id)
        finally:
            # Always acknowledge the item so q.join() can return even if a
            # row fails.
            q.task_done()


q = queue.Queue()
for i in range(10):
    t = threading.Thread(target=update_line_thread, daemon=True)
    t.start()

# `id2stroke` (id -> stroke mapping) is assumed to be defined earlier in the
# file — TODO confirm against the surrounding code.
for ID in id2stroke:
    q.put((id2stroke[ID], ID))
q.join()
2. Process pool + thread pool: batched MongoDB upserts
import MySQLdb
import MySQLdb.cursors
import pymongo
from multiprocessing.pool import Pool, ThreadPool


def gen_doc(paper_list):
    """Yield one MongoDB document per paper in *paper_list*.

    The three per-batch lookups (info / field / author) are independent, so
    they run concurrently on a small thread pool and are joined before the
    documents are built.  ``get_paper_info`` / ``get_paper_field`` /
    ``get_paper_author`` are assumed to be defined elsewhere in this file —
    TODO confirm.
    """
    with ThreadPool(7) as tp:
        p1 = tp.apply_async(get_paper_info, (paper_list, ))
        p2 = tp.apply_async(get_paper_field, (paper_list, ))
        p3 = tp.apply_async(get_paper_author, (paper_list, ))
        paper_info = p1.get()
        paper_field = p2.get()
        paper_author = p3.get()
    for line in paper_info:
        yield {"key": "value"}


def update_batch(paper_list):
    """Upsert the documents for one batch of papers into MongoDB.

    A fresh client is created per call deliberately: this function runs
    inside multiprocessing workers, and a MongoClient must not be shared
    across a fork boundary.
    """
    # FIX: original host was 'xx,xx,xx,xx' (commas) — not a valid hostname;
    # use the dotted placeholder form used elsewhere in this file.
    conn = pymongo.MongoClient(
        'xx.xx.xx.xx', 27017, username="", password=""
    ).db_name.collection_name
    for doc in gen_doc(paper_list):
        # replace_one(..., upsert=True) replaces-or-inserts in one call, so
        # no separate delete is needed.
        conn.replace_one({'_id': doc['_id']}, doc, upsert=True)


def update_by_list(paper_list):
    """Split *paper_list* into batches of 1000 and upsert them in parallel.

    Sorts the list first so batches are deterministic, fans the batches out
    over a 25-process pool, and prints coarse progress (every 100 finished
    batches); imap_unordered reports completions as they happen rather than
    in submission order.
    """
    paper_list.sort()
    batch_size = 1000
    batch_list = [
        paper_list[begin:begin + batch_size]
        for begin in range(0, len(paper_list), batch_size)
    ]
    print("Total batch num: ", len(batch_list))
    with Pool(25) as pool:
        for idx, _ in enumerate(pool.imap_unordered(update_batch, batch_list)):
            if idx % 100 == 0:
                print(idx)