推广 热搜: csgo  vue  angelababy  2023  gps  新车  htc  落地  app  p2p 

简单谈谈python中的Queue与多进程

   2023-08-16 网络整理佚名1740
核心提示:如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在对象中的锁

简单说一下Queue和多进程

更新时间:2016年8月25日08:46:12 作者:

本文给大家简单总结了队列对象(queue)和多进程(),非常简单实用。 有需要的朋友可以参考以下

最近接触到一个项目,需要在多个虚拟机中运行任务。 参考了别人以前项目的代码,它使用了多进程处理,所以就在网上查了多进程。

首先我们来说说Queue(队列对象)

Queue 是 中的标准库,可以直接引用。 以前学习的时候,就听过著名的“先吃,先拉”,“先吃,先吐”。 其实就是这里说的队列。 您可以在构建队列时定义它。 容量,别吃太多。 吃多了就会报错。 如果构造时不写或者写小于1的数,则表示无穷大

队列

q = 队列。 队列(10)

将一个值放入队列(put)

q. 把('阳')

q. 放(4)

q. put(['yan','兴'])

获取队列中的值get()

默认队列是先进先出

>>> 问。 得到()

‘阳’

>>> 问。 得到()

>>> 问。 得到()

[‘颜’、‘兴’]

当队列为空时,如果使用get去取,会被阻塞,所以一般在取队列时使用

() 方法,该方法在从空队列中获取值时会抛出 Empty 异常

所以比较常见的方法是先判断一个队列是否为空,不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小

如果队列为空,Queue.empty() 返回 True,否则返回 False

如果队列已满,Queue.full() 返回 True,否则返回 False

Queue.get([block[, ]]) 获取队列、等待时间

Queue.() 相当于 Queue.get(False)

非阻塞Queue.put(item)写入队列,等待时间

Queue.(item) 相当于 Queue.put(item, False)

其次,使用子流程的概念

子进程可以通过以下方式构建

p = (=有趣,args=(args))

然后通过p.start()启动子进程

然后使用p.join()方法让子进程运行然后执行父进程

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

3. 使用池

如果需要多个子进程,可以考虑使用进程池(pool)来管理

从泳池

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

通过pool创建子进程的方法与通过pool创建子进程的方法不同

p.(func, args=(args)) 意识到可以在池中同时运行的任务取决于计算机中的 cpu 数量。 比如我的电脑现在有4个cpu,那么子进程task0、task1、task2、task3可以同时启动,task4在前一个进程结束后启动

上述程序运行后的结果其实是按照上图中的1、2、3分别进行的,先打印1,3秒后打印2,3秒后打印3

代码中的p.close()是关闭进程池,不再向其中添加进程。 在 Pool 对象上调用 join() 方法将等待所有子进程完成执行。 调用 join() 之前必须先调用 close() ,调用 close() 后不能继续添加新的。

这个时候,在给它定义进程的时候,它也可以是一个实例池。

如果上面代码中p=Pool(5),则所有子进程可以同时执行

3. 多个子流程之间的通信

多个子进程之间的通信必须使用第一步提到的Queue。 例如,如果有以下需求,一个子进程向队列写入数据,另一个进程从队列中取出数据。

#coding:gbk
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'

4.上面代码的几个有趣的问题

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

如果 main 函数写成上面的示例,我想要的是获取一个队列并将其作为参数传递给进程池中的每个子进程,但是我得到

: 队列只能

错误,查了一下,大体思路是队列对象无法在父进程和子进程之间通信,如果要使用进程池中的队列就要使用这个类

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

这样父进程和子进程之间就可以通信这个队列对象了,没有池就不需要它了。 以后我们要扩大班级规模

关于锁的应用,如果不同的程序同时操作同一个队列,为了避免错误,可以在操作队列时对某个函数加锁,这样同一时间只能有一个队列时间。 子进程对队列进行操作,锁也必须锁定在对象上

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'

 
反对 0举报 0 收藏 0 打赏 0评论 0
 
更多>同类资讯
推荐图文
推荐资讯
点击排行
网站首页  |  关于我们  |  联系方式  |  使用协议  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报
Powered By DESTOON