Python中(多)进程

Python中(多)进程

multiprocessing模块

multiprocessing 是一个用于产生进程的包,具有与 threading 模块相似API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 GIL锁(Global Interpreter Lock)带来的影响。因此,multiprocessing 模块允许程序员充分利用机器上的多核。可运行于 Unix 和 Windows 。


multiprocessing.Process类

  • 创建进程
    在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# ---------- 创建进程 ----------
from multiprocessing import Process

def f(name):
print('hello', name)

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

# ---------- 显示所涉及的各个进程ID ----------
from multiprocessing import Process
import os

def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())

def f(name):
info('function f')
print('hello', name)

if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()

multiprocessing启动方式

spawn

父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。
可在Unix和Windows上使用。 Windows上的默认设置。3.8版本后,macOS上的默认方式。

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。
只存在于Unix。Unix中的默认值。

forkserver

程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。
可在Unix平台上使用,支持通过Unix管道传递文件描述符。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# ---------- 启动示例 ----------
import multiprocessing as mp

def foo(q):
q.put('hello')

if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()

# ---------- 获取上下文----------
# 在程序中 set_start_method() 不应该被多次调用。
# 可以使用 get_context() 来获取上下文对象。
# 上下文对象与 multiprocessing 模块具有相同的API,并允许在同一程序中使用多种启动方法。
# ------------------------------

import multiprocessing as mp

def foo(q):
q.put('hello')

if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()

multiprocessing 进程间通信

队列(Queue)

  • 概念
    Queue 类是一个近似 queue.Queue 的克隆。队列是线程和进程安全的。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()

管道(Pipe)

  • 概念
    Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。如果两个进程/线程同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()

multiprocessing 进程间同步

死锁

  • 概念
    是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

互斥锁(Lock)

  • 概念
    实现互斥锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。互斥锁有可能造成死锁问题。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

# 输出
hello world 2
hello world 3
hello world 1
hello world 4
hello world 0
hello world 6
hello world 5
hello world 7
hello world 8
hello world 9

递归锁(RLock)

  • 概念
    递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。递归锁避免了互斥锁有可能会造成死锁的情况。

进程间状态共享

共享内存(Value/Array)

  • 概念
    可以使用 Value 或 Array 将数据存储在共享内存映射中。
    为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print(num.value)
print(arr[:])

# 输出
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

服务进程(Manager)

  • 概念
    由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。
    Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
    使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))

p = Process(target=f, args=(d, l))
p.start()
p.join()

print(d)
print(l)

# 输出
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

使用工作进程

进程池(Pool)

  • 概念
    Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
    进程池的方法只能由创建它的进程使用

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
return x*x

if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:

# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))

# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)

# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"

# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])

# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")

print("For the moment, the pool remains available for more work")

# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")

Join & Daemon

Join(timeout)

  • 概念
    使用join会将该方法阻塞,直到timeout时间或调用join的进程终止。一个进程可以被join多次,且进程无法join自身,join自身会导致死锁问题。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
import os
import time

def now():
return str(time.time())


def f():
print('Run child process ' + str(os.getpid()) + ' at ' + str(now()))
time.sleep(3)
print('Stop child process ' + str(os.getpid()) + ' at ' + str(now()))

if __name__ == '__main__':
print ('Parent process ' + str(os.getpid()))
p1 = Process(target=f, args=())
print('Process start at '+ str(now()))
p1.start()
p1.join()
print('Process end at '+ str(now()))

# 输出
# 如果没p1.join(),则主进程会在子进程结束之前执行Process end 打印
Parent process 2
Process start at 1581322409.9711816
Run child process 3 at 1581322409.975377
Stop child process 3 at 1581322412.9764113
Process end at 1581322412.9770644

Daemon

  • 概念
    进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。当进程退出时,它会尝试终止其所有守护进程子进程。
    不允许守护进程创建子进程。否则,守护进程会在子进程退出时终止其子进程。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing
import time

def f():
print("work start at {}".format(time.time()));
time.sleep(3)
print("work end at {}".format(time.time()));

if __name__ == "__main__":
p = multiprocessing.Process(target = f, args = ())
p.daemon = True # 设置是否为守护进程
p.start()
print("Process end")

# 输出
# 因为守护进程随主进程的结束而结束,所以如果加了p.daemon = True,则只会输出 Process end
Process end
work start at 1581323384.468953 # 如果不加p.daemon = True,则输出该行
work end at 1581323387.4720087 # 如果不加p.daemon = True,则输出该行

信号量

信号量(Semaphore)

  • 概念
    该类实现信号量对象。信号量对象管理一个原子性的计数器,代表 release() 方法的调用次数减去 acquire() 的调用次数再加上一个初始值。如果需要, acquire() 方法将会阻塞直到可以返回而不会使得计数器变成负数。在没有显式给出 value 的值时,默认为1。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing
import time

def f(sem, num):
sem.acquire()
print('{} get semaphores'.format(num))
time.sleep(3)
sem.release()

if __name__ == '__main__':
sem = multiprocessing.Semaphore(5)
for i in range(10):
t = multiprocessing.Process(target=f, args=(sem, i,))
t.start()

# 输出
1 get semaphores
2 get semaphores
3 get semaphores
4 get semaphores
5 get semaphores

事件

事件(Event)

  • 概念
    实现事件对象的类。事件对象管理一个内部标志,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为false。调用 wait() 方法将进入阻塞直到标志为true。这个标志初始时为false。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/python
from multiprocessing import Process
from multiprocessing import Event
import time, random

def traffic_light(e):
while True:
if e.is_set():
time.sleep(1)
print('红灯亮')
e.clear()
else:
time.sleep(1)
print('绿灯亮')
e.set()

def car(i, e):
e.wait()
print('{}:车通过...'.format(i))

if __name__ == '__main__':
e = Event()
traffic = Process(target=traffic_light, args=(e,))
traffic.daemon = True
traffic.start()
for i in range(30):
if i % 6:
time.sleep(random.random())
car_obj = Process(target=car, args=(i, e))
car_obj.start()

# 输出
0:车通过...
1:车通过...
2:车通过...
4:车通过...
3:车通过...
5:车通过...
6:车通过...
7:车通过...
8:车通过...

CPython中GIL问题

概念

GIL全称为全局解释器锁,每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程在运行,目的是解决多线程同时竞争程序中的全局变量而出现的线程安全问题。它并不是python语言的特性,仅仅是由于历史的原因在CPython解释器中难以移除,因为python语言运行环境大部分默认在CPython解释器中。

问题

多核环境下,多线程同一时刻也只有一个线程在运行,这样不仅不能利用多核优势,同时由于线程在多核上交替执行,反而又导致了核心切换时造成的资源消耗,会似的程序的运行更耗时。造成这样现象的原因就是因为CPython解释器中的GIL问题,即同一时刻一个进程只会有一把GIL,当多个线程需要执行任务时就会造成资源不足,进而线程等待GIL锁的释放,导致多核优势不能被利用。

影响

  • 计算密集型
    CPython的GIL对于计算密集型的程序会有较大影响,因为同一时刻一个线程只会执行一个任务,所以对于计算密集型程序而言只能串行等待资源调用。
  • I/O密集型
    对于I/O密集型程序而言,其瓶颈在于I/O,所以CPython的GIL对其影响并没有想象中的大。

解决方案

  • 多进程
    Python中的Multiprocessing库可以帮助实现多进程,从而在多核系统中利用多核资源。
  • Ctypes
    CPython解释器的优势就是其可以调用C语言库来帮助实现程序对于多核系统的充分利用。
  • 协程
    协程又称微线程,是一种用户态的轻量级线程。python3.4之后内置了asyncio标准库,实现了内置协程,同时也可以通过第三方库类似Greenlet、Gevent实现协程。、