Python中(多)线程 threading模块 _thread模块提供了操作多个线程的底层原语,多个控制线程共享全局数据空间。threading 模块基于该模块提供了更易用的高级多线程 API。
threading.Thread类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from threading import Threadimport timedef foo () : time.sleep(2 ) print('foo run' ) if __name__ == '__main__' : t=Thread(target=foo,args=()) t.start() print('主线程' ) 主线程 foo run from threading import Threadimport timeclass foo (Thread) : def __init__ (self) : super().__init__() def run (self) : time.sleep(2 ) print('foo run' ) if __name__ == '__main__' : t = foo() t.start() print('主线程' ) 主线程 foo run
线程本地数据 threading.local
1 2 mydata = threading.local() mydata.x = 1
锁对象 死锁
概念 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁。
互斥锁(Lock)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from threading import Thread,Lockimport timen=100 def f () : global n mutex.acquire() temp = n time.sleep(0.1 ) n = temp-1 mutex.release() if __name__ == '__main__' : mutex = Lock() l=[] for i in range(10 ): t=Thread(target = f) l.append(t) t.start() for t in l: t.join() print(n) 90
递归锁(RLock)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from threading import Thread,RLockimport timemutexB = RLock() mutexA = RLock() class MyThread (Thread) : def run (self) : self.f1() self.f2() def f1 (self) : mutexA.acquire() print('%s 拿到A锁 ' %self.name) mutexB.acquire() print('%s 拿到B锁 ' %self.name) mutexB.release() mutexA.release() def f2 (self) : mutexB.acquire() print('%s 拿到B锁 ' % self.name) time.sleep(1 ) mutexA.acquire() print('%s 拿到B锁 ' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__' : for i in range(10 ): t = MyThread() t.start() Thread-1 拿到A锁 Thread-1 拿到B锁 Thread-1 拿到B锁 Thread-2 拿到A锁
Join & Daemon Join(timeout)
概念 等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结。
守护线程(Daemon)
概念 一个表示这个线程守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。 初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。 当没有存活的非守护线程时,整个Python程序才会退出。
条件对象 条件(Condition)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import threadingdef run (n) : con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__' : con = threading.Condition() for i in range(10 ): t = threading.Thread(target=run, args=(i,)) t.start() while True : inp = input('>>>' ) if inp == 'q' : break con.acquire() con.notify(int(inp)) con.release() print('****' )
信号量对象 信号量(Semaphore)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from threading import Semaphoreimport threadingimport timedef func () : sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(2 ) sm.release() if __name__ == '__main__' : sm=Semaphore(5 ) for i in range(10 ): t=threading.Thread(target=func) t.start() Thread-1 get sm Thread-2 get sm Thread-3 get sm Thread-4 get sm Thread-5 get sm Thread-6 get sm Thread-9 get sm Thread-8 get sm Thread-7 get sm Thread-10 get sm
事件对象 事件(Event)
概念 同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import threadingimport timeimport randomfrom threading import Threadfrom threading import Eventdef conn_mysql () : count=1 while not event.is_set(): if count > 3 : raise TimeoutError('链接超时' ) print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count)) event.wait(0.5 ) count+=1 print('<%s>链接成功' %threading.current_thread().getName()) def check_mysql () : print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2 ,4 )) event.set() if __name__ == '__main__' : event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start() <Thread-1 >第1 次尝试链接 <Thread-2 >第1 次尝试链接 <Thread-1 >第2 次尝试链接 <Thread-2 >第2 次尝试链接 <Thread-1 >第3 次尝试链接 <Thread-2 >第3 次尝试链接 Exception in thread Thread-1 : Traceback (most recent call last): File "main.py" , line 12 , in conn_mysql raise TimeoutError('链接超时' ) TimeoutError: 链接超时
定时器对象 定时器(Timer)
1 2 3 4 5 def hello () : print("hello, world" ) t = Timer(30.0 , hello) t.start()
栅栏对象 栅栏(Barrier)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threading, logginglogging.basicConfig(level=logging.INFO, format="%(threadName)s %(message)s" ) def work (barrier:threading.Barrier) : logging.info("n_waiting = {}" .format(barrier.n_waiting)) bid = barrier.wait() logging.info("after barrier {}" .format(bid)) barrier = threading.Barrier(3 ) for x in range(1 ,4 ): threading.Event().wait(1 ) threading.Thread(target=work,args=(barrier,),name="Barrier-{}" .format(x)).start() Barrier-1 n_waiting = 0 Barrier-2 n_waiting = 1 Barrier-3 n_waiting = 2 Barrier-3 after barrier 2 Barrier-2 after barrier 1 Barrier-1 after barrier 0