Python中(多)线程

Python中(多)线程

threading模块

_thread模块提供了操作多个线程的底层原语,多个控制线程共享全局数据空间。threading 模块基于该模块提供了更易用的高级多线程 API。


threading.Thread类

  • 创建线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# ---------- 方式一 ----------
from threading import Thread
import time

def foo():
time.sleep(2)
print('foo run')

if __name__ == '__main__':
t=Thread(target=foo,args=())
t.start()
print('主线程')

# 输出
主线程
foo run

# ---------- 方式二 ----------
from threading import Thread
import time

class foo(Thread):
def __init__(self):
super().__init__()

def run(self):
time.sleep(2)
print('foo run')


if __name__ == '__main__':
t = foo()
t.start()
print('主线程')

# 输出
主线程
foo run

线程本地数据

threading.local

  • 概念
    线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个 local (或者一个子类型)的实例并在实例中储存属性
    在不同的线程中,实例的值会不同。

  • 示例

1
2
mydata = threading.local()
mydata.x = 1

锁对象

死锁

  • 概念
    是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁。

互斥锁(Lock)

  • 概念
    实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from threading import Thread,Lock
import time

n=100

def f():
global n
mutex.acquire()
temp = n
time.sleep(0.1)
n = temp-1
mutex.release()

if __name__ == '__main__':
mutex = Lock()
l=[]
for i in range(10):
t=Thread(target = f)
l.append(t)
t.start()

for t in l:
t.join()

print(n)

# 输出
90

递归锁(RLock)

  • 概念
    递归锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在互斥锁的锁定/非锁定状态上附加了 “所属线程” 和 “递归等级” 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from threading import Thread,RLock
import time

mutexB = RLock()
mutexA = RLock()

class MyThread(Thread):
def run(self):
self.f1()
self.f2()

def f1(self):
mutexA.acquire()
print('%s 拿到A锁 '%self.name)
mutexB.acquire()
print('%s 拿到B锁 '%self.name)
mutexB.release()
mutexA.release()

def f2(self):
mutexB.acquire()
print('%s 拿到B锁 ' % self.name)
time.sleep(1)
mutexA.acquire()
print('%s 拿到B锁 ' % self.name)
mutexA.release()
mutexB.release()

if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()

# 输出
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁

Join & Daemon

Join(timeout)

  • 概念
    等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结。

守护线程(Daemon)

  • 概念
    一个表示这个线程守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。
    初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。
    当没有存活的非守护线程时,整个Python程序才会退出。

条件对象

条件(Condition)

  • 概念
    使线程等待,只有满足某条件时,才释放n个线程。
    条件变量总是与某种类型的锁对象相关联,锁对象可以通过传入获得,或者在缺省的情况下自动创建。当多个条件变量需要共享同一个锁时,传入一个锁很有用。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading

def run(n):
con.acquire()
con.wait()
print("run the thread: %s" % n)
con.release()

if __name__ == '__main__':

con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()

while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
print('****')

信号量对象

信号量(Semaphore)

  • 概念
    同进程的一样,Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release()时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Semaphore
import threading
import time

def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(2)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(10):
t=threading.Thread(target=func)
t.start()

# 输出
Thread-1 get sm
Thread-2 get sm
Thread-3 get sm
Thread-4 get sm
Thread-5 get sm
Thread-6 get sm
Thread-9 get sm
Thread-8 get sm
Thread-7 get sm
Thread-10 get sm

事件对象

事件(Event)

  • 概念
    同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time
import random
from threading import Thread
from threading import Event

def conn_mysql():
count=1
while not event.is_set():
if count > 3:
raise TimeoutError('链接超时')
print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
event.wait(0.5)
count+=1
print('<%s>链接成功' %threading.current_thread().getName())

def check_mysql():
print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(2,4))
event.set()
if __name__ == '__main__':
event=Event()
conn1=Thread(target=conn_mysql)
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql)

conn1.start()
conn2.start()
check.start()

# 输出
<Thread-1>第1次尝试链接
<Thread-2>第1次尝试链接
<Thread-1>第2次尝试链接
<Thread-2>第2次尝试链接
<Thread-1>第3次尝试链接
<Thread-2>第3次尝试链接
Exception in thread Thread-1:
Traceback (most recent call last):
File "main.py", line 12, in conn_mysql
raise TimeoutError('链接超时')
TimeoutError: 链接超时

定时器对象

定时器(Timer)

  • 概念
    此类表示一个操作应该在等待一定的时间之后运行,相当于一个定时器。 Timer 类是 Thread 类的子类,因此可以像一个自定义线程一样工作。

  • 示例

1
2
3
4
5
def hello():
print("hello, world")

t = Timer(30.0, hello)
t.start() # 30秒后输出'hello, world'

栅栏对象

栅栏(Barrier)

  • 概念
    栅栏类提供一个简单的同步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。
    栅栏对象可以被多次使用,但进程的数量不能改变。

  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading, logging
logging.basicConfig(level=logging.INFO, format="%(threadName)s %(message)s")

def work(barrier:threading.Barrier):
logging.info("n_waiting = {}".format(barrier.n_waiting))
bid = barrier.wait()
logging.info("after barrier {}".format(bid)) # 栅栏之后

barrier = threading.Barrier(3)

for x in range(1,4):
threading.Event().wait(1)
threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()

# 输出
Barrier-1 n_waiting = 0
Barrier-2 n_waiting = 1
Barrier-3 n_waiting = 2
Barrier-3 after barrier 2
Barrier-2 after barrier 1
Barrier-1 after barrier 0