Python第十五天 datetime模块 time模块 thread模块 threading模块 Queue队列模块 multiprocessing模块 paramiko模块 fabric模块
目录
datetime模块 time模块
子类之间的对应关系
object timedeltatzinfotimedatedatetimenow = datetime.datetime.now() # 获取当前时间
delta = datetime.timedelta(minutes=5) # 获取前5分钟时间fiveminago = now – delta # 时间差now > fiveminago # 时间比较GMT_FORMAT = '%b %d %H:%M:%S GMT'now.strftime(GMT_FORMAT) # 格式化时间now.strftime('%Y-%m-%d.%H:%M:%S') # 格式化时间int(time.time()) #返回时间戳time.ctime() #相当于datetime.datetime.now()
# 解释日志时间import datetimeimport timedef parseLogTime(line):now = datetime.datetime.now()month,day,time = line.split()[:3]hour,minute,second = [int(i) for i in time.split(':')]logtime = datetime.datetime(now.year,MONTH[month],int(day),hour,minute,second)return logtime
获取今天零点和昨天零点时间
# 获取今天零点和昨天零点时间import datetimeimport timenow_time = int(time.time()) # 获取当前时间的时间戳day_time = now_time - now_time % 86400 + time.timezone # 今天零点时间戳today_datetimeformat=datetime.datetime.fromtimestamp(day_time) # 转换为datetime格式delta = datetime.timedelta(days=1) # 一天的时间差yesterday_datetimeformat = today_datetimeformat - delta # 昨天零点时间datetime格式yesterday_uxtimeformat = int(time.mktime(yesterday_datetimeformat.timetuple()))torday_uxtimeformat = int(time.mktime(today_datetimeformat.timetuple()))print yesterday_uxtimeformatprint torday_uxtimeformat
定义class datetime(date)| datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])| | The year, month and day arguments are required. tzinfo may be None, or an| instance of a tzinfo subclass. The remaining arguments may be ints or longs.
datetime模块定义了下面这几个类:
datetime.date:表示日期的类。常用的属性有year, month, day;datetime.time:表示时间的类。常用的属性有hour, minute, second, microsecond;datetime.datetime:表示日期时间。datetime.timedelta:表示时间间隔,即两个时间点之间的长度。datetime.tzinfo:与时区有关的相关信息。
时间格式相互转换
时间对象,时间字符串,时间戳 获取当前时间import datetimenow_time=datetime.datetime.now()print(now_time)print(type(now_time))
字符串、datetime互转
import datetimestring = '2017-01-02 11:12:12'_d_time = datetime.datetime.strptime(string, '%Y-%m-%d %H:%M:%S')print (_d_time)print(type(_d_time))dateime转字符串
import datetime_time=datetime.datetime.strftime(now_time,'%Y-%m-%d %H:%M:%S')print(_time)print(type(_time)) 时间戳、datetime互转import time_a=int(time.time())print (_a)_n_time=datetime.datetime.fromtimestamp(_a)print (_n_time)print(type(_n_time))datetime转换为unix时间戳
dtime = datetime.datetime.now()un_time = int(time.mktime(dtime.timetuple()))print un_time
多线程
thread模块threading模块thread模块是低级模块threading模块是高级模块,对thread模块的封装生产中要用threading模块而不用thread模块
GIL:Python无法高效的实现多线程的原因 《Python Linux系统管理与自动化运维》
Python 默认的解释器,由于全局解释器锁GIL的存在,确实在任意时刻都只有一个线程在执行Python 代码,致使多线程不能充分利用机器多核的特性,GIL的好处是防止死锁、争用条件、高复杂性问题
多进程代替多线程有效避开GIL,每个进程都有自己的python解释器实例,不受GIL限制但是,我们的程序也不是无时无刻不在计算的,我们的程序需要等待用户输入、等待文件读写以及网络收发数据,这些都是比较费时的操作。使用Python 多线程,计算机会将这些等待操作放到后台去处理,从而释放出宝贵的计算资源,继续进行计算。也就是说,如果读者的程序是CPU 密集型的,使用Python 的多线程确实无法提升程序的效率,如果读者的程序是IO 密集型的,则可以使用Python 的多线程提高程序的整体效率。 线程的特点:- 线程的生命周期- 开始- 运行- 结束线程的退出:- 进程执行完成- 线程的退出方法- python系统退出我们通常把当前进程叫做主线程或者主进程。函数是通过thread.start_new_thread来调用的,说明它是在线程中运行的。import thread,threadingstart_new_thread(func, args)func:函数名args:元组allocate_lock()exit()#!/usr/bin/env python#encoding:utf8import threadimport timedef func(name,i): for n in xrange(i): print name,n #time.sleep(1)thread.start_new_thread(func, ('声音',3))thread.start_new_thread(func, ('画面',3))time.sleep(1)
LockType对象方法acquire()locked()release() //释放锁,使用前线程必须已获得锁定,否则将抛出异常thread锁使用lock = thread.allocate_lock():生成全局锁对象lock.acquire():加锁线程使用锁对象lock.release():线程调度释放锁查看locked()状态示例1
#!/usr/bin/env python#encoding:utf8import threadimport timedef func(name, i, l): for n in xrange(i): print name, n time.sleep(1) l.release() lock = thread.allocate_lock()lock.acquire()thread.start_new_thread(func, ('声音', 3, lock))thread.start_new_thread(func, ('画面', 3, lock))while lock.locked(): passprint lock.locked()print "Exit main process"
示例2
#!/usr/bin/env pythonimport threadimport timedef world(): for i in range(5): if w_lock.acquire(): print 'world', time.ctime() h_lock.release()h_lock = thread.allocate_lock()w_lock = thread.allocate_lock()thread.start_new_thread(world, ())w_lock.acquire()for i in range(5): if h_lock.acquire(): print 'hello', w_lock.release()while h_lock.locked(): pass
示例3
#!/usr/bin/env pythonimport threadimport timedef hello(): for i in xrange(5): h_lock.acquire() print 'hello', w_lock.release()def world(): for i in xrange(5): w_lock.acquire() print 'world', time.ctime() h_lock.release() lock.release()lock = thread.allocate_lock() 分配锁lock.acquire() 获得锁h_lock = thread.allocate_lock()w_lock = thread.allocate_lock()w_lock.acquire()thread.start_new_thread(hello, ())thread.start_new_thread(world, ())while lock.locked(): 是否获得锁 pass
threading模块threading不需要进程来控制线程,主进程会等待线程执行完毕才退出threading.Thread:类成员方法:
- isAlive() //查线程是否在运行中
- start() //启动线程 非阻塞- run() //可以重写- join() //该方法会阻塞调用,直到线程中止- getName() //获取线程的名称- setName() //设置线程的名称- isDaemon() //判断线程是否守护线程- setDaemon() //设置线程为守护线程1、使用threading.Thread生成线程对象来开启多线程#给线程/函数传递参数
import timeimport threadingdef t1(name, x): for i in xrange(x): print i, name time.sleep(1)th1 = threading.Thread(target=t1, args=('声音', 3))th2 = threading.Thread(target=t1, args=('画面', 3))target:函数名args:函数的参数th1.setDaemon(True) //随着主线程的退出而退出,一定要写在th1.start()前面th1.start() //运行一个线程#th1.run() //一直运行这个线程,直到运行完才会运行下一个线程#th1.join() //等待线程终止th2.start()
2、继承threading.Thread 的方式编写多线程程序
通过继承threading.Thread 类进行多线程编程,只需要在子类中实现run 方法,并在run 方法中实现该线程的业务逻辑即可
from __future__ import print_functionimport threadingclass MyThread(threading.Thread ): def __init__(self, count, name): super(MyThread, self).__init__() self.count = count self.name = name def run(self): while self.count>0: print ("hello", self.name) self.count-= 1def main(): usernames = [ 'Bob','Jack','Pony','Jone','Mike'] for i in range(5): thread = MyThread(50,usernames[i]) thread.start()if __name__ =='__main__': main()
线程安全:用锁来控制并发访问
Lock工厂函数lock = threading.Lock()lock. acquire() //获取锁lock.release() //释放锁lock.locked() //查看锁状态
#加锁和释放锁 import threading lock = threading.Lock() try: lock.acquire() # do something finally: lock.release()#改为用上下文管理器 import threading lock = threading.Lock() with lock: # do something pass
启动线程的两个方法第一种方法:定义一个函数,threading.Thread实例化对象来启动第二种方法:编写一个类并继承threading.Thread并重写run方法
线程安全队列
线程安全队列是线程间最常用的交换数据的形式,线程间通信模块。Queue是提供队列操作的模块生产者消费者模型
生产者 和消费者之间加一个缓冲区生产者消费者之间存在并发访问问题,即多个消费者可能同时从缓冲区中获取商品,为了解决并发问题,使用python标准库的queue,queue是线程安全的队列实现FIFO提供多线程编程的先进先出数据结构,非常适合生产者消费者之间数据传递
三种队列:
FIFO:Queue.Queue(maxsize=0):一个先进先出( FIFO )的队列,最先加入队列的元素最先取出LIFO:Queue.LifoQueue(maxsize=0):一个后进先出( LIFO )的队列,最后加入队列的元素最先取出Priority:Queue.PriorityQueue(maxsize=0):优先级队列,队列中的元素根据优先级排序Queue一些方法Queue.empty() //如果队列为空,返回True,反之FalseQueue.full() //如果队列满了,返回True,反之FalseQueue.join() //阻塞等待,直到所有消费者对每一个元素都调用了task_doneQueue.task_done() //和join一起工作,提示先前取出的元素已经完成处理Queue.get([block[, timeout]]) //读队列,timeout等待时间 ,block和timeout意思跟put方法一样Queue.get_nowait() //读队列,当队列中没有数据的时候会直接抛出Empty异常,而不会等待Queue.put_nowait() //写队列,非阻塞地向队列添加元素Queue.put(item, [block[, timeout]]) //写队列,timeout等待时间 ,任何数据结构数据都可以加到队列,block=true,timeout=none默认,当队列满了会一直阻塞并等待Queue.queue.clear() 清空队列
q = Queue.Queue()class Queue | Create a queue object with a given maximum size. | | If maxsize is <= 0, the queue size is infinite. | | Methods defined here: | | __init__(self, maxsize=0) |
#!/usr/bin/env pythonimport threadingimport randomimport Queueimport timedef producer(name, queue): for i in xrange(10): num = random.randint(1,10) th_name = name + '-' + threading.currentThread().getName() print "%s: %s ---> %s" % (time.ctime(), th_name, num) queue.put(num) time.sleep(1)def odd(name, queue): th_name = name + '-' + threading.currentThread().getName() while True: try: val_odd = queue.get(1,5) #阻塞,阻塞5秒超时 if val_odd % 2 != 0: print "%s: %s ---> %s" % (time.ctime(), th_name, val_odd) time.sleep(1) else: queue.put(val_odd) time.sleep(1) except: print "%s: %s finished" % (time.ctime(), th_name) breakdef even(name, queue): th_name = name + '-' + threading.currentThread().getName() while True: try: val_even = queue.get(1,5) if val_even % 2 == 0: print "%s: %s ---> %s" % (time.ctime(), th_name, val_even) time.sleep(1) else: queue.put(val_even) time.sleep(1) except: print "%s: %s finished" % (time.ctime(), th_name) breakdef main(): q = Queue.Queue(10) t_pro = threading.Thread(target=producer, args=('pro',q)) t_odd = threading.Thread(target=odd, args=('odd',q)) t_even = threading.Thread(target=even, args=('even',q)) t_pro.start() t_odd.start() t_even.start()if __name__ == '__main__': main()
multiprocessing模块
thread模块和threading模块都不是真正意义的多线程,受限于GIL只能用到一个核心
multiprocessing使用多进程,可以利用多个核心,但是因为是多进程开销也比多线程大multiprocessing.Process调用方法跟threading.Thread调用方法和参数基本一样
from multiprocessing import Process
import timeimport osdef f(name):
print 'hello', name, os.getpid(), os.getppid() time.sleep(1)if __name__ == '__main__':
for i in range(10): p = Process(target=f, args=(i,)) p.start() #p.join() #wait
进程池Pool
multiprocessing.Pool很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。pool = multiprocessing.Pool(processes=3) // 设置最大进程数为3,processes的默认值等于系统当前的最大核心数,假设当前是4核机器,那么进程池默认会初始化四个进程result = pool.apply_async(func=f, args=(i,)) //向进程池提交目标请求,非阻塞的,但result.get()方法是阻塞的。pool.close() //会等待池中的所有worker进程执行结束再关闭poolpool. terminate() //直接关闭poolpool.join() //等待进程池中的所有worker进程执行完毕,阻塞主进程。但必须使用在pool.close()或者pool.terminate()之后pool.close() pool.join() #阻塞主进程,不然主进程退出了子进程还在运行变成孤儿进程
#!/usr/bin/env pythonimport multiprocessingfrom subprocess import Popen, PIPEdef run(id): p = Popen('vim', stdout=PIPE, stderr=PIPE) p.communicate() #不用 stdout和 stderr 接收 表示等待命令执行完 print "Task: %s" %idpool = multiprocessing.Pool()for i in xrange(5): pool.apply_async(func=run, args=(i,))print "Wait..."pool.close()pool.join()print 'Done'
#!/usr/bin/env pythonimport multiprocessingimport osimport timedef run_task(id): print "Run task id: %s pid(%s) ppid(%s)" % (id, os.getpid(), os.getppid()) start = int(time.time()) time.sleep(3) end = int(time.time()) print "Task %s run %0.2f" % (id, (end - start))if __name__ == '__main__': print "Parent process %s" %os.getpid() pool = multiprocessing.Pool(processes=2) for i in xrange(5): pool.apply_async(run_task, args=(i,)) #print "Wait..." pool.close() pool.join() print "Done"
paramiko模块
paramiko 是一个Python 的库,该库支持Python 2.6 +和Python 3.3 +版本,实现了SSHv2 协议(底层使用cryptography )
cryptography为python开发组提供cryptographic recipes加密套件和 primitives 原语的一个python包
cryptographic standard libraryIt supports Python 2.7, Python 3.4+, and PyPy 5.3+pip install cryptography
只能上传文件,不能上传文件夹,需要结合os.walk,上传目录里所有文件
SSHClient
SSHClient 类是对SSH 会话的封装,该类封装了传输(transport) 、通道(channel)及SFTPClient 建立的方法(open_sftp), 通常用于执行远程命令SSHClient 类常用的几个方法:
1 ) connect : connect 方法实现远程服务器连接与认证,对于该方法, 只有hostname 是必传参数。connect(self , hostname , port=22, username=None , password=None ,pkey=None , key_filename=None , timeout=None ,allow_agent=True , look_for_keys=True , compress=False)2 ) set_missing_host_key_policy : 设置远程服务器没有在know_hosts 文件中记录时的应对策略。目前支持三种策略,分别是AutoAddPolicy 、RejectPolicy ( 默认策略)与WarningPolicy 分别表示自动添加服务器到know_hosts 文件、拒绝本次连接、警告并将服务器添加到know hosts 文件中。3 ) exec_command :在远程服务器执行Linux 命令的方法。4 ) open_sftp:在当前ssh 会话的基础上创建一个sftp 会话。该方法会返回一个SFTPClient 对象。SFTPClient 类常用的几个方法:
put :上传本地文件到远程服务器;get :从远程服务器下载文件到本地;mkdir :在远程服务器上创建目录;remove :删除远程服务器中的文件;rmdir :删除远程服务器中的目录;rename :重命名远程服务器中的文件或目录;stat : 获取远程服务器中文件的详细信息;listdir :列出远程服务器中指定目录下的内容。这里仅仅介绍了paramiko 中SSHClient 与SFTPClient 类的常用方法,完整的API 可以参考官方文档
需要安装libffi-devel开发库
yum install -y libffi* error: invalid command ‘bdist_wheel’http://www.cnblogs.com/BugQiang/archive/2015/08/22/4732991.htmlparamiko模块遵循SSH2协议,支持加密和认证的方式,进行远程服务器的连接。由于使用的是python这样的能够跨平台运行的语言,所以所有python支持的平台,如Linux, Solaris, BSD, MacOS X, Windows等,paramiko都可以支持
批量执行命令
安装paramilo模块yum install -y python-paramiko1.10.noarch或者:pip install paramiko==2.2.1 例子import paramikoclient = paramiko.SSHClient() client.load_system_host_keys() //~/.ssh/know_hosts #加载本机known_hosts文件或:client.set_missing_host_key_policy(paramiko.AutoAddPolicy())client.connect(hostname='127.0.0.1',username='root',password=‘XXX')stdin, stdout, stderr = client.exec_command('date')stdout.read()client.close()
基于用户名和密码的 sshclient 方式登录# 建立一个sshclient对象ssh = paramiko.SSHClient()# 允许将信任的主机自动加入到known_hosts列表,此方法必须放在connect方法的前面ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 调用connect方法连接服务器ssh.connect(hostname='192.168.2.129', port=22, username='super', password='super',timeout=5)# 执行命令stdin, stdout, stderr = ssh.exec_command('df -hl')# 结果放到stdout中,读出stdout中的结果print(stdout.read().decode())# 关闭连接ssh.close()
基于公钥密钥的 SSHClient 方式登录
# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345')# 建立连接ssh = paramiko.SSHClient()ssh.connect(hostname='192.168.2.129',port=22,username='super',pkey=pkey,timeout=5)# 执行命令stdin, stdout, stderr = ssh.exec_command('df -hl')# 结果放到stdout中,读出stdout中的结果print(stdout.read().decode())# 关闭连接ssh.close()
基于用户名和密码的 transport 方式传文件
# 实例化一个transport对象trans = paramiko.Transport(('192.168.2.129', 22))# 建立连接trans.connect(username='super', password='super')# 实例化一个 sftp对象,指定连接的通道sftp = paramiko.SFTPClient.from_transport(trans)# 发送文件sftp.put(localpath='/tmp/11.txt', remotepath='/tmp/22.txt')# 下载文件sftp.get(remotepath='/tmp/22.txt', localpath='/tmp/11.txt')trans.close()
基于密钥的 Transport 方式传文件
# 指定本地的RSA私钥文件,如果建立密钥对时设置的有密码,password为设定的密码,如无不用指定password参数pkey = paramiko.RSAKey.from_private_key_file('/home/super/.ssh/id_rsa', password='12345')# 建立连接trans = paramiko.Transport(('192.168.2.129', 22))trans.connect(username='super', pkey=pkey)# 实例化一个 sftp对象,指定连接的通道sftp = paramiko.SFTPClient.from_transport(trans)# 发送文件sftp.put(localpath='/tmp/11.txt', remotepath='/tmp/22.txt')# 下载文件sftp.get(remotepath='/tmp/22.txt', localpath='/tmp/11.txt')trans.close()
问题汇总
https://cloud.tencent.com/community/article/945339
Error reading SSH protocol banner连接错误 解决办法:重新下载paramiko插件源码,解压后,编辑安装目录下的transport.py文件:vim build/lib/paramiko/transport.py搜索 self.banner_timeout 关键词,并将其参数改大即可,比如改为300s:self.banner_timeout = 300最后,重装paramiko即可。
paramiko远程执行后台脚本“阻塞”问题
解决办法将远程脚本的标准输出stdout重定向到错误输出stderr即可现在执行,就能立即得到结果了。其实原因很简单,因为bash /tmp/test.sh & 虽然是后台执行,但是依然会产生标准输出,一旦产生标准输出,paramiko就会认为命令还未执行完成,且stdout的buffer大于stderr,因此产生等待问题。这里只要将脚本执行的标准输出重定向到错误输出(1>&2),然后paramiko就可以使用stderr快速读取远程打屏信息了。
#!/usr/bin/env pythonimport paramikoimport threadingimport sysdef ssh_conn(ip, cmd): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) pkey_file = '/root/.ssh/id_rsa' key = paramiko.RSAKey.from_private_key_file(pkey_file) try: ssh.connect(hostname=ip, username='root', pkey=key, timeout=5) except: print "%s: Timeout or not permission" % ip return 1 stdin, stdout, stderr = ssh.exec_command(cmd) stdout = stdout.read()[:-1] #第0个字符到最后一个字符但是不包含最后一个字符 stderr = stderr.read()[:-1] if stdout: print "%s:\t %s" % (ip, stdout) ssh.close() else: print "%s:\t %s" % (ip, stderr) ssh.close()if __name__ == '__main__': paramiko.util.log_to_file('/tmp/paramiko.log') ips = ['192.168.20.'+str(i) for i in xrange(1, 100)] try: cmd = sys.argv[1] except IndexError: print "%s follow a command" % __file__ sys.exit(1) for ip in ips: t = threading.Thread(target=ssh_conn, args=(ip,cmd)) t.start()
#!/usr/bin/env python#进程池版本import paramikoimport multiprocessingimport sysdef ssh_conn(ip, cmd): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) pkey_file = '/root/.ssh/id_rsa' key = paramiko.RSAKey.from_private_key_file(pkey_file) try: ssh.connect(hostname=ip, username='root', pkey=key, timeout=5) except: print "%s: Timeout or not permission" % ip return 1 stdin, stdout, stderr = ssh.exec_command(cmd) stdout = stdout.read()[:-1] stderr = stderr.read()[:-1] if stdout: print "%s:\t %s" % (ip, stdout) ssh.close() else: print "%s:\t %s" % (ip, stderr) ssh.close()if __name__ == '__main__': paramiko.util.log_to_file('/tmp/paramiko.log') ips = ['192.168.20.'+str(i) for i in xrange(1, 100)] try: cmd = sys.argv[1] except IndexError: print "%s follow a command" % __file__ sys.exit(1) pool = multiprocessing.Pool(processes=10) for ip in ips: pool.apply_async(func=ssh_conn, args=(ip, cmd)) pool.close() pool.join()
#!/usr/bin/env python#可传入参数改进版import paramikoimport multiprocessingimport sysfrom optparse import OptionParserimport urllib, urllib2import jsonDATA_BACK = '/var/tmp/data.json'def opt(): parser = OptionParser("Usage: %prog -a|-g command") parser.add_option('-a', dest='addr', action='store', default='True', help='ip or iprange EX: 192.168.1,192.168.1.3 or 192.168.1.1-192.168.1.100') parser.add_option('-g', dest='group', action='store', help='groupname') options, args = parser.parse_args() return options, argsdef ssh_conn(ip, cmd): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(hostname=ip, port=9022, username='steven', password='xx',timeout=5) except: print "%s: Timeout or not permission" % ip return 1 stdin, stdout, stderr = ssh.exec_command(cmd) stdout = stdout.read()[:-1] stderr = stderr.read()[:-1] if stdout: print "%s:\t %s" % (ip, stdout) ssh.close() else: print "%s:\t %s" % (ip, stderr) ssh.close()def parseOpt(option): if ',' in option: ips = option.split(',') return ips elif '-' in option: ip_start, ip_end = option.split('-') ip_net = '.'.join(ip_start.split('.')[:-1]) start = int(ip_start.split('.')[-1]) end = int(ip_end.split('.')[-1]) + 1 ips = [ip_net+'.'+str(i) for i in range(start, end)] return ips elif ',' not in option or '-' not in option: ips = [option] return ips else: print "%s -h" % __file__def getData(): url = 'http://192.168.1.5:8000/hostinfo/getjson/' try: req = urllib2.urlopen(url) data = json.loads(req.read()) with open(DATA_BACK, 'wb') as fd: json.dump(data, fd) except: with open(DATA_BACK) as fd: data = json.load(fd) return data def parseData(data): dic_host = {} for hg in data: groupname = hg['groupname'] dic_host[groupname] = [] for h in hg['members']: dic_host[groupname] += [h['ip']] #列表相加 得到一个新列表 return dic_hostif __name__ == '__main__': paramiko.util.log_to_file('/tmp/paramiko.log') options, args = opt() try: cmd = args[0] except IndexError: print "%s follow a command" % __file__ sys.exit(1) if options.addr: ips = parseOpt(options.addr) elif options.group: groupname = options.group data = getData() dic = parseData(data) if groupname in dic: ips = dic[groupname] else: print "%s is not exists in SimpleCMDB" % groupname sys.exit() else: print "%s -h" % __file__ sys.exit(1) pool = multiprocessing.Pool(processes=10) for ip in ips: pool.apply_async(func=ssh_conn, args=(ip, cmd)) pool.close() pool.join() #阻塞一下主进程,不然主进程退出了子进程还在运行变成孤儿进程
fabric模块
Fabric 的作者也是paramiko 的作者
Fabric 依赖paramiko 、pycrypto
fabric是基于paramiko模块封装开发的。paramiko更底层安装:依赖pycrypto,paramiko要先卸载系统自带的python-pycrypto与python-paramikopycrypto最好安装2.3版本,如果安装2.6.1会有gmp版本低的警告
Fabric 比较特殊,它既是一个Python 库,也是一个命令行工具
它的命令行工具不是Fabric , 而是fab 。我们可以通过下面的语句获得命令行工具的简单说明以及验证Fabric 是否安装正确:$ fab --help 安装:yum install -y python-develpip install pycrypto=="2.3"pip install paramiko==1.12.4pip install fabric==1.8.3 pip listargparse (1.2.1)asn1crypto (0.22.0)bcrypt (3.1.3)cffi (1.10.0)cryptography (2.0.1)distribute (0.6.10)Django (1.6.5)ecdsa (0.13)enum34 (1.1.6)Fabric (1.8.3)idna (2.5)iniparse (0.3.1)ipaddress (1.0.18)ipython (1.2.1)mysql-connector-python (2.1.6)MySQL-python (1.2.3rc1)mysql-replication (0.13)mysql-utilities (1.6.5)ordereddict (1.2)paramiko (1.12.4)pip (7.1.0)pyasn1 (0.3.1)pycparser (2.18)pycrypto (2.3)pycurl (7.19.0)pygpgme (0.1)pymssql (2.1.0)PyMySQL (0.7.11)PyNaCl (1.1.2)setuptools (0.6rc11)six (1.10.0)urlgrabber (3.9.1)virtinst (0.600.0)wheel (0.29.0)yum-metadata-parser (1.1.2)http://www.fabfile.org/en/latest/index.html编写fabfile.py文件:from fabric.api import rundef host_type(): #函数名随便定义 run('uname -s')执行:fab -H localhost host_typehttp://docs.fabfile.org/en/1.10/usage/execution.html#connections
#不能同时出现多个hosts
from fabric.api import envenv.user='test' #机器的用户名env.password='123' #机器的密码env.hosts = ['node1','node2']
#分组
不能同时出现多个roledefsfrom fabric.api import envenv.roledefs['webservers'] = ['www1', 'www2', 'www3']env.roledefs = {
'web': ['www1', 'www2', 'www3'], 'dns': ['ns1', 'ns2']Fab -R web …
cryptography 、 PyCrypto 、 PyCryptodome(pycrypto的分支pycryptodome)
PyCrypto:Python Cryptography Toolkit (pycrypto)
PyCryptodome: is a self-contained Python package of low-level cryptographic primitives.
f