网络并发编程

一.僵尸进程与孤儿进程

1.1 僵尸进程

进程代码运行结束之后并没有直接结束而是需要等待回收子进程资源才会结束

1.2 孤儿进程

主程序已经死亡,子程序还在运行

二.守护进程

守护进程即守护者某个进程,一旦该进程结束那么也随之结束

from multiprocessing import Process
import time


def test(name):
    print('总管:%s is running' % name)
    time.sleep(3)
    print('总管:%s is over' % name)


if __name__ == '__main__':
    p = Process(target=test, args=('jason',))
    p.daemon = True  # 设置为守护进程(一定要放在start语句上方)
    p.start()
    print("皇帝jason寿终正寝")
    time.sleep(0.1)

三.互斥锁

问题:并发情况下操作同一份数据,极其容易造成数据错乱

解决措施:将并发变成串行,虽然降低了效率,但是提升了数据安全。

锁就可以实现将并发变成串行的效果

锁分为:①行锁 ②表锁

import json
import time
import random
from multiprocessing import Process, Lock


# 查票
def find(name):
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
        print('您好,%s,目前剩余票量为%s' % (name, ticket_num))


# 买票
def buy(name):
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
        # 创建一个延迟
        time.sleep(1)
        if ticket_num > 0:
            data_dict['ticket_num'] -= 1
            with open(r'data.txt', 'w', encoding='utf8') as f:
                json.dump(data_dict, f)
                print('%s购票成功' % name)
        else:
            print('没票了')


def run(name):
    find(name)
    mutex.acquire()
    buy(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(1, 11):
        p = Process(target=run, args=('用户%s' % i,))
        p.start()

四.消息队列

队列就是先进先出

 

from multiprocessing import Queue
q = Queue(5)
# put存放数据
q.put(3333)
q.put(111)
q.put(22)
# get拿取数据 顺序就是先进先出
print(q.get())
print(q.get())
print(q.get())
print(q.full())  # 判断队列中数据是否满了
try:
    print(q.get_nowait())  # 判断队列中是否还有数据 没有数据立即报错
except Exception as e:
    print(e)
    

 

五.ipc机制

from multiprocessing import Queue, Process


def producer(q):
    q.put("子进程p放的数据")


def consumer(q):
    print("子进程c取的数据", q.get())


if __name__ == '__main__':
    q = Queue()
    # 开一个进程
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()
    # p.join()
    # c.join()

六.生产者消费者模型

生产者

负责产生数据(做包子的)

消费者

负责处理数据(吃包子的)

该模型需要解决恭喜不平衡现象

"""
生产者
    负责产生数据(做包子的)
消费者
    负责处理数据(吃包子的)
该模型需要解决恭喜不平衡现象
"""
from multiprocessing import Queue, Process, JoinableQueue
import time
import random


def producer(name, food, q):
    for i in range(10):
        print('%s 生产了 %s' % (name, food))
        q.put(food)
        time.sleep(random.random())


def consumer(name, q):
    while True:
        data = q.get()
        print('%s 吃了 %s' % (name, data))
        q.task_done()


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer, args=('大厨jason', '玛莎拉', q))
    p2 = Process(target=producer, args=('印度阿三', '飞饼', q))
    p3 = Process(target=producer, args=('泰国阿人', '榴莲', q))
    c1 = Process(target=consumer, args=('班长阿飞', q))

    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True
    c1.start()

    p1.join()
    p2.join()
    p3.join()

    q.join()  # 等待队列中所有的数据被取干净

    print('主')

七.线程理论

什么是线程?
进程其实是一个资源单位 真正被CPU执行的其实是进程里面的线程
"""
进程类似于是工厂 线程类似于是工厂里面的一条条流水线
所有的进程肯定含有最少一个线程
"""
进程间数据默认是隔离的 但是同一个进程内的多个线程数据是共享的

八.开设线程的两种方式

from threading import Thread
import time


# 第一种
def test(name):
    start_time = time.time()
    print('%s哈哈哈' % name)
    time.sleep(2)  # io操作
    print('%s哈哈哈,结束了' % name)


t = Thread(target=test, args=('jason',))
t.start()
print('主')


# 第二种
class MyClass(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s哈哈哈' % self.name)
        time.sleep(2)  # io操作
        print('%s哈哈哈,结束了' % self.name)


a = MyClass('jason')
a.start()
a.join()
print('主')

九.守护线程

"""
主线程的结束意味着整个进程的结束
    所以主线程需要等待里面所有非守护线程的结束才能结束
"""
from threading import Thread
from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(3)
    print("end123")
def bar():
    print(456)
    time.sleep(1)
    print("end456")
if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")

十.线程数据共享

from threading import Thread
money = 100

def test():
    global money
    money = 999

t = Thread(target=test)
t.start()
t.join()
print(money)

十一.线程互斥锁

from threading import Thread, Lock
from multiprocessing import Lock
import time


num = 100


def test(mutex):
    global num
    mutex.acquire()
    # 先获取num的数值
    tmp = num
    # 模拟延迟效果
    time.sleep(0.1)
    # 修改数值
    tmp -= 1
    num = tmp
    mutex.release()

t_list = []
mutex = Lock()
for i in range(100):
    t = Thread(target=test, args=(mutex,))
    t.start()
    t_list.append(t)
# 确保所有的子线程全部结束
for t in t_list:
    t.join()
print(num)

十二.tcp服务端实行多线程并发

服务端

网络并发编程
import socket
from threading import Thread
from multiprocessing import Process


server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)  # 半连接池


def talk(sock):
    while True:
        try:
            data = sock.recv(1024)  # 接收消息
            if len(data) == 0:
                continue
            else:
                print(data.decode('utf8'))
                sock.send(data + b'jack, my_lover')
        except ConnectionResetError as e:
            print(e)
            break


while True:
    sock, address = server.accept()
    print(address)
    t = Thread(target=talk, args=(sock,))
    t.start()
View Code

客户端

网络并发编程
import socket
from threading import Thread
from multiprocessing import Process


client = socket.socket()
client.connect(('127.0.0.1', 8081))


while True:
    msg = input('你输入发送内容>>>:').strip()
    if len(msg) == 0:
        print('格式错误,重新输入')
        continue
    client.send(msg.encode('utf8'))  # 发送消息
    data = client.recv(1024)  # 接受消息
    print(data.decode('utf8'))
View Code

网络并发编程

 

上一篇:Spring Boot中使用Swagger3生成API文档


下一篇:关于Oracle导出数据的几个错误EXP-00008,ORA-00904,ORA-01003,EXP-00091