C语言基础学习PYTHON——基础学习D09
20180903内容纲要:
线程、进程
1、paramiko
2、线程、进程初识
3、多线程
(1)线程的调用方式
(2)join
(3)线程锁、递归锁、信号量
(4)Timer
(5)Event
(6)Queue队列
4、多进程
(1)多进程
(2)获取进程PID
(3)进程锁
(4)进程间的通讯
(5)进程池
5、小结
6、练习:简单主机批量管理工具
1、paramiko
paramiko模块提供了ssh及sft进行远程登录服务器执行命令和上传下载文件的功能。这是一个第三方的软件包,使用之前需要安装。
我个人觉得,这个在Windows上不太好用。在windows上python3需要安装vs2010,但我装了之后还是不行,可能是我自己的原因,在python2.7上好像可以,没试过。
这有链接:https://blog.csdn.net/songfreeman/article/details/50920767
很详细!
2 线程、进程初识
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。
一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers. If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were. Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU. On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers. Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads. Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
英文解释
什么是进程(process)?
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。
程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
英文解释
那么他们的区别呢?
进程与线程的区别?
线程:是操作系统最小的调度单位。 是一串指令的集合。
进程:要操作cpu必须先创建一个线程
所有在同一个进程里的线程是共享一块内存空间
1、Threads share the address space of the process that created it; processes have their own address space.
1、线程共享内存空间,进程的内存是独立的 2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process. 3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
3、两个进程之间可以直接交流,两个进程进行通讯,必须通过一个中间代理来实现。 4、New threads are easily created; new processes require duplication of the parent process.
4、创建新线程很简单。创建新进程需要对其父进程进行一次克隆 5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
5、一个线程可以控制和操作同意进程里的其他线程,但是进程只能操作子进程 6、Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
进程与线程的区别
3 多线程
(1)线程的调用方式
直接调用:
#Author:ZhangKanghui import threading
import time def run(n):
print("task",n,threading.current_thread())
time.sleep(2)
'''#这里是只有两个线程,那么更多线程呢?
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t2.start()
'''
#这是一个多线程,那么我们来看一下他整个程序的执行时间为什么不是2s
start_time = time.time()
for i in range(50):
t = threading.Thread(target=run,args=("t-%s" %i,)) #这个地方args必须有,。。因为这是默认一个元组
t.start()
#t.join() #这样可以把并行变成串行 print("mian threading has finished....",threading.current_thread(),threading.active_count())
print("cost:",time.time()-start_time)
#这是因为主线程和子线程没关系。主线程不会等子线程执行完毕才计算时间
#那接下来如果想要等所有的线程结束然后打印程序执行时间可以这么干
''' import threading
import time def run(n):
print("task",n)
time.sleep(2) start_time = time.time()
t_obj =[] #存线程实例
for i in range(50):
t = threading.Thread(target=run,args=("t-%s" %i,)) #这个地方args必须有,。。因为这是默认一个元组
#t.setDaemon(True) #把当前线程设置成守护线程,必须在启动strat之前
t.start()
t_obj.append(t) #为了不阻塞后面线程的启动,不在这里加join,先放到一个列表里
for i in t_obj: #循环线程实例列表,等待所有线程执行完毕
t.join() print("cost:",time.time()-start_time) '''
直接调用
继承式调用
#Author:ZhangKanghui import threading
import time class MyThread(threading.Thread):
def __init__(self,n,sleep_time):
super(MyThread,self).__init__()
self.n = n
self.sleep_time = sleep_time def run(self):
print("running task",self.n)
time.sleep(self.sleep_time)
print("task done",self.n) t1 =MyThread("t1",2)
t2 =MyThread("t2",4) t1.start()
t2.start() #t1.join()
#t2.join() print("mian threading....")
继承式调用
(2)join&Daemon
其实join就是wait。Daemon就是守护线程。
Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.
Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
#Author:ZhangKanghui
import time
import threading def run(n):
print('[%s]------running----\n' % n)
time.sleep(2)
print('--done--') def main():
for i in range(5):
t = threading.Thread(target=run, args=[i, ])
t.start()
t.join(1)
print('starting thread', t.getName()) m = threading.Thread(target=main, args=[])
m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join()
# m.join(timeout=2)
print("---main thread done----")
join&Daemon
#Author:ZhangKanghui import threading
import time class MyThread(threading.Thread):
def __init__(self,n,sleep_time):
super(MyThread,self).__init__()
self.n = n
self.sleep_time = sleep_time def run(self):
print("running task",self.n)
time.sleep(self.sleep_time)
print("task done",self.n) t1 =MyThread("t1",2)
t2 =MyThread("t2",4) t1.start()
t2.start() t1.join()
t2.join() print("mian threading....")
join1
import threading
import time def run(n):
print("task",n)
time.sleep(2) start_time = time.time()
t_obj =[] #存线程实例
for i in range(50):
t = threading.Thread(target=run,args=("t-%s" %i,)) #这个地方args必须有,。。因为这是默认一个元组
#t.setDaemon(True) #把当前线程设置成守护线程,必须在启动strat之前
t.start()
t_obj.append(t) #为了不阻塞后面线程的启动,不在这里加join,先放到一个列表里
for i in t_obj: #循环线程实例列表,等待所有线程执行完毕
t.join() print("cost:",time.time()-start_time)
join2
(3)线程锁/递归锁/信号量
线程锁
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?
import time
import threading def addNum():
global num #在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #修改数据前加锁
num -=1 #对此公共变量进行-1操作
lock.release() #修改后释放 num = 100 #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t) for t in thread_list: #等待所有线程执行完毕
t.join() print('final num:', num )
线程锁
递归锁
就是在一个大锁中还要再包含子锁
import threading,time def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2) if __name__ == '__main__': num,num2 = 0,0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)
递归锁
信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 。
比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" %n)
semaphore.release() if __name__ == '__main__': num= 0
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run,args=(i,))
t.start() while threading.active_count() != 1:
pass #print threading.active_count()
else:
print('----all threads done---')
print(num)
信号量
(4)Timer
This class represents an action that should be run only after a certain amount of time has passed。
#Author:ZhangKanghui
import threading
def hello():
print("hello, world") t = threading.Thread(target=hello)
t = threading.Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
Timer
(5)Event
An event is a simple synchronization object;通过Event来实现两个或多个线程间的交互。
the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.
event.wait()
event.set()
event.clear()
下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #绿灯状态
count = 0
while True:
if count < 10:
print('\033[42;1m--green light on---\033[0m')
elif count <13:
print('\033[43;1m--yellow light on---\033[0m')
elif count <20:
if event.isSet():
event.clear()
print('\033[41;1m--red light on---\033[0m')
else:
count = 0
event.set() #打开绿灯
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #绿灯
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()
Event实例之红绿灯
这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import threading
import time
import random def door():
door_open_time_counter = 0
while True:
if door_swiping_event.is_set():
print("\033[32;1mdoor opening....\033[0m")
door_open_time_counter +=1 else:
print("\033[31;1mdoor closed...., swipe to open.\033[0m")
door_open_time_counter = 0 #清空计时器
door_swiping_event.wait() if door_open_time_counter > 3:#门开了已经3s了,该关了
door_swiping_event.clear() time.sleep(0.5) def staff(n): print("staff [%s] is comming..." % n )
while True:
if door_swiping_event.is_set():
print("\033[34;1mdoor is opened, passing.....\033[0m")
break
else:
print("staff [%s] sees door got closed, swipping the card....." % n)
print(door_swiping_event.set())
door_swiping_event.set()
print("after set ",door_swiping_event.set())
time.sleep(0.5)
door_swiping_event = threading.Event() #设置事件 door_thread = threading.Thread(target=door)
door_thread.start() for i in range(5):
p = threading.Thread(target=staff,args=(i,))
time.sleep(random.randrange(3))
p.start()
event实例之刷卡
(6)queue队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class
queue.
Queue
(maxsize=0) #先入先出 - class
queue.
LifoQueue
(maxsize=0) #last in fisrt out - class
queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
-
Queue.
qsize
() -
Queue.
empty
() #return True if empty -
Queue.
full
() # return True if full -
Queue.
get
(block=True, timeout=None) -
Queue.
put
(item, block=True, timeout=None) -
Queue.
put_nowait
(item) -
Queue.
get_nowait
() -
Queue.
join
() block直到queue被消费完毕
a、先入先出
#Author:ZhangKanghui import queue
q =queue.Queue() q.put(1)
q.put(2)
q.put(3) print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
#先入先出,这个时候如果在获取呢?就会挂起卡死
print(q.get())
#在这里看不出来什么效果。去命令行试试
先入先出
存在一个问题就是,当把所有的数据都get到之后,程序就会挂起卡死。那这个时候怎么办呢?
q.get_nowait() 或者通过get(block=True,Timeout)参数进行设置
会出现先一个queue Empty的异常,这个时候可以通过捕获异常使程序正常进行。
b、后进先出
#Author:ZhangKanghui import queue
q =queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3) print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
后进先出LifoQueue
c、设置优先级
#Author:ZhangKanghui import queue q =queue.PriorityQueue()
q.put((1,"erha"))
q.put((5,"hashiqi"))
q.put((3,"taidi"))
print(q.get())
print(q.get())
print(q.get())
优先级队列
下面以一个生产者消费者的模型实例来深刻体会一下队列。
import threading
import queue def producer():
for i in range(10):
q.put("baozi %s" % i ) print("开始等待所有的包子被取走...")
q.join()
print("所有的包子被取完了...") def consumer(n): while q.qsize() >0: print("%s 取到" %n , q.get())
q.task_done() #告知这个任务执行完了 q = queue.Queue() p = threading.Thread(target=producer,)
p.start() c1 = consumer("二哈")
生产者消费者模型1
#Author:ZhangKanghui import threading,time
import queue q = queue.Queue()
def Producer(namne):
count = 0
for i in range(10):
q.put("骨头%s"%count)
print("生产了骨头",count)
count +=1
#time.sleep(2)
time.sleep(1)
#time.sleep(0.5) def Consumer(name):
while True:
print("[%s] 取到 [%s] 并吃了它..."%(name,q.get())) p =threading.Thread(target=Producer,args=("Kanghui",))
c =threading.Thread(target=Consumer,args=("erha",))
c1 =threading.Thread(target=Consumer,args=("taidi",)) p.start()
c.start()
c1.start()
生产者消费者模型2
4 进程
(1)多进程
Multiprocessing is a package that supports spawning processes using an API similar to the threading module.
#Author:ZhangKanghui
import multiprocessing
import threading
import time #在进程中启动线程
def thread_run():
print(threading.get_ident()) #打印线程编号
def run(name):
time.sleep(2)
print("hello",name)
t = threading.Thread(target=thread_run)
t.start() if __name__ == '__main__':
for i in range(10):
p =multiprocessing.Process(target=run,args=('Kanghui',))
p.start()
多进程1
from multiprocessing import Process
import os def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("\n\n") def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name) if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',))
p.start()
p.join()
多进程2
(2)获取进程pid
进程pid在任务管理器中可以查到。通过查看进程pid来区分各个进程。
#Author:ZhangKanghui
import multiprocessing
import os def info(title):
print(title)
print("module name",__name__)
print("parent process",os.getppid()) #输出主进程PID可以在任务管理器中查到
print("process id",os.getpid())
print("\n\n") #子进程启动依赖于父进程
def f(name):
info('\033[31;1mfuction f\033[0m')
print("hello",name) if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p =multiprocessing.Process(target=f,args=('Kanghui',))
p.start()
#p.join()
获取进程id
(3)进程锁
虽然进程之间是相互独立的,还要进程锁有什么用呢?进程锁存在的意义在于共享屏幕打印会乱。
#Author:ZhangKanghui #虽然进程之间是相互独立的,还要进程锁有什么用呢?进程锁存在的意义在于共享屏幕打印会乱
from multiprocessing import Process,Lock def f(l,i):
l.acquire()
try:
print("hello world",i)
finally:
l.release() if __name__ == '__main__':
lock = Lock() for num in range(10):
Process(target=f,args=(lock,num)).start()
进程锁
(4)进程间的通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
a、Queue
#Author:ZhangKanghui #通过复制出一个q,经过pickle序列化使得两个进程间完成通讯
from multiprocessing import Process
from multiprocessing import Queue
import queue def f(q):
q.put([42,None,'hello'])
if __name__ == '__main__':
#q =queue.Queue()
#如果这样会出现这样的错误TypeError: can't pickle _thread.lock objects
q =Queue()
p =Process(target=f,args=(q,))
p.start()
print(q.get())
#p.join()
进程间的通讯Queue
b、pipes
#Author:ZhangKanghui from multiprocessing import Process,Pipe def f(conn):
conn.send([42,None,'hello from child'])
conn.send([42,None,'hello from child2'])
print("from parent:",conn.recv())
conn.close() if __name__ == '__main__':
parent_conn,child_conn = Pipe()
p = Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv())
parent_conn.send("how are you")
p.join()
进程间的通讯pipes
c、managers
#Author:ZhangKanghui from multiprocessing import Process,Manager
import os def f(d,l):
# d[1] = '1'
# d['2'] = 2
# d[0.25] = None
d[os.getpid()] = os.getpid()
l.append(os.getpid())
print(l) if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() #生成一个字典,可在多个进程间共享和传递
l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递
p_list = [] for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list: #等待结果
res.join() print(d)
进程间的通讯managers
(5)进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:apply 和 apply_async
#Author:ZhangKanghui
from multiprocessing import Process,Pool
import time
import os def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i+100 def Bar(arg):
print(">>exec done:",arg,os.getpid()) if __name__ == '__main__': #在windows中多线程必须加上这么一句话,注释掉可以看看是什么错误
pool = Pool(processes=2) #允许进程池中同时放进5个进程
print("主进程PID:",os.getpid())
print(__name__)
if __name__ == '__main__': #这句代码的意思就是这些代码只能手动执行,如果被当做模块导入的话是不会执行的
for i in range(10):
pool.apply_async(func=Foo,args=(i,),callback=Bar) #callback回调
#回调函数式在主进程中进行,通过os.getpid()可以确认,这样的目的是能搞提高效率。
#pool.apply_async(func=Foo,args=(i,)) #异步执行
#pool.apply(func=Foo,args=(i,)) #同步执行 print("end")
pool.close()
pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
#一定要关闭进程池再join
进程池
5 小结
志不坚者智不达。
撑不住的时候可以对自己说声“我好累”,但永远不要再心里承认“我不行”。
没有鸡汤就没有生活。
6 练习
需求:
- 主机分组
- 登录后显示主机分组,选择分组后查看主机列表
- 可批量执行命令、发送文件,结果实时返回
- 主机用户名密码可以不同
流程图:
### 作者介绍:
* author:lzl
### 博客地址:
* http://www.cnblogs.com/lianzhilei/p/5881434.html ### 功能实现
题目:简单主机批量管理工具 需求:
主机分组
登录后显示主机分组,选择分组后查看主机列表
可批量执行命令、发送文件,结果实时返回
主机用户名密码可以不同 ### 目录结构:
Host-Manage
│
├── ftpclient #客户端程序
├── README.txt
├── management.py #服务端入口程序
├── database #数据库
├── test.py #修改数据库 ### 注释
可批量执行命令、发送文件
上传命令格式: put database /tmp/db ### 运行环境
windows系统
python3.0+ README
Readme
import json
import paramiko
import threading class Remotehost(object):
#远程操作主机
def __init__(self,host,port,username,password,cmd):
self.host = host
self.port = port
self.username = username
self.password = password
self.cmd = cmd def command(self):
#获取命令
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 允许连接不在know_hosts文件中的主机
ssh.connect(hostname=self.host, port=self.port, username=self.username, password=self.password) # 连接服务器
stdin, stdout, stderr = ssh.exec_command(self.cmd) # 获取命令结果
res ,err = stdout.read(),stderr.read() # 三元运算
result = res if res else err
print("[%s]".center(50,"-")%self.host)
print(result.decode()) # 打印输出
ssh.close() def put(self):
#上传
try:
transport = paramiko.Transport((self.host, self.port))
transport.connect(username=self.username, password=self.password)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(self.cmd.split()[1], self.cmd.split()[2]) # 上传文件
transport.close()
print("\033[32;0m【%s】 上传 文件【%s】 成功....\033[0m"%(self.host,self.cmd.split()[2]))
except Exception as error: # 抓住异常
print("\033[31;0m错误:【%s】【%s】\033[0m"%(self.host,error)) def run(self):
#反射
cmd_str = self.cmd.split()[0]
if hasattr(self,cmd_str):
getattr(self,cmd_str)()
else:
setattr(self,cmd_str,self.command)
getattr(self,cmd_str)() if __name__ == "__main__":
#主程序
with open("database","r") as file:
data_dict = json.loads(file.read()) #获取数据库信息
for k in data_dict: #打印地址组
print(k) group_choice = input("输入要操作的组名:").strip()
if data_dict.get(group_choice):
host_dict = data_dict[group_choice] #定义主机字典
for k in host_dict: #打印所选地址组所有的主机名
print(k)
while True:
cmd = input("选择进行的操作的命令:").strip()
thread_list=[]
if cmd: #命令不为空
for k in host_dict:
host, port, username, password=k,host_dict[k]["port"],host_dict[k]["username"],host_dict[k]["password"]
func = Remotehost(host,port,username,password,cmd) #实例化类
t = threading.Thread(target=func.run) #创建线程
t.start()
thread_list.append(t)
for t in thread_list:
t.join() #等待线程执行结果
else:
print("\033[31;0m操作组不存在\033[0m")
简单主机批量管理工具主程序
相关参考:https://www.cnblogs.com/0zcl/p/6352278.html
我是尾巴~
这次推荐:摄影app合集 https://mp.weixin.qq.com/s/3N3m7otgKIZXGyUpGKbTpw
虽不才,才要坚持。