python多进程并发机制:

一、python并发编程之多进程

一.什么是进程

因为GIL(全局解释器锁)的限制(GIL是用来保证在任意时刻只能有一个控制线程在执行),所以python中的多线程并非真正的多线程。只有python程序是I/O密集型应用时,多线程才会对运行效率有显著提高(因在等待I/O的时,会释放GIL允许其他线程继续执行),而在计算密集型应用中,多线程并没有什么用处。考虑到要充分利用多核CPU的资源,允许python可以并行处理一些任务,这里就用到了python多进程编程了。multiprocessing是python中的多进程模块,使用这个模块可以方便地进行多进程应用程序开发。multiprocessing模块中提供了:Process、Pool、Queue、Manager等组件。

1.什么是进程:

 回答: 正在进行的一个过程或者说一个任务,而 这个过程就叫做进程。

1.1 进程与程序的区别:

 回答: 程序仅仅是一堆代码而已,而进程指的是程序的运行过程。

2.并发与并行。

回答:无论是并行还是并发,在用户看来都是 同时  运行的,
不管是进程还是线程,都只是一个任务而已,真实干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能运行一个任务。 

2.1: 并发什么意思:

 回答:并发是伪并行,即看起来是同事运行。单个cpu+多道技术就可以实现并发。(并行也属于并发)

2.2: 并行什么意思:

 回答:并行就是同时运行,只有具备多个cpu才能实行并行。

3.同步 与 异步

3.1什么是同步:

 同步执行:一个进程在执行某个任务是,另外一个进程必须等待其执行完毕,才能继续执行

 异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行同步通信,

 打电话时就是同步通信,发短信时就是异步通信。

这里使用了multprocessing.Pool进程池,来动态增加进程

1.1 multiprocessing模块介绍

  由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

  multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。

  该Process对象与Thread对象的用法相同,也有start(), run(),
join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类
(这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

 

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

 

方法介绍:

p1=Process(target=foo,args=('p1',))    开启子进程
p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  

p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

 

在linux中,主进程的全局变量,子进程能用

在window中,主进程的全局变量,子进程不能用能用

 

1.2 创建并开启子进程的两种方式

通过函数:

#函数方式
def foo(name):
    print('%s starting'%name)
    time.sleep(random.randrange(1,5))
    print('%s is end'%name)

if __name__ == '__main__':
    print('主进程starting')

    p1=Process(target=foo,args=('p1',))
    p2=Process(target=foo,args=('p2',))
    p3=Process(target=foo,args=('p3',))

    # p1.daemon = True
    # p2.daemon = True
    # p3.daemon = True

    p1.start()
    p2.start()
    p3.start()

    # p1.join()
    # p2.join()
    # p3.join()

    # time.sleep(1)
    print('主进程ending')

通过类的方式:

#用类的方式创建子进程
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        # self.name=name

    def run(self):
        print('%s starting' % self.name)
        time.sleep(random.randrange(1, 5))
        print('%s id is %s' % (self.name,self.pid))
        print('%s is end'%self.name)

if __name__ == '__main__':
    print('主进程starting')
    for i in range(4):
        p = MyProcess(str(i))
        p.start()

    print('主进程ending')

1.3 Process对象的其他方法或属性

图片 1图片 2

#进程对象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #结果为True

print('开始')
print(p1.is_alive()) #结果为False

进程对象的其他方法一:terminate,is_alive

 

图片 3图片 4

#进程对象的其他方法二:p.daemon=True,p.join
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程死,p跟着一起死
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print('开始')

#进程对象的其他方法二:p.daemon=True,p.join

注:p.join(),是父进程在等p的结束,是父进程阻塞在原地,而p仍然在后台运行

图片 5图片 6

#进程对象的其他属性:name,pid
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
        #                    #所以加到这里,会覆盖我们的self.name=name

        #为我们开启的进程设置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('egon')
p.start()
print('开始')
print(p.pid) #查看pid

#进程对象的其他属性:name,pid

 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。我们自己在python文件中写了一些代码,这叫做程序,运行这个python文件的时候,这叫做进程。

1 Process类

一、python 并发编辑之多进程

#coding=utf-8
from multiprocessing import Pool
import time
def ft(x):
  #多任务,系统自动化配进程执行
  for i in range(2):
    print i,'-----------',x
    time.sleep(1)



def main_process():
    pool = Pool(processes=4) #控制进程池的大小,为4个进程
    for i in range(10):
        #添加入进程池,apply带_async,单独apply为阻塞版本;函数名target,参数args
        result = pool.apply_async(ft,(i,))

    pool.close()
    pool.join()

    if result.successful():
        print('_____success_____')


if __name__=="__main__":
    main_process()

1.2 进程间通讯 

 狭义定义:进程是正在运行的程序的实例(an
instance of a computer program that is being executed)。 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。 举例:
比如py1文件中有个变量a=1,py2文件中有个变量a=2,他们两个会冲突吗?不会的,是不是,因为两个文件运行起来后是两个进程,操作系统让他们在内存上隔离开,对吧。图片 7图片 8

1.1 构造方法

def __init__(self, group=None, target=None, name=None, args=(), kwargs={})

group:进程所属组,基本不用
target:进程调用对象(可以是一个函数名,也可以是一个可调用的对象(实现了__call__方法的类))
args:调用对象的位置参数元组
name:别名
kwargs:调用对象的关键字参数字典

1.1multiprocessing  模块介绍

  python
中的多线程无法利用多核优势,如果想要充分地使用多核cpu的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。python提供了非常好用的多进程包,这个包是:
multiprocessing.

  multiprocessing模块的功能众多:如  支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

  需要再次强调的一点是:与线程不用,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

运行结果:

1.2.1进程间通信(IPC)方式一:队列(推荐使用)

from multiprocessing import Process
import queue

def f(q,n):
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':

    q = queue.Queue()  
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。我们自己在python文件中写了一些代码,这叫做程序,运行这个python文件的时候,这叫做进程。  狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。  广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。  举例: 比如py1文件中有个变量a=1,py2文件中有个变量a=2,他们两个会冲突吗?不会的,是不是,因为两个文件运行起来后是两个进程,操作系统让他们在内存上隔离开,对吧。

1.2 实例方法

is_alive():返回进程是否在运行
start():启动进程,等待CPU调度
join([timeout]):阻塞当前上下文环境,直到调用此方法的进程终止或者到达指定timeout
terminate():不管任务是否完成,立即停止该进程
run():start()调用该方法,当实例进程没有传入target参数,stat()将执行默认的run()方法

1.2 Process 类的介绍:

创建进程的类:

Process ([group [, target [, name [, args [ , kwargs]]]]) ,由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)


强调:
1.需要使用关键字的方式来指定参数
2.args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号。

参数介绍:

1。group:参数未使用,值始终为None
2. target:表示调用对象,即 子进程要执行的任务
3. args  : 表示调用对象的位置传参数元组, args = (1,2,“aray”,)
4. kwargs:表示调用对象的字典,kwargs = {’name':'aray','age':18} 
5.name : 为子进程的名称

方法介绍:

1 p.start():启动进程,并调用该子进程中的p.run() 
2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
4 p.is_alive():如果p仍然运行,返回True
5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  

属性介绍:

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid 
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可) 
C:\Python27\python.exe D:/weixin/temp/testtmp.py
0 ----------- 0
0 ----------- 1
0 ----------- 2
0 ----------- 3
1 ----------- 0
1 ----------- 1
1 ----------- 2
1 ----------- 3
0 ----------- 4
0 ----------- 5
0 ----------- 6
0 ----------- 7
1 ----------- 4
1 ----------- 5
1 ----------- 6
1 ----------- 7
0 ----------- 8
0 ----------- 9
1 ----------- 8
1 ----------- 9
_____success_____

Process finished with exit code 0

1.2.2进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

from multiprocessing import Pipe,Process

def foo(sk):
    sk.send("hello world")
    print(sk.recv())

if __name__ == '__main__':
    sock, conn = Pipe()
    p=Process(target=foo,args=(sock,))
    p.start()

    print(conn.recv())
    conn.send("hi son")

>>:hello world
       hi son

  Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。

进程的概念图片 9图片 10

1.3 属性

authkey
daemon:守护进程标识,在start()调用之前可以对其进行修改
exitcode:进程的退出状态码
name:进程名
pid:进程id

1.3 Process 类的使用

从以上运行结果可以看出,一次最多执行了我们设定的4个进程。

1.2.3进程间通信方式三:共享数据(不推荐使用,了解即可)

  Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。可以通过Manager实现数据共享,

  从数据安全角度看,进程间通信应该尽量避免使用本节所讲的共享数据的方式

用manager实现进程共享数据

from multiprocessing import Process,Manager

def foo(d,l,i):
    d[i] = i
    d['egon'] = 'egon'
    l.append(i**2)

if __name__ == '__main__':
    # manager = Manager()
    dic = Manager().dict()
    Mlist = Manager().list([11,22,33])
    l=[]
    for i in range(5):
        p=Process(target=foo,args=(dic,Mlist,i))
        p.start()
        l.append(p)

    for j in l:
        j.join()

    print(dic)
    print(Mlist)
动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。并发性:任何进程都可以同其他进程一起并发执行独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进结构特征:进程由程序、数据和进程控制块三部分组成。多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。

1.4 实例

实例一:传入的target为一个函数

#!/usr/bin/python
#coding=utf-8

import time
import random
from multiprocessing import Process

def foo(i):
    print time.ctime(), "process the %d begin ......" %i
    time.sleep(random.uniform(1,3))
    print time.ctime(), "process the %d end !!!!!!" %i

if __name__ == "__main__":
    print time.ctime(), "process begin......"

    p_lst = list()
    for i in range(4):
        p_lst.append(Process(target=foo, args=(i,)))    #创建4个子进程
    #启动子进程
    for p in p_lst:
        p.start()
    #等待子进程全部结束
    for p in p_lst:
        p.join()

    print time.ctime(), "process end!!!!!"   

实例二:传入的target为一个可调用对象

#!/usr/bin/python
#coding=utf-8

import time
import random
from multiprocessing import Process

class Foo(object):
    def __init__(self, i):
        self.i = i

    def __call__(self):
        '''
        使Foo的实例对象成为可调用对象
        '''                                                                                                        
        print time.ctime(), "process the %d begin ......" %self.i
        time.sleep(random.uniform(1,3))
        print time.ctime(), "process the %d end !!!!!!" %self.i

if __name__ == "__main__":
    print time.ctime(), "process begin......"

    p_lst = list()
    for i in range(4):
        p_lst.append(Process(target=Foo(i)))    #创建4个子进程
    #启动子进程
    for p in p_lst:
        p.start()
    #等待子进程全部结束
    for p in p_lst:
        p.join()

    print time.ctime(), "process end!!!!!"

实例三:派生Process子类,并创建子类的实例

#!/usr/bin/python                                                                                                  
#coding=utf-8

import time
import random
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, i):
        Process.__init__(self)
        self.i = i

    def run(self):
        '''
        #重写run方法,当调用start方法时,就会调用当前重写的run方法中的程序
        '''
        print time.ctime(), "process the %d begin ......" %self.i
        time.sleep(random.uniform(1,3))
        print time.ctime(), "process the %d end !!!!!!" %self.i

if __name__ == "__main__":
    print time.ctime(), "process begin......"

    p_lst = list()
    for i in range(4):
        p_lst.append(MyProcess(i))  #创建4个子进程
    #启动子进程
    for p in p_lst:
        p.start()
    #等待子进程全部结束
    for p in p_lst:
        p.join()

    print time.ctime(), "process end!!!!!"

**          part1:创建并开启子进程的两种方式**

注意:在windows中Process()必须放到# if __name__ ==
‘__main__’:下

Since Windows has no fork, the multiprocessing module starts a new
Python process and imports the calling module.
If Process() gets called upon import, then this sets off an infinite
succession of new processes (or until your machine runs out of
resources).
This is the reason for hiding calls to Process() inside

if __name__ == “__main__”
since statements inside this if-statement will not get called upon
import.

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
这是隐藏对Process()内部调用的原,使用if __name__ == “__main
__”,这个if语句中的语句将不会在导入时被调用。

图片 11图片 12

方法一:
import time
import random
from multiprocessing import  Process
def piao(name):
    print('%s playing' %name)
    time.sleep(random.randint(1,3))
    print('%s playing end'%name)

if __name__ == '__main__':
    p1 = Process(target=piao,args = ('aray',))  #必须加逗号
    p2 = Process(target=piao,args = ('zxc',))
    p3 = Process(target=piao,args = ('asd',))
    p4 = Process(target=piao,args = ('qwe',))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主线程')

输出结果“
主线程
aray playing
zxc playing
asd playing
qwe playing
aray playing end
qwe playing end
zxc playing end
asd playing end


方法二:
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('%s playing'%self.name)
        time.sleep((random.randint(1,3)))
        print('%s playing end'%self.name)

if __name__ == '__main__':

    p1=Piao('aray')
    p2=Piao('zxc')
    p3=Piao('asd')
    p4=Piao('qwe')
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主进程')

输出结果:
主进程
aray playing
zxc playing
asd playing
qwe playing
zxc playing end
aray playing end
asd playing end
qwe playing end

开启进程的方法

 练习:把  socket 通信编程并发的形式。

图片 13图片 14

服务端:
from socket import *
from multiprocessing import Process

server = socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
print('ready go')

def talk(conn,addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
            print('from client msg:%s'%msg)
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,client_addr =server.accept()
        p = Process(target=talk, args=(conn,client_addr))
        p.start()


客户端:
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg = input('>>:').strip()
    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print('from server %s'%msg.decode('utf-8'))

View Code

         part2: Process对象的其他方法或属性

#进程对象的其他方法1:iterminate,is_alive

from multiprocessing import Process
import time
import random

class Play(Process):
    def __init__(self,name):
        self.name = name
        super().__init__()

    def run(self):
        print('%s is playing ' %self.name)
        time.sleep(random.randint(1,3))
        print('%s play end'%self.name)

if __name__ == '__main__':

    p1 = Play('aray')
    p1.start()

    p1.terminate()   #关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive())
    print('开始')
    time.sleep(3)
    print(p1.is_alive())


输出结果:
True
开始
False

#进程对象的其他方法2: p,daemon = True,p.join
from multiprocessing import Process
import time
import random

class Play(Process):
    def __init__(self,name):
        # self.name =name
        super().__init__()#Process的__init__方法会执行self.name=Piao-1,所以加到这里,会覆盖我们的self.name=name
        self.name = name  #为我们开启的进程设置名字的做法
    def run(self):
        print('%s is playing '%self.name)
        time.sleep(random.randint(1,3))
        print('%s is paly end'%self.name)

if __name__ == '__main__':
    p = Play('aray')
    p.daemon =True  #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程死,p跟着一起死
    p.start()
    p.join(1)  #等待p停止,等1秒就不再等了
    print('开始')
    print(p.pid)  #查看pid

输出结果:
aray is playing 
开始
20892

1.互斥锁 也 称 同步锁:Lock()

图片 15图片 16

一:没加锁之前
import os
import time
from multiprocessing import Process
def work():

    print('task[%s] is running '%os.getpid())
    time.sleep(3)
    print('task[%s] is done'%os.getpid())

if __name__ == '__main__':
    p1 = Process(target=work,)
    p2 = Process(target=work,)
    p3 = Process(target=work,)
    p1.start()
    p2.start()
    p3.start()
    print('主公')

输出结果:
主公
task[13768] is running 
task[22400] is running 
task[21128] is running 
task[13768] is done
task[22400] is done
task[21128] is done

Process finished with exit code 0


二、加锁之后
import os
import time
from multiprocessing import Process,Lock
def work(mutex):
    mutex.acquire()
    print('task[%s] is running '%os.getpid())
    time.sleep(3)
    print('task[%s] is done'%os.getpid())
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    p1 = Process(target=work,args=(mutex,))
    p2 = Process(target=work,args=(mutex,))
    p3 = Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()
    print('主公')

输出结果:
主公
task[6700] is running 
task[6700] is done
task[21140] is running 
task[21140] is done
task[14584] is running 
task[14584] is done

Process finished with exit code 0

互斥锁

2.守护进程:daemon

图片 17图片 18

import os
import time
from multiprocessing import Process
def work():
    print('task[%s] is running '%os.getpid())
    # time.sleep(1)
    print('task[%s] is done'%os.getpid())

if __name__ == '__main__':
    p1 = Process(target=work)
    p2 = Process(target=work)
    p3 = Process(target=work)
    p1.daemon = True   #守护进程
    p2.daemon = True   #守护进程
    p3.daemon = True   #守护进程 
    p1.start()
    p2.start()
    p3.start()
    # time.sleep(2)
    print('主')

输出结果:
主

守护进程

3.主进程等待机制:join()

图片 19图片 20

import os
import time
from multiprocessing import Process,Lock
def work(mutex):
    mutex.acquire()
    print('task[%s] 洗衣服 '%os.getpid())
    time.sleep(3)
    print('task[%s] 晒衣服'%os.getpid())
    mutex.release()
def work2(mutex):
    mutex.acquire()
    print('task[%s] 煮饭 '%os.getpid())
    time.sleep(3)
    print('task[%s] 炒菜'%os.getpid())
    mutex.release()
def work3(mutex):
    mutex.acquire()
    print('task[%s] 下电影 '%os.getpid())
    time.sleep(3)
    print('task[%s] 看电影'%os.getpid())
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    p1 = Process(target=work,args=(mutex,))
    p2 = Process(target=work2,args=(mutex,))
    p3 = Process(target=work3,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()
    # p1,p2,p3.join()
    p1.join()
    p2.join()
    p3.join()
    print('主')

输出结果:
task[22592] 洗衣服 
task[22592] 晒衣服
task[17328] 煮饭 
task[17328] 炒菜
task[23072] 下电影 
task[23072] 看电影
主

Process finished with exit code 0

主进程等待机制

4.强制关闭:terminate()

图片 21图片 22

import os
import time
from multiprocessing import Process,Lock
def work(mutex):
    mutex.acquire()
    print('task[%s] is running '%os.getpid())
    time.sleep(3)
    print('task[%s] is done'%os.getpid())
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    p1 = Process(target=work,args=(mutex,))
    # p2 = Process(target=work,args=(mutex,))
    # p3 = Process(target=work,args=(mutex,))
    p1.start()
    # p2.start()
    # p3.start()
    p1.terminate()
    time.sleep(3)
    print(p1.is_alive())
    print(p1.name)
    print(p1.pid)

输出结果:
False
Process-1
22888

Process finished with exit code 0

强制关闭进程

5.进程队列:Queue

图片 23图片 24

from multiprocessing import Process,Queue

q = Queue(4)   #设置 4 个进程
q.put('first',)
q.put('second',)
q.put('third',)
q.put('fourth',block=True)   #block  默认为True。如果队列不足4就在那等待。

print(q.get())
print(q.get())
print(q.get())
print(q.get())

输出结果:
first
second
third
fourth

Process finished with exit code 0

进程队列

练习:

抢票:

图片 25图片 26

import json
import os
import random
from multiprocessing import Process,Lock

#查询票数
def search():
    dic = json.load(open('db.txt')) #打开文件 并且 序列化
    print('剩余票数:%s '%dic['count'])

#购票
def get_ticket():
    dic = json.load(open('db.txt'))
    if dic['count'] > 0:
        dic['count'] -=1
        time.sleep(random.randint(1,3))
        json.dump(dic,open('db.txt','w'))
        print('购买成功: %s'%os.getpid())

#运行调用
def task(mutex):     
    search()
    mutex.acquire()
    get_ticket()
    mutex.release()    

if __name__ == '__ main __':
    mutex = Lock()
    for i in range(5):
        p = Process(target = task , args = (mutex,))
        p.start()

输出结果: (这里  db.txt  里面的  count  值为  1.就一张票)
剩余票数:1
剩余票数:1
剩余票数:1
剩余票数:1
剩余票数:1
购买成功:13176

Process finished with exit code 0

购票

多消费者和多生产者:

图片 27图片 28

import os 
import random
import time
from multiprocessing import Process,JoinableQueue

def producer_bun(q): #生产包子
    for i in range(5):
        time.sleep(2)  #睡2s  让它能有时间响应
        res = ' 包子 %s' %i
        q.put(res)
        print('%s 制造了 %s' %(os.getpid(),res))
    q.join()

def producer_sticks(q):  #生产油条
    for i in range(5):
        time.sleep(2):
        res = '油条 %s' % i 
        q.put(res)
        print('%s 制造了 %s',%(os.getpid(),res))
    q.join()

def consumer(q):  #消费者
    while True:
        res  = q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃了 %s'%(os.getpid(),res))
        q.task_done()

if __name__ == '__main__':
    q= JoinableQueue()
    p1 = Process(target = prodcuer_bun, args = (q,))    #生产者1
    p2 = Process(target = producer_sticks,args = (q,))  #生产者2

    p3 = Process(target = consumer, args = (q,))    #消费者1
    p4 = Process(target = consumer, args = (q,))    #消费者2

    p3.daemon = True          #守护进程
    p4.daemon = True          #守护进程
    p_l = [p1,p2,p3,p4]
    for p in p_l:
        p.start()
    p1.join()
    p2.join()
    print('主')

输出结果:
3392 制造了 包子0
20628 制造了 油条0
8180 吃了 包子0
15756 吃了 油条0
3392 制造了 包子1
20628 制造了 油条1
15756 吃了 油条1
3392 制造了 包子2
20628 制造了 油条2
8180 吃了 包子1
15756 吃了 包子2
3392 制造了 包子3
20628 制造了 油条3
8180 吃了 油条2
3392 制造了 包子4
8180 吃了 油条3
20628 制造了 油条4
15756 吃了 包子3
15756 吃了 油条4
8180 吃了 包子4
主

Process finished with exit code 0

View Code

 开启进程的两种方法:

图片 29图片 30

#继承调用:
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('%s playing'%self.name)
        time.sleep((random.randint(1,3)))
        print('%s playing end'%self.name)

if __name__ == '__main__':

    p1=Piao('aray')
    p2=Piao('zxc')
    p3=Piao('asd')
    p4=Piao('qwe')
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主进程')

输出结果:
主线程
aray playing
zxc playing
asd playing
qwe playing
aray playing end
asd playing end
qwe playing end
zxc playing end

Process finished with exit code 0


直接调用:
import time
import random
from multiprocessing import  Process
def piao(name):
    print('%s playing' %name)
    time.sleep(random.randint(1,3))
    print('%s playing end'%name)

if __name__ == '__main__':
    p1 = Process(target=piao,args = ('aray',))  #必须加逗号
    p2 = Process(target=piao,args = ('zxc',))
    p3 = Process(target=piao,args = ('asd',))
    p4 = Process(target=piao,args = ('qwe',))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主线程')

输出结果:
主线程
aray playing
zxc playing
asd playing
qwe playing
aray playing end
asd playing end
qwe playing end
zxc playing end

Process finished with exit code 0

View Code

通过进程池和回调函数,完成一个爬虫网页的案例:

图片 31图片 32

from multiprocessing import Pool
import requests
import os

def get_page(url):
    print('%s  get  %s'%(os.getpid(),url))
    response = requests.get(url)
    return {'url':url,'text':response.text}

def parse_page(res):
    print('%s  parse %s'%(os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res = 'url : %s size: %s \n ' %(res['url'],len(res['text']))
        f.write(parse_res)


if __name__ == '__main__':
    p = Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    for url in urls:
        p.apply_async(get_page,args=(url,),callback=parse_page)  #callback 回调函数

    p.close()
    p.join()
    print('主',os.getpid())

View Code

 

 

1.3 进程池(重点学习)

  开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行),但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数…

  进程池(Pool)可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程

   
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。

进程池实例:

from multiprocessing import Pool
import time

def foo(n):
    print(n)
    time.sleep(1)

if __name__ == '__main__':

    pool_obj=Pool(5)

    for i in range(100):
        pool_obj.apply_async(func=foo,args=(i,))

    pool_obj.close()
    pool_obj.join()

    print("ending")

  

 

进程的特征图片 33图片 34

2 Pool类

当使用Process类管理非常多(几十上百个)的进程时,就会显得比较繁琐,这是就可以使用Pool(进程池)来对进程进行统一管理。当池中进程已满时,有新进程请求执行时,就会被阻塞,直到池中有进程执行结束,新的进程请求才会被放入池中并执行。

 

程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。而进程是程序在处理机上的一次执行过程,它是一个动态的概念。程序可以作为一种软件资料长期存在,而进程是有一定生命期的。程序是永久的,进程是暂时的。举例:就像qq一样,qq是我们安装在自己电脑上的客户端程序,其实就是一堆的代码文件,我们不运行qq,那么他就是一堆代码程序,当我们运行qq的时候,这些代码运行起来,就成为一个进程了。

2.1 构造方法

def __init__(self, processes=None, initializer=None, initargs=(),                 maxtasksperchild=None)

processes:池中可容纳的工作进程数量,默认情况使用os.cpu_count()返回的数值,一般默认即可
其他参数暂不清楚有什么用处……

Linux and
python学习交流1,2群已满.

进程与程序的区别

2.2 实例方法

apply(self, func, args=(),
kwds={})
:阻塞型进程池,会阻塞主进程,直到工作进程全部退出,一般不用这个
apply_async(self, func, args=(), kwds={},
callback=None)
:非阻塞型进程池
map(self, func, iterable,
chunksize=None)
:与内置map行为一致,它会阻塞主进程,直到map运行结束
map_async(self, func, iterable, chunksize=None,
callback=None)
:非阻塞版本的map
close():关闭进程池,不在接受新任务
terminate():结束工作进程
join():阻塞主进程等待子进程退出,该方法必须在close或terminate之后执行

Linux and
python学习交流3群新开,欢迎加入,一起学习.qq 3群:563227894

注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。比如打开暴风影音,虽然都是同一个软件,但是一个可以播放米老鼠,一个可以播放唐老鸭。**

2.3 实例

#!/usr/bin/python
#coding=utf-8

import time
import random
from multiprocessing import Pool

def foo(i):
    print time.ctime(), "process the %d begin ......" %i
    time.sleep(random.uniform(1,3))
    print time.ctime(), "process the %d end !!!!!!" %i

if __name__ == "__main__":

    print time.ctime(), "process begin......"
    pool = Pool(processes = 2)  #设置进程池中最大并行工作进程数为2                                                 
    for i in range(4):
        pool.apply_async(foo, args=(i,))    #提交4个子进程任务

    pool.close()
    pool.join()

    print time.ctime(), "process end!!!!!"

结果:

Fri Nov 18 13:57:22 2016 process begin......
Fri Nov 18 13:57:22 2016 process the 0 begin ......
Fri Nov 18 13:57:22 2016 process the 1 begin ......
Fri Nov 18 13:57:23 2016 process the 1 end !!!!!!
Fri Nov 18 13:57:23 2016 process the 2 begin ......
Fri Nov 18 13:57:24 2016 process the 0 end !!!!!!
Fri Nov 18 13:57:24 2016 process the 3 begin ......
Fri Nov 18 13:57:25 2016 process the 2 end !!!!!!
Fri Nov 18 13:57:25 2016 process the 3 end !!!!!!
Fri Nov 18 13:57:25 2016 process end!!!!!

不前进,不倒退,停止的状态是没有的.

二.并发和并发

3 Queue类

Queue主要提供进程间通信以及共享数据等功能。除Queue外还可以使用Pipes实现进程间通信(Pipes是两个进程间进行通信)

一起进步,与君共勉,

通过进程之间的调度,也就是进程之间的切换,我们用户感知到的好像是两个视频文件同时在播放,或者音乐和游戏同时在进行,那就让我们来看一下什么叫做并发和并行

3.1 构造方法

def __init__(self, maxsize=0)

maxsize:用于设置队列最大长度,当为maxsize<=0时,队列的最大长度会被设置为一个非常大的值(我的系统中队列最大长度被设置为2147483647)

 

无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

3.2 实例方法

put(self, obj, block=True, timeout=None)

1、block为True,若队列已满,并且timeout为正值,该方法会阻塞timeout指定的时间,直到队列中有出现剩余空间,如果超时,会抛出Queue.Full异常
2、block为False,若队列已满,立即抛出Queue.Full异常

get(self, block=True, timeout=None)

block为True,若队列为空,并且timeout为正值,该方法会阻塞timeout指定的时间,直到队列中有出现新的数据,如果超时,会抛出Queue.Empty异常
block为False,若队列为空,立即抛出Queue.Empty异常

  并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,

3.3 实例

#!/usr/bin/python
#coding=utf-8

import time
import random
from multiprocessing import Process, Queue

def write(q):
    for value in "abcd":
        print time.ctime(), "put %s to queue" %value
        q.put(value)
        time.sleep(random.random())

def read(q):
    while True:
        value = q.get()
        print time.ctime(), "get %s from queue" %value

if __name__ == "__main__":
    #主进程创建Queue,并作为参数传递给子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    #启动子进程pw,往Queue中写入
    pw.start()
    #启动子进程pr,从Queue中读取
    pr.start()
    #等待写进程执行结束
    pw.join()
    #终止读取进程                                                                                                  
    pr.terminate()

运行结果:

Fri Nov 18 15:04:13 2016 put a to queue
Fri Nov 18 15:04:13 2016 get a from queue
Fri Nov 18 15:04:13 2016 put b to queue
Fri Nov 18 15:04:13 2016 get b from queue
Fri Nov 18 15:04:13 2016 put c to queue
Fri Nov 18 15:04:13 2016 get c from queue
Fri Nov 18 15:04:13 2016 put d to queue
Fri Nov 18 15:04:13 2016 get d from queue

图片 35图片 36

4 Manager类

Manager是进程间数据共享的高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager支持的类型有list,
dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition,
Event, Queue, Value和Array。
如下是使用Manager管理一个用于多进程共享的dict数据

#!/usr/bin/python
#coding=utf-8

import time
import random
from multiprocessing import Manager, Pool

def worker(d, key, value):
    print time.ctime(), "insert the k-v pair to dict begin: {%d: %d}" %(key, value)
    time.sleep(random.uniform(1,2))
    d[key] = value  #访问共享数据
    print time.ctime(), "insert the k-v pair to dict end: {%d: %d}" %(key, value)


if __name__ == "__main__":
    print time.ctime(), "process for manager begin"
    mgr = Manager()
    d = mgr.dict()
    pool = Pool(processes=4)                                                                                       
    for i in range(10):
        pool.apply_async(worker, args=(d, i, i*i))

    pool.close()
    pool.join()
    print "Result:"
    print d
    print time.ctime(), "process for manager end"

运行结果

Fri Nov 18 16:36:19 2016 process for manager begin
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {0: 0}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {1: 1}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {2: 4}
Fri Nov 18 16:36:19 2016 insert the k-v pair to dict begin: {3: 9}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict end: {3: 9}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict begin: {4: 16}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict end: {0: 0}
Fri Nov 18 16:36:20 2016 insert the k-v pair to dict begin: {5: 25}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {2: 4}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {6: 36}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {1: 1}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {7: 49}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict end: {5: 25}
Fri Nov 18 16:36:21 2016 insert the k-v pair to dict begin: {8: 64}
Fri Nov 18 16:36:22 2016 insert the k-v pair to dict end: {4: 16}
Fri Nov 18 16:36:22 2016 insert the k-v pair to dict begin: {9: 81}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {8: 64}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {6: 36}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {7: 49}
Fri Nov 18 16:36:23 2016 insert the k-v pair to dict end: {9: 81}
Result:
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
Fri Nov 18 16:36:23 2016 process for manager end
你是一个cpu,你同时谈了三个女朋友,每一个都可以是一个恋爱任务,你被这三个任务共享要玩出并发恋爱的效果,应该是你先跟女友1去看电影,看了一会说:不好,我要拉肚子,然后跑去跟第二个女友吃饭,吃了一会说:那啥,我去趟洗手间,然后跑去跟女友3开了个房,然后在你的基友眼里,你就在和三个女友同时在一起玩。

单核cpu多进程并发举例

  并行:并行:同时运行,只有具备多个cpu才能实现并行

图片 37图片 38

将多个cpu必须成高速公路上的多个车道,进程就好比每个车道上行驶的车辆,并行就是说,大家在自己的车道上行驶,会不影响,同时在开车。这就是并行

多个cpu多个进程举例三.同步\异步\阻塞\非阻塞1.进程状态图片 39

在了解其他概念之前,我们首先要了解进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。

  就绪状态

    当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

  执行/运行状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

  阻塞状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件等。

    事件请求:input、sleep、文件输入输出、recv、accept等

    事件发生:sleep、input等完成了

    时间片到了之后有回到就绪状态,这三个状态不断的在转换。

图片 40

2.同步异步

所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。其实就是一个程序结束才执行另外一个程序,串行的,不一定两个程序就有依赖关系。阻塞和非阻塞这两个概念与程序等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序等待消息通知时的状态角度来说的

所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了至于被依赖的任务最终是否真正完成,依赖它的任务无法确定所以它``是不可靠的``任务序列

图片 41图片 42

比如我们去楼下的老家肉饼吃饭,饭点好了,取餐的时候发生了一些同步异步的事情。同步:我们都站在队里等着取餐,前面有个人点了一份肉饼,后厨做了很久,但是由于同步机制,我们还是要站在队里等着前面那个人的肉饼做好取走,我们才往前走一步。异步:我们点完餐之后,点餐员给了我们一个取餐号码,跟你说,你不用在这里排队等着,去找个地方坐着玩手机去吧,等饭做好了,我叫你。这种机制就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中等着取餐的你)往往注册一个回调机制,在所等待的事件被触发时由触发机制通过某种机制(喊号,‘250号你的包子好了‘)找到等待该事件的人。

同步异步举例

3.阻塞与非阻塞

阻塞和非阻塞这两个概念与程序等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序等待消息通知时的状态角度来说的

图片 43图片 44

继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。相反,有的人喜欢在等待取餐的时候一边打游戏一边等待,这样的状态就是非阻塞的,因为他没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。阻塞的方法:input、time.sleep,socket中的recv、accept等等。

阻塞与非阻塞举例

4.同步/异步 与 阻塞和非阻塞

  1. 同步阻塞形式

    效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。

  1. 异步阻塞形式

    如果在排队取餐的人采用的是异步的方式去等待消息被触发,也就是领了一张小纸条,假如在这段时间里他不能做其它的事情,就在那坐着等着,不能玩游戏等,那么很显然,这个人被阻塞在了这个等待的操作上面;

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

  1. 同步非阻塞形式

    实际上是效率低下的。

    想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

  1. 异步非阻塞形式

    效率更高,

    因为打电话是你的事情,而通知你则是柜台的事情,程序没有在两种不同的操作中来回切换

    比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉点餐员说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

  很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞

四.进程的创建

 1.multiprocessing模块第一种创建进程方式

import timefrom multiprocessing import Processdef f1():    time.sleep(3)    print('xxx')def f2():    time.sleep(3)    print('sss')# window系统下必须写main,因为windows系统创建子进程的方式决定的,开启一个子进程,这个子进程会copy一份主进程的所有代码,并且机制类似于import引入,这样就容易导致引入代码的时候,被引入的代码中的可执行程序被执行,导致递归开始进程,会报错if __name__ == '__main__':    p1 = Process(target=f1)    p2 = Process(target=f2)    p1.start()    p2.start()

  2.for循环创建进程

import timefrom multiprocessing import Processdef f1:    time.sleep(3)    printif __name__ == '__main__':    for i in range(20):        p1 = Process(target=f1, args=        p1.start()

  3.multiprocessing模块第二种创建进程方式

from multiprocessing import Processclass MyProcess:    def __init__:        super().__init__()  # 别忘了执行父类的init        self.n = n    def run:        print('宝宝and%s不可告人的事情'%self.n)if __name__ == '__main__':    p1 = MyProcess('高望')    p1.start()

  4.join方法

 1 import time 2 from multiprocessing import Process 3  4 def f1(): 5     time.sleep(2) 6     print('xxx') 7  8 def f2(): 9     time.sleep(2)10     print('sss')11 12 if __name__ == '__main__':13 14 15     p1 = Process(target=f1,)16 17     p2 = Process(target=f2,)18 19     p1.start()20     p2.start()21     p1.join()  # 等待子进程p1执行完毕22     p2.join()  # 等待子进程p2执行完毕23     print('我要等待子进程')24     print('我是主进程!!!')

五.进程锁

通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

多进程抢占输出资源,导致打印混乱的示例

图片 45图片 46

import osimport timeimport randomfrom multiprocessing import Processdef work:    print('%s: %s is running' %(n,os.getpid    time.sleep(random.random    print('%s:%s is done' %(n,os.getpidif __name__ == '__main__':    for i in range(5):        p=Process(target=work,args=        p.start()# 看结果:通过结果可以看出两个问题:问题一:每个进程中work函数的第一个打印就不是按照我们for循环的0-4的顺序来打印的#问题二:我们发现,每个work进程中有两个打印,但是我们看到所有进程中第一个打印的顺序为0-2-1-4-3,但是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明我们一个进程中的程序的执行顺序都混乱了。#问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,因为进程开到了内核,有操作系统来决定进程的调度,我们自己控制不了# 0: 9560 is running# 2: 13824 is running# 1: 7476 is running# 4: 11296 is running# 3: 14364 is running# 2:13824 is done# 1:7476 is done# 0:9560 is done# 3:14364 is done# 4:11296 is done多进程抢占输出资源,导致打印混乱的示例

View Code

加锁版

 1 import time,json 2 from multiprocessing import Process,Lock 3  4 def show_t: 5     f = open('ticket', mode='r',encoding='utf-8') 6     dic = json.load 7     print('%s查询剩余票数为%s'%(i,dic['count'])) 8 def get_t: 9     l1.acquire()10     f = open('ticket', mode='r', encoding='utf-8')11     dic = json.load12     if dic['count'] > 0:13         dic['count'] -= 114         print('%s抢票成功'%i)15         time.sleep(0.2)16         f = open('ticket',mode= 'w',encoding='utf-8')17         json.dump18     else:19         print('没票了')20     l1.release()21 if __name__ == '__main__':22     l1 = Lock()23     for i in range(10):24         p1 = Process(target=show_t,args=25         p1.start()26 27     for i in range(10):28         p2 = Process(target=get_t,args=29         p2.start()

进程锁总结:

#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。虽然可以用文件共享数据实现进程间通信,但问题是:1.效率低(共享数据基于文件,而文件是硬盘上的数据)2.需要自己加锁处理#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。队列和管道都是将数据存放于内存中队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。IPC通信机制:IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制,比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一部分Linux的通信方式。

六.守护进程

之前我们讲的子进程是不会随着主进程的结束而结束,子进程全部执行完之后,程序才结束,那么如果有一天我们的需求是我的主进程结束了,由我主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError:
daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

 1 import time 2 from multiprocessing import Process 3  4 def f1(): 5     time.sleep(3) 6     print('xxxx') 7  8 def f2(): 9     time.sleep(5)10     print('我是普通子进程')11 12 if __name__ == '__main__':13     p1 =Process(target=f1,)14     p1.daemon = True  # 将该进程设置为守护进程,必须写在start之前,意思如果我的主进程代码运行结束了,你这个子进程不管运行到什么地方,都直接结束15     p1.start()16     # 开启一个普通的子进程来验证一下守护进程的结束只和主进程的代码运行结束有关系,而整个程序的结束需要主进程和普通的子进程的代码都运行结束才结束17     p2 = Process(target=f2,)18     p2.start()19     # 等待2号普通进程的结束,才继续执行下面主进程的代码20     # p2.join()21     # 守护进程会跟着父进程的代码运行结束,就结束22     print('我是主进程')

七.队列

1.基础队列

 1 from multiprocessing import Process,Queue 2 q = Queue  # 创建一个队列队形,队列长度为3,先进先出 3 q.put(1) 4 print('>>>>',q.qsize # 返回当前队列的内容长度 5 print 6 q.put(2) 7 print('>>>>',q.qsize 8 q.put(3) 9 print  # q.full()了解,因为这个东西不可靠,满了返回一个True,不满返回一个False10 # print11 # q.put  # 放入数据的时候,如果队列满了,程序会在你put操作的地方阻塞12 try:13     q.put_nowait  # 不阻塞程序,但是会报错queue.Full,可以通过捕获异常来进行其他的操作14 except:15     print('队列满了,玩别的去吧')16 17 print('xxx')18 19 print20 print21 print22 print('是不是空了呀:',q.empty  # q.empty()了解,因为这个东西不可靠,空了返回一个True,不空返回False23 q.put(4)24 print('是不是空了呀:',q.empty   # True或者False,因为q在put数据的时候,有一个细微的延迟回一个False25 print  # 如果队列空了,程序会在这个地方卡主,也就是阻塞程序26 try:27     q.put_nowait()  # queue.Empty28 except:29     print('队列空了,搞别的事情')30 31 print('队列拿完了')

2.队列之队列简单通信

from multiprocessing import Process,Queuedef f1:    q.put('约吗?')if __name__ == '__main__':    q = Queue(3)    p =Process(target=f1,args=    p.start()    son_p_msg = q.get()    print('来自子进程的消息:',son_p_msg)

3.队列之生产者消费者模型

 1 import time 2 from multiprocessing import Process,JoinableQueue 3  4 # 生产者 5 def producer: 6     for i in range(10): 7         time.sleep(0.7) 8         s = '大包子%s号'%i 9         print(s+'新鲜出炉')10         q.put11     q.join() # 就等着task_done()信号的数量.和我put进去的数量想同时,才继续执行12     print('所有任务都被处理了,继续前行吧少年们')13 14 15 def consumer:16     while 1:17         time.sleep(0.2)18         baozi =q.get()19 20         print(baozi+'被吃了')21         q.task_done()  # 给队列发送一个取出的这个任务已经处理完毕的信号22 23 24 if __name__ == '__main__':25     q = JoinableQueue(10)26     pro_p = Process(target=producer,args=27     con_p = Process(target=consumer,args=28     pro_p.start()29     con_p.daemon = True  # 守护进程30     con_p.start()31 32     pro_p.join()33     print('主进程结束')

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图