简单说一下Queue和多进程
更新时间:2016年8月25日08:46:12 作者:
本文给大家简单总结了队列对象(queue)和多进程(),非常简单实用。 有需要的朋友可以参考以下
最近接触到一个项目,需要在多个虚拟机中运行任务。 参考了别人以前项目的代码,它使用了多进程处理,所以就在网上查了多进程。
首先我们来说说Queue(队列对象)
Queue 是 中的标准库,可以直接引用。 以前学习的时候,就听过著名的“先吃,先拉”,“先吃,先吐”。 其实就是这里说的队列。 您可以在构建队列时定义它。 容量,别吃太多。 吃多了就会报错。 如果构造时不写或者写小于1的数,则表示无穷大
队列
q = 队列。 队列(10)
将一个值放入队列(put)
q. 把('阳')
q. 放(4)
q. put(['yan','兴'])
获取队列中的值get()
默认队列是先进先出
>>> 问。 得到()
‘阳’
>>> 问。 得到()
>>> 问。 得到()
[‘颜’、‘兴’]
当队列为空时,如果使用get去取,会被阻塞,所以一般在取队列时使用
() 方法,该方法在从空队列中获取值时会抛出 Empty 异常
所以比较常见的方法是先判断一个队列是否为空,不为空则取值
队列中常用的方法
Queue.qsize() 返回队列的大小
如果队列为空,Queue.empty() 返回 True,否则返回 False
如果队列已满,Queue.full() 返回 True,否则返回 False
Queue.get([block[, ]]) 获取队列、等待时间
Queue.() 相当于 Queue.get(False)
非阻塞Queue.put(item)写入队列,等待时间
Queue.(item) 相当于 Queue.put(item, False)
其次,使用子流程的概念
从
子进程可以通过以下方式构建
p = (=有趣,args=(args))
然后通过p.start()启动子进程
然后使用p.join()方法让子进程运行然后执行父进程
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
3. 使用池
如果需要多个子进程,可以考虑使用进程池(pool)来管理
从泳池
from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
通过pool创建子进程的方法与通过pool创建子进程的方法不同
p.(func, args=(args)) 意识到可以在池中同时运行的任务取决于计算机中的 cpu 数量。 比如我的电脑现在有4个cpu,那么子进程task0、task1、task2、task3可以同时启动,task4在前一个进程结束后启动
上述程序运行后的结果其实是按照上图中的1、2、3分别进行的,先打印1,3秒后打印2,3秒后打印3
代码中的p.close()是关闭进程池,不再向其中添加进程。 在 Pool 对象上调用 join() 方法将等待所有子进程完成执行。 调用 join() 之前必须先调用 close() ,调用 close() 后不能继续添加新的。
这个时候,在给它定义进程的时候,它也可以是一个实例池。
如果上面代码中p=Pool(5),则所有子进程可以同时执行
3. 多个子流程之间的通信
多个子进程之间的通信必须使用第一步提到的Queue。 例如,如果有以下需求,一个子进程向队列写入数据,另一个进程从队列中取出数据。
#coding:gbk from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 等待pw结束: pw.join() # 启动子进程pr,读取: pr.start() pr.join() # pr进程里是死循环,无法等待其结束,只能强行终止: print print '所有数据都写入并且读完'
4.上面代码的几个有趣的问题
if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
如果 main 函数写成上面的示例,我想要的是获取一个队列并将其作为参数传递给进程池中的每个子进程,但是我得到
: 队列只能
错误,查了一下,大体思路是队列对象无法在父进程和子进程之间通信,如果要使用进程池中的队列就要使用这个类
if __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'
这样父进程和子进程之间就可以通信这个队列对象了,没有池就不需要它了。 以后我们要扩大班级规模
关于锁的应用,如果不同的程序同时操作同一个队列,为了避免错误,可以在操作队列时对某个函数加锁,这样同一时间只能有一个队列时间。 子进程对队列进行操作,锁也必须锁定在对象上
#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 写数据进程执行的代码: def write(q,lock): lock.acquire() #加上锁 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #释放锁 # 读数据进程执行的代码: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() lock = manager.Lock() #初始化一把锁 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有数据都写入并且读完'