#!/user/bin/env python
# @Time :2018/7/7 11:42
# @Author :PGIDYSQ
#@File :DaemonTest.py
import threading,time
# 1.线程的简单使用
# class MyThread(threading.Thread):
# def __init__(self,num,threadname):
# threading.Thread.__init__(self,name=threadname)
# self.num = num
# def run(self):
# time.sleep(self.num)
# print(self.num)
#
# t1 = MyThread(1,'t1')
# t2 = MyThread(5,'t2')
#
# t2.daemon = True
# print(t1.daemon)
# print(t2.daemon)
#
# t1.start()
# t2.start() #2.多线程使用,线程同步技术 # class MyThread2(threading.Thread):
# def __init__(self):
# threading.Thread.__init__(self)
# #重写run方法
# def run(self):
# global x
# #获取锁,
# lock.acquire()#相当于同步语块
# x = x+3
# print(x)
# #释放锁
# lock.release()
#
# lock = threading.RLock()
# #存放多个线程列表
# t12 = []
# #创建线程
# for i in range(10):
# #创建线程并添加到列表中
# t = MyThread2()
# t12.append(t)
# #多个线程互斥访问的变量
# x = 0
# #启动所有线程
# for i in t12:
# i.start()
#3.使用Condition实现生产者与消费者关系
from random import randint
from time import sleep
#自定义生产者线程类
class Producer(threading.Thread):
def __init__(self,threadname):
threading.Thread.__init__(self,name = threadname)
def run(self):
global x
while True:
con.acquire()
if(len(x) == 10):
con.wait()
print('生产者等待中...')
else:
print('生产者:',end=' ')
x.append(randint(1,1000))
print('生产的数据:%s'%x)
sleep(1)
#唤醒等待条件的线程
con.notify()
#释放锁
con.release()
#自定义消费者线程类
class Consumer(threading.Thread):
def __init__(self,threadname):
threading.Thread.__init__(self,name = threadname)
def run(self):
global x
while True:
#获取锁
con.acquire()
if not x:
con.wait()
print("消费者等待中...")
else:
print('消费的数据:%s'%x.pop(0))
print('消费后剩余的数据:%s'%x)
sleep(2)
con.notify()
con.release() #创建Condition对象并测试
con = threading.Condition()
x = []
p = Producer("Producer")
c = Consumer("Consumer") # p.start()
# c.start()
# p.join()
# c.join()
#4.还有Queue对象,Event对象使用... #5.进程的使用
# from multiprocessing import Process
# import os
# from statistics import mean
#
# def f(name):
# print(os.getppid())
# print(name)
# if __name__ == '__main__':
# pro1 = Process(target=f,args=('yao',))
# pro1.start()
# pro1.join()
#6.计算二维数组每行的平均值
# x =[list(range(10)),list(range(20,30)),list(range(40,50)),list(range(70,90))]
# from multiprocessing import Pool
# from statistics import mean
# def f(x):
# return mean(x)#求平均值
# if __name__ == '__main__':
# with Pool(5) as p:
# print(p.map(f,x)) #7.使用管道实现进程间数据交换
# from multiprocessing import Process,Pipe
# def f(conn):
# conn.send('hello world')
# conn.close()
# if __name__ == "__main__":
# parent_conn,child_conn = Pipe()#创建管道对象,并设置接收端与发送端
# p = Process(target=f,args=(parent_conn,))#将管道的一方做为参数传递给子进程
# p.start()
# p.join()
# print(child_conn.recv())#接收管道发送方的信息
# child_conn.close()
#8.使用共享内存实现进程间数据的交换
# from multiprocessing import Process,Value,Array
# def f(n,a):
# n.value = 3.1415927
# for i in range(len(a)):
# a[i] = a[i] * a[i]
# if __name__ == "__main__":
# num = Value('d',0.0) #实型
# arr = Array('i',range(10)) #整型
# p = Process(target=f,args=(num,arr)) #创建进程对象
# p.start()
# p.join()
# print(num.value)
# print(arr[:])
#9.使用Manager对象实现进程间数据交互,Manager控制拥有list,dict,Lock,RLock,Semaphore,Condition,Event,Queue,Value等对象的服务端进程
# from multiprocessing import Manager,Process
# def f(d,l,t):
# d['name'] = 'Yaos'
# d['age'] = 34
# l.reverse()
# t.value=23
# if __name__ == "__main__":
# with Manager() as manager:
# d = manager.dict()
# l = manager.list(range(10))
# t = manager.Value('i',0)
# p = Process(target=f,args=(d,l,t))
# p.start()
# p.join()
# for item in d.items():
# print(item)
# print(l)
# print(t.value)
#10.进程同步技术与现场同步技术类似,可以使用Lock,Event对象
# from multiprocessing import Process,Event
# def f(e,i):
# if e.is_set():
# e.wait()
# print('hello world',i)
# e.clear()
# else:
# e.set()
# if __name__ == "__main__":
# e = Event()
# for num in range(6):
# Process(target=f,args=(e,num)).start()
#11.MapReduce框架的使用---大数据处理
#一种编程模型,用于大规模数据集(大于1TB)的并行计算
# import os,os.path,time
# '''大文件分割'''
# def FileSplit(sourceFile,targetFile):
# if not os.path.isfile(sourceFile):
# print(sourceFile,' does not File!')
# return
# if not os.path.isdir(targetFile):
# os.mkdir(targetFile)
# tempData = []
# number = 1000#切分后的每一个小文件包含number行数据
# fileNum = 1
# with open(sourceFile,'r',encoding='UTF-8') as srcFile:
# dataLine = srcFile.readline().strip()
# while dataLine:
# for i in range(number):
# tempData.append(dataLine)
# dataLine = srcFile.readline()
# if not dataLine:
# break
# desFile = os.path.join(targetFile,sourceFile[0:-4] + str(fileNum) + ".txt")
# with open(desFile,'a+',encoding='UTF-8') as f:
# f.writelines(tempData)
# tempData = []
# fileNum = fileNum + 1
#
# if __name__ == "__main__":
# FileSplit(sourceFile="test.txt",targetFile="test/SourceFileSplit") '''12.使用Map处理数据'''
# import os,re,threading,time
# def Map(sourceFile):
# if not os.path.exists(sourceFile):
# print(sourceFile,'does not exist')
# return
# pattern = re.compile(r'[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}')
# result = {}
# with open(sourceFile,'r',encoding='UTF-8') as srcFile:
# for dataLine in srcFile:
# r = pattern.findall(dataLine)#查找符合日期格式的字符串
# if r:
# # result.get(r[0],0):get()方法用于返回指定键的对应值,当键不存在时,返回指定特定的值;与setdefault()一样,当键不存在时,新增指定的值
# result[r[0]] = result.get(r[0],0) + 1
# desFile = sourceFile.replace('SourceFileSplit','MapFile')[0:-4] + '_map.txt'
# with open(desFile,'a+',encoding='UTF-8') as fp:
# for k,v in result.items():
# fp.write(k+':'+str(v)+'\n')
# if __name__ == "__main__":
# desFolder = 'test/SourceFileSplit'
# files = os.listdir(desFolder)
# def Main(i):
# Map(desFolder+'\\'+files[i])
# fileNumber = len(files)
# for i in range(fileNumber):#多线程处理
# t = threading.Thread(target=Main,args=(i,))
# t.start()
'''13.使用Reducer获取最终数据'''
# from os.path import isdir
# from os import listdir
#
# def Reduce(sourceFolder,targetFile):
# if not isdir(sourceFolder):
# print(sourceFolder,'does not exist.')
# return
# result = {}
# #仅处理匹配的文件
# allFiles = [sourceFolder + '\\' + f for f in listdir(sourceFolder) if f.endswith('_map.txt')]
# for f in allFiles:
# with open(f,'r',encoding='UTF-8') as fp:
# for line in fp:
# line = line.strip()
# if not line:
# continue
# key,value = line.split(':')
# result[key] = result.get(key,0) + int(value)
# with open(targetFile,'w',encoding='UTF-8') as fp:
# for k,v in result.items():
# fp.write(k+':'+str(v)+'\n\n')
# if __name__ == "__main__":
# Reduce('test\\MapFile','test\\result.txt') #14.优化上面MapRedurce处理,需要进行分割文件,直接使用Map,Reduce
# import os,re,time
# def Map(sourceFile):
# if not os.path.exists(sourceFile):
# print(sourceFile,' does not exist.')
# return
# pattern = re.compile(r'[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}')
# result = {}
# with open(sourceFile,'r',encoding='UTF-8') as srcFile:
# for dataLine in srcFile:
# r = pattern.findall(dataLine)
# if r:
# print(r[0],',',1)
# Map('test.txt')
#15.Redurce文件
# import os,sys
# def Reduce(targetFile):
# result = {}
# for line in sys.stdin:
# riqi,shuliang = line.strip().split(',')
# result[riqi] = result.get(riqi,0)+1
# with open(targetFile,'w',encoding='UTF-8') as fp:
# for k,v in result.items():
# fp.write(k+':'+str(v)+'\n')
# Reduce('result.txt')
本文旨在学习python中有关线程的基础知识,上述代码都是一个个相关的实例,仅供参考。