Python 线程模块thread和threading

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。

如果对进程、线程的概念还不是很了解,建议先看一下《CPU、超线程与进程、线程》这篇文章。

Python的标准库提供了两个模块:thread和threading,前者是低级模块,后者是高级模块。threading对thread进行了封装,绝大多数情况下,我们只需要使用threading就可以了,这也是Python官方推荐的做法。

Python 3中,thread已经更名成_thread。

thread

总结几点:

  1. 调用start_new_thread方法即启动线程。启动后MainThread必须等待子线程执行完毕,否则主线程一退出,整个进程随之结束,在大多数系统上子线程会被直接终止。
  2. thread.exit()和thread.interrupt_main()都是通过抛出异常实现的:前者在当前线程中抛出SystemExit,后者在主线程中抛出KeyboardInterrupt。
  3. lock.acquire([waitflag])方法,当waitflag等于0时,表示非阻塞获取锁,即获取失败也立即返回。所以在下面的demo中,如果把worker方法中的lock.acquire(0)改为lock.acquire(1),work3会阻塞等待work2(约运行10s)释放锁后才开始执行,因此运行时间将比现在多10s。
  4. 锁的release和acquire可以不在同一个线程中调用。
  5. Py3中获取锁的方法lock.acquire新增了timeout参数,设置的值不能大于_thread.TIMEOUT_MAX;并且只有在阻塞模式(waitflag非0)下才能指定timeout,非阻塞模式下指定timeout会抛出ValueError。

以下demo在Python2.7和Python3.5测试通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# coding=utf-8
"""
Demo running on Py2 and Py3
"""

import sys
import time
from datetime import datetime
import traceback

try:
import thread
except ImportError:
import _thread as thread


def worker(run_num=3, internal=2, max_num=11, lock=None):
    """
    Print the current thread id and time ``run_num`` times, sleeping in between.

    :param run_num: number of iterations to run.
    :param internal: seconds to sleep after each iteration.
    :param max_num: iteration cap. Once the loop index reaches it, the main
        thread is interrupted (``thread.interrupt_main`` raises
        KeyboardInterrupt there) and this thread exits via ``thread.exit``
        (which raises SystemExit in this thread).
    :param lock: optional lock created by ``thread.allocate_lock()``. It is
        acquired without blocking; if it is already held, a message is printed
        and the worker returns immediately without doing any work.
    :return: None
    """
    holds_lock = False
    if isinstance(lock, thread.LockType):
        # Py2: lock.acquire([waitflag])
        #   no argument   -> block until the lock is free
        #   waitflag == 0 -> only take the lock if it is free right now
        #   returns True if the lock was acquired, False otherwise
        # Py3: lock.acquire(waitflag=1, timeout=-1)
        #   a timeout may only be given when waitflag is non-zero (blocking
        #   mode); passing one together with waitflag=0 raises ValueError.
        #   The timeout must not exceed _thread.TIMEOUT_MAX.
        if not lock.acquire(0):
            print(
                "Thread: {0} 获取锁失败!! 当前时间: {1}".format(
                    thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                )
            )
            return
        holds_lock = True

    try:
        for i in range(run_num):
            if i >= max_num:
                print("Thread {0} 运行次数已达上限({1}),即将退出".format(thread.get_ident(), max_num))
                thread.interrupt_main()  # raises KeyboardInterrupt in the main thread
                thread.exit()  # raises SystemExit in this thread
            print(
                "Thread: {0}, 当前时间: {1}".format(
                    thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                )
            )
            time.sleep(internal)
    finally:
        # Release in ``finally`` so neither the early exit above (SystemExit)
        # nor any other exception leaves the lock held forever. The locked()
        # check guards against another thread having released it already:
        # releasing an unlocked lock raises an error (RuntimeError on Py3).
        if holds_lock and lock.locked():
            lock.release()


def demo_no_lock():
    """
    Demo without locks: start two workers and keep the main thread alive.

    :return: None
    """
    # First worker: default arguments.
    first_ident = thread.start_new_thread(worker, ())
    print("Thread work1 identifier: {0}".format(first_ident))

    # Second worker: 15 runs, 1 second apart.
    second_kwargs = {"run_num": 15, "internal": 1}
    second_ident = thread.start_new_thread(worker, (), second_kwargs)
    print("Thread work2 identifier: {0}".format(second_ident))

    # start_new_thread only returns an identifier, so there is no thread
    # object to join; the main thread sleeps long enough for the workers.
    time.sleep(20)


def demo_lock():
    """
    Demo with locks: start three workers, two of which share one lock.

    work1 uses locks[0]; work2 and work3 share locks[1]. worker() acquires its
    lock without blocking (acquire(0)), so whichever of work2/work3 reaches
    locks[1] second prints a failure message and returns at once instead of
    waiting for the other one.

    The main thread then waits until all three workers have returned. It
    counts finished workers rather than polling ``lock.locked()``: right
    after start_new_thread() a worker may not have acquired its lock yet, so
    polling the locks could end the wait before any worker has run.

    :return: None
    """
    # Two mutexes. A list comprehension gives an indexable list on both Py2
    # and Py3 (on Py3 map() returns an iterator).
    locks = [thread.allocate_lock() for _ in range(2)]

    # Number of workers that have returned, guarded by its own lock because
    # several worker threads update it.
    counter_lock = thread.allocate_lock()
    finished = [0]

    def tracked_worker(**kwargs):
        # Run worker() and record its completion even if it exits through
        # thread.exit() (SystemExit) or another exception.
        try:
            worker(**kwargs)
        finally:
            with counter_lock:
                finished[0] += 1

    jobs = [
        ("work1", dict(lock=locks[0])),
        ("work2", dict(run_num=10, internal=1, lock=locks[1])),
        # work3 shares locks[1] with work2 (see the docstring).
        ("work3", dict(run_num=10, internal=1, lock=locks[1])),
    ]
    for name, kwargs in jobs:
        ident = thread.start_new_thread(tracked_worker, (), kwargs)
        print("Thread {0} identifier: {1}".format(name, ident))

    while True:
        with counter_lock:
            remaining = len(jobs) - finished[0]
        if remaining <= 0:
            break
        print("MainThread 还有线程未结束, 继续等待")
        time.sleep(1)


if __name__ == "__main__":

    begin_time = time.time()

    # The first CLI argument selects the demo; the lock-free one is the default.
    version = sys.argv[1] if len(sys.argv) > 1 else "unlock"
    demos = {
        "unlock": demo_no_lock,  # version without locks
        "lock": demo_lock,  # version with locks
    }
    try:
        demo = demos.get(version)
        if demo is not None:
            demo()
    except KeyboardInterrupt:
        # A worker's thread.interrupt_main() surfaces here as KeyboardInterrupt.
        print(traceback.format_exc())

    elapsed_ms = round((time.time() - begin_time) * 1000, 2)
    print("MainThread 运行结束, 耗时: {0}(ms)".format(elapsed_ms))

threading

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# coding=utf-8

"""
Demo running on Py2
"""

import sys
import time
import traceback
from datetime import datetime
import thread
import threading
import logging

# Configure the root logger to write records of level DEBUG and above to stderr.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)


def worker(run_num=3, internal=2, max_num=11, lock=None):
    """
    Print the current thread's name, ident and time ``run_num`` times.

    :param run_num: number of iterations to run.
    :param internal: seconds to sleep after each iteration.
    :param max_num: accepted but not used by this version of the worker.
    :param lock: optional ``thread`` lock or ``threading._RLock``. It is
        acquired in blocking mode, so a worker that shares a lock with another
        waits until the other one releases it. Other lock types are ignored.
    :return: None
    """
    use_lock = isinstance(lock, (thread.LockType, threading._RLock))
    if use_lock:
        # A blocking acquire() waits until the lock is free and then returns
        # True, so there is no "failed to acquire" case to handle here.
        lock.acquire(True)

    try:
        current_thread = threading.current_thread()
        for _ in range(run_num):
            print(
                "Thread: {0}-{1}, 当前时间: {2}".format(
                    current_thread.name,
                    current_thread.ident,
                    datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                )
            )
            time.sleep(internal)
    finally:
        # Release even if the loop raises, so threads waiting on the same
        # lock are not blocked forever.
        if use_lock:
            lock.release()


def demo_no_lock():
    """
    Demo without locks: one daemon worker and one non-daemon worker.

    work1 (5 runs, 1s apart) is a daemon thread, so the program does not wait
    for it when the main thread finishes. work2 (5 runs, 3s apart) is not a
    daemon, so the program waits for it before exiting.

    :return: None
    """
    work1 = threading.Thread(target=worker, name="work1", args=(5, 1))
    # Set the ``daemon`` attribute directly; setDaemon() is a legacy setter
    # that is deprecated since Python 3.10.
    work1.daemon = True

    work2 = threading.Thread(
        target=worker, name="work2", kwargs=dict(run_num=5, internal=3)
    )
    work2.daemon = False

    print("Thread work1 start running...")
    work1.start()

    print("Thread work2 start running...")
    work2.start()

    # Only the daemon thread needs this: keep the main thread alive long
    # enough (8s > 5 runs x 1s) for work1 to finish its runs.
    time.sleep(8)


def demo_lock():
    """
    Demo with locks: start three workers, two of which share one lock.

    work3 uses locks[0]; work4 and work5 share locks[1]. worker() acquires
    its lock in blocking mode, so whichever of work4/work5 gets locks[1]
    second waits until the first one releases it. The main thread joins all
    three workers before returning.

    :return: None
    """
    # Two mutexes. A list comprehension (rather than map()) keeps ``locks``
    # indexable on Py3 as well, where map() returns an iterator.
    locks = [threading.Lock() for _ in range(2)]

    workers = [
        threading.Thread(target=worker, name="work3", kwargs=dict(lock=locks[0])),
        threading.Thread(
            target=worker, name="work4", kwargs=dict(run_num=10, internal=1, lock=locks[1])
        ),
        threading.Thread(
            target=worker, name="work5", kwargs=dict(run_num=10, internal=1, lock=locks[1])
        ),
    ]

    for t in workers:
        print("Thread {0} start running...".format(t.name))
        t.start()

    for t in workers:
        t.join()


def demo_monitor():
    """
    Run one daemon worker and report once per second while it is alive.

    :return: None
    """
    monitored = threading.Thread(target=worker, name="work6", kwargs={"run_num": 5})
    monitored.daemon = True
    monitored.start()

    # Poll instead of join() so the main thread can print progress.
    while True:
        if not monitored.is_alive():
            break
        print("Thread work6 is still running...")
        time.sleep(1)


class CountDownThread(threading.Thread):
    """
    Thread that prints a countdown, one line per second.

    Accepts every ``threading.Thread`` argument plus an optional ``times``
    keyword (default 10): the number to count down from.
    """

    def __init__(self, *args, **kwargs):
        # Remove 'times' before delegating so threading.Thread.__init__
        # never receives a keyword it does not accept.
        countdown = kwargs.pop("times", 10)
        self.times = countdown
        super(CountDownThread, self).__init__(*args, **kwargs)

    def run(self):
        """Print and decrement ``self.times`` until it is no longer positive."""
        while True:
            if not self.times > 0:
                break
            who = threading.current_thread().name
            message = "Thread {0} T-minus {1} seconds".format(who, self.times)
            print(message)
            self.times -= 1
            time.sleep(1)


def demo_subclass():
    """Start a CountDownThread named work7 that counts down from 8 (not joined)."""
    CountDownThread(name="work7", times=8).start()


if __name__ == "__main__":

    begin_time = time.time()

    version = "unlock"
    if len(sys.argv) > 1:
        version = sys.argv[1]

    try:
        # Version without locks
        if version == "unlock":
            demo_no_lock()

        # Version with locks
        elif version == "lock":
            demo_lock()

        # Monitor version
        elif version == "monitor":
            demo_monitor()

        # Thread-subclass version
        elif version == "subclass":
            demo_subclass()

    except (Exception, KeyboardInterrupt):
        # Print the traceback of a failed (or Ctrl+C-interrupted) demo and
        # still report the elapsed time. Catching these explicitly, instead
        # of a bare ``except:``, lets SystemExit (e.g. sys.exit()) propagate.
        print(traceback.format_exc())

    print(
        "MainThread 运行结束, 耗时: {0}(ms)".format(
            round((time.time() - begin_time) * 1000, 2)
        )
    )

GIL

既然用到Python的多线程,必须得知道GIL的存在。

GIL(Global Interpreter Lock)

启动与CPU核心数量相同的N个线程,每个线程都执行一个纯Python的死循环,在4核CPU上可以监控到CPU占用率仅有160%,也就是使用不到两核。

即使启动100个线程,使用率也就170%左右,仍然不到两核。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

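因为Python的线程虽然是真正的操作系统线程,但解释器执行代码时有一个GIL锁(Global Interpreter Lock):任何Python线程执行前,必须先获得GIL锁,解释器会定期自动释放GIL锁,让别的线程有机会执行(Python 2中默认每执行100条字节码释放一次;Python 3.2起改为按时间间隔切换,默认5毫秒,可通过sys.setswitchinterval调整)。这个GIL全局锁实际上把所有线程执行Python字节码的过程都串行化了,所以多线程在Python中只能交替执行,同一时刻只有一个线程在运行Python代码。即使100个线程跑在100核CPU上,有效的计算能力也只相当于1个核;监控到的占用率略高于100%,多出来的部分主要是各线程争抢GIL的开销。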

GIL是Python解释器设计的历史遗留问题。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非使用一个不带GIL的解释器。CPython 3.13起提供了可选的无GIL“自由线程”构建(PEP 703),但默认构建仍然带有GIL。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。需要注意的是,GIL主要限制的是CPU密集型任务:线程在进行阻塞I/O(如网络请求、文件读写、time.sleep)时会释放GIL,因此对于I/O密集型任务,多线程依然能有效提高吞吐量。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

参考资料:
Python 2 thread 官方文档
Python 3 _thread 官方文档
Python 3 _thread 中文译文
Python 2 threading 官方文档
Python 3 threading 官方文档
Python 3 threading 中文译文
Python 3 cookbook 并发编程
深入 GIL: 如何寫出快速且 thread-safe 的 Python – Grok the GIL: How to write fast and thread-safe Python

多线程
Python多线程之threading Event
【Python】threading.Event模块控制多线程
Launching parallel tasks
python多线程间通信机制-event