python学习笔记_week9

一、paramiko模块

SSHClient,用于连接远程服务器并执行基本命令

基于用户名密码连接:

ssh执行命令:stdin,stdout,sterr:标准输入、输出、错误

 import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
res,err=stdout.read(),stderr.read()
result = res if res else err
print(result.decode())
# 关闭连接
ssh.close()

ssh

传文件:Linux上:scp(基于ssh的ftp协议) r(目录) p(权限)

python学习笔记_week9

python模拟实现:SFTPClient,用于连接远程服务器并执行上传下载

基于用户名密码上传下载:

 import paramiko
#建立一个链接
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='')
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 将remove_path 下载到本地 local_path
sftp.get('remove_path', 'local_path')
transport.close()

ssh_sftp

二、密钥

直接写用户名密码不安全(明文的)--->不用输密码来连接(用密钥)

RSA—非对称密钥验证

公钥(public key)、私钥(private key) ---成对出现。私钥自己拿着,公钥给对方。ssh-keygen:

python学习笔记_week9

python学习笔记_week9将公钥拷贝到对方的.ssh下

python学习笔记_week9python学习笔记_week9 设置读写执行权限

python学习笔记_week9或者用命令拷贝python学习笔记_week9

python上模拟实现:

python学习笔记_week9将私钥copy过来python学习笔记_week9

 import paramiko
private_key = paramiko.RSAKey.from_private_key_file('id_rsa31.txt')#指定你的公钥在哪里
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='10.0.0.41', port=52113, username='gongli', pkey=private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()

ssh_rsa

三、进程与线程

计算机中,cpu负责运算。线程是操作系统能够进行运算调度的最小单位(线程可简单理解为一串指令的集合)。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

进程:qq 要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等...

对各种资源管理的集合就可以称为进程。

进程要操作cpu,必须先创建一个线程。(进程相当于屋子这个整体,线程相当于屋子里的人)

线程:A thread is an execution context(上下文), which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion(幻觉) that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers(寄存器).

Last: threads are different from processes(进程). A thread is a context of execution, while a process is a bunch of(一堆) resources associated(相关的) with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory---所有在同一个进程里的线程共享同一块内存空间), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

进程:An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique(唯一的) process identifier(进程标识符,pid), environment variables, a priority class(优先级类), minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread(主线程---进程中的第一个线程), but can create additional threads from any of its threads.(子线程可以继续创建子线程并且相互独立)

进程与线程的区别?---进程还是线程快(运行没有可比性),但是线程启动快)

  1. Threads share the address space of the process that created it; processes have their own address space.线程共享内存空间,进程的内存是独立的
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess(中间进程) communication to communicate with sibling(兄弟,姐妹) processes.同一个进程的线程之间可以直接交流,两个进程想通信,必须通过一个中间代理来实现
  4. New threads are easily created; new processes require duplication of the parent process.创建新线程很简单,创建新进程需要对其父进程进行一次克隆
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,擦。。。,那这还叫什么多线程呀?莫如此早的下结结论,听我现场讲。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

四、Python threading模块

线程有2种调用方式。1、直接调用。2、继承式调用。

 import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task done",n,threading.current_thread())
start_time=time.time()
t_objs=[] #存线程实例
for i in range(50):
t=threading.Thread(target=run,args=("t-%s"%i,)) #启动线程
t.start()
t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
# for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
# t.join()
print("all threads has finished...",threading.current_thread(),threading.active_count())#current是否为主线程,active线程运行的个数
print("cost:",time.time()-start_time)

threadding_ex1

 import threading
import time
class MyThread(threading.Thread):
def __init__(self,n,sleep_time):
super(MyThread,self).__init__()
self.n=n
self.sleep_time=sleep_time
def run(self): #定义每个线程要运行的函数
print("running task",self.n)
time.sleep(self.sleep_time)
print("task done",self.n)
t1=MyThread("t1",2)
t2=MyThread("t2",4)
t1.start()
t2.start()
t1.join() #等待t1执行完
t2.join()
print("main thread...")

threading_ex2

Join&deamon:

 import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task done",n,threading.current_thread())
start_time=time.time()
t_objs=[] #存线程实例
for i in range(50):
t=threading.Thread(target=run,args=("t-%s"%i,)) #启动线程
t.setDaemon(True) #把当前线程设置为守护线程,一定要在start之前
t.start()
t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
# for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
# t.join()
print("all threads has finished...",threading.current_thread(),threading.active_count())#current是否为主线程,active线程运行的个数
print("cost:",time.time()-start_time)

threading_ex3_daemon

五、线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

python学习笔记_week9正常来讲,这个num结果应该是1000, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是1000,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行加1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=0这个初始变量交给cpu去运算,当A线程去处完的结果是1,但此时B线程运算完的结果也是1,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是1。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 *注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版

 import threading
import time
def run(n):
lock.acquire()#获取一把锁(用户锁)
global num
num+=1
time.sleep(1)
lock.release() #释放用户锁
lock=threading.Lock() #生成全局锁
num=0
t_objs=[] #存线程实例
for i in range(1000):
t=threading.Thread(target=run,args=("t-%s"%i,)) #启动线程
t.start()
t_objs.append(t)
for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
t.join()
print("all threads has finished...",threading.current_thread(),threading.active_count())
print("num:",num)

threading_ex3_lock

GIL VS Lock 

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图来看一下+配合我现场讲给大家,就明白了。

python学习笔记_week9

那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁(用的不多)

 import threading,time
def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2)
num,num2 = 0,0
lock = threading.RLock() #递归锁 用Lock的话死循环了
for i in range(1):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)

rlock

六、

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,出来几个(不大于3)进去几个

 import threading, time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')

信号量

Timer  

This class represents an action that should be run only after a certain amount of time has passed

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling thecancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

 import threading
def hello():
print("hello, world")
t = threading.Timer(30, hello)
t.start() # after 30 seconds, "hello, world" will be printed

Timer

Events(事件)

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.标志位设定了,代表绿灯,直接通行

If the flag is cleared, wait will block until it becomes set again.标志位被清空,代表红灯,wait等待变绿灯
Any number of threads may wait for the same event.很多车等一个红灯

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

 import time
import threading
event = threading.Event()
def lighter():
count = 0
event.set() #先设置绿灯
while True:
if count >5 and count < 10: #改成红灯
event.clear() #把标志位清了
print("\033[41;1mred light is on....\033[0m")
elif count >10:
event.set() #变绿灯
count = 0
else:
print("\033[42;1mgreen light is on....\033[0m")
time.sleep(1)
count +=1
def car(name):
while True:
if event.is_set(): #判断是否设置了标志位,代表绿灯
print("[%s] running..."% name )
time.sleep(1)
else:
print("[%s] sees red light , waiting...." %name)
event.wait()
print("\033[34;1m[%s] green light is on, start going...\033[0m" %name)
light = threading.Thread(target=lighter,)
light.start()
car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

红绿灯

 import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #绿灯状态
count = 0
while True:
if count < 10:
print('\033[42;1m--green light on---\033[0m')
elif count <13:
print('\033[43;1m--yellow light on---\033[0m')
elif count <20:
if event.isSet():
event.clear()
print('\033[41;1m--red light on---\033[0m')
else:
count = 0
event.set() #打开绿灯
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #绿灯
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()

红绿灯改进

这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。

 import threading
import time
import random
def door():
door_open_time_counter = 0
while True:
if door_swiping_event.is_set():
print("\033[32;1mdoor opening....\033[0m")
door_open_time_counter +=1
else:
print("\033[31;1mdoor closed...., swipe to open.\033[0m")
door_open_time_counter = 0 #清空计时器
door_swiping_event.wait()
if door_open_time_counter > 3:#门开了已经3s了,该关了
door_swiping_event.clear()
time.sleep(0.5)
def staff(n):
print("staff [%s] is comming..." % n )
while True:
if door_swiping_event.is_set():
print("\033[34;1mdoor is opened, passing.....\033[0m")
break
else:
print("staff [%s] sees door got closed, swipping the card....." % n)
print(door_swiping_event.set())
door_swiping_event.set()
print("after set ",door_swiping_event.set())
time.sleep(0.5)
door_swiping_event = threading.Event() #设置事件
door_thread = threading.Thread(target=door)
door_thread.start()
for i in range(5):
p = threading.Thread(target=staff,args=(i,))
time.sleep(random.randrange(3))
p.start()

员工刷卡

七、queue队列 (作用 1.解耦.2.提高效率)

与列表区别:列表元素取走之后数据还有(相当于复制了一份),但队列取走之后数据就没有了

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in first out 后进先出
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(itemblock=Truetimeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=Truetimeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕
 import queue
q = queue.PriorityQueue()
q.put((-1,"chenronghua"))
q.put((3,"hanyang"))
q.put((10,"alex"))
q.put((6,"wangsen"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# q = queue.LifoQueue()
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
# print(q.get())

queue

八、生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 import threading,time
import queue
q = queue.Queue(maxsize=10)
def Producer(name):
count = 1
while True:
q.put("骨头%s" % count)
print("生产了骨头",count)
count +=1
time.sleep(0.1)
def Consumer(name):
#while q.qsize()>0:
while True:
print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
time.sleep(1)
p = threading.Thread(target=Producer,args=("Alex",))
c = threading.Thread(target=Consumer,args=("ChengRonghua",))
c1 = threading.Thread(target=Consumer,args=("王森",))
p.start()
c.start()
c1.start()

producer_consumer

 import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <20:
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
def Consumer(name):
count = 0
while count <20:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

包子

作业

题目:简单主机批量管理工具

需求:

  1. 主机分组
  2. 主机信息配置文件用configparser解析
  3. 可批量执行命令、发送文件,结果实时返回,执行格式如下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主机用户名密码、端口可以不同
  5. 执行远程命令使用paramiko模块
  6. 批量命令需使用multiprocessing并发
上一篇:Android利用调试器调试程序


下一篇:memsql filesystem pipeline 试用