Python中协程

Python中协程

asyncio模块

概念

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

作用

  • 提供高层级API

    • 并发地运行 Python 协程并对其执行过程实现完全控制
    • 执行 网络 IO 和 IPC
    • 控制 子进程
    • 通过 队列 实现分布式任务
    • 同步 并发代码
  • 提供低层级API

    • 创建和管理 事件循环,以提供异步 API 用于 网络化, 运行 子进程,处理 OS 信号 等等
    • 使用 transports 实现高效率协议
    • 通过 async/await 语法 桥接 基于回调的库和代码

asyncio模块之协程

协程

  • 概念
    使用async / await语法声明的协程是编写异步应用程序的首选方式。

  • 三种运行协程的机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# --------------------------------------------------------------------------------
# 一、asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数
# --------------------------------------------------------------------------------
import asyncio

async def main():
print('hello')
await asyncio.sleep(1)
print('world')

asyncio.run(main())

# 输出
hello
world

# --------------------------------------------------------------------------------
# 二、等待一个协程
# --------------------------------------------------------------------------------
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# 输出
started at 17:13:52
hello
world
finished at 17:13:55

# --------------------------------------------------------------------------------
# 三、asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。
# --------------------------------------------------------------------------------
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))

task2 = asyncio.create_task(
say_after(2, 'world'))

print(f"started at {time.strftime('%X')}")

# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2

print(f"finished at {time.strftime('%X')}")

# 输出
started at 17:14:32
hello
world
finished at 17:14:34

可等待对象

  • 概念
    如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。

  • 类型
    可等待 对象有三种主要类型: 协程(包括协程函数、协程对象), 任务Future

并发协程

  • 语法
    asyncio.gather(*aws, loop=None, return_exceptions=False)

    • 并发 运行 aws 序列中的 可等待对象。
    • 如果 aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。
    • 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。
  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")

async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)

asyncio.run(main())

# 输出
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

超时

  • 语法
    asyncio.wait_for(aw, timeout, *, loop=None)

    • 等待 aw 可等待对象 完成,指定 timeout 秒数后超时。
    • 如果 aw 是一个协程,它将自动作为任务加入日程。
    • timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。
  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

# 输出
timeout!

内省

  • 语法
    asyncio.current_task(loop=None):返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None。
    asyncio.all_tasks(loop=None):返回事件循环所运行的未完成的 Task 对象的集合。

Task 对象

  • 概念
    一个与 Future 类似 的对象,可运行 Python 协程。非线程安全。
    Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。

支持协程的模块Greenlet/Gevent

Greenlet

  • 概念
    greenlet提供了一种简单的切换模式,当任务遇到I/O时可以出发阻塞。但是greenlet没有解决切换任务的目的。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# greenlet实现切换
pip install greenlet

from greenlet import greenlet

def foo(x):
print('{} at foo_1'.format(x))
g2.switch('Elijah')
print('{} at foo_2'.format(x))
g2.switch()

def bar(y):
print('{} at bar_1'.format(y))
g1.switch()
print('{} at bar_2'.format(y))

g1 = greenlet(foo)
g2 = greenlet(bar)

g1.switch('yang')

Gevent

  • 概念
    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。
    Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

  • 示例一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ---------- 主动切换 ----------
pip install gevent
import gevent

def foo(x):
print('{} at foo_1'.format(x))
gevent.sleep(2)
print('{} at foo_2'.format(x))

def bar(y):
print('{} at bar_1'.format(y))
gevent.sleep(1)
print('{} at bar_2'.format(y))

g1 = gevent.spawn(foo, 'yang')
g2 = gevent.spawn(bar, 'Elijah')
gevent.joinall([g1,g2])
  • 示例二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# ----------服务端 ----------
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)

def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()

if __name__ == '__main__':
server('127.0.0.1',8080)

# ----------客户端 ----------
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
msg=input('>>: ').strip()
if not msg:continue

client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

# ----------多个客户端并发 ----------
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port))

count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()

猴子补丁

名字的由来

  • 说法一
    这个词原来为Guerrilla Patch,杂牌军、游击队,说明这部分不是原装的,在英文里guerilla发音和gorllia(猩猩)相似,再后来就写了monkey(猴子)。
  • 说法二
    由于这种方式将原来的代码弄乱了(messing with it),在英文里叫monkeying about(顽皮的),所以叫做Monkey Patch。

作用

在程序运行是,将原本的模块功能替换成另外模块的功能