Python threading modules: thread and threading

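A thread is the smallest unit of execution that an operating system can schedule. It lives inside a process and is the unit that actually does the process's work.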

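If the concepts of processes and threads are still unfamiliar, the article "CPU, Hyper-Threading, Processes, and Threads" is a good place to start.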

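Python's standard library provides two modules: thread and threading. The former is a low-level module and the latter is a high-level one that wraps thread. In the vast majority of cases threading is all we need, and it is also what the official Python documentation recommends.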
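As a minimal sketch of what using the high-level module looks like (Python 3; the function name `greet`, the thread names and the shared `results` list are made up for illustration), the snippet below starts two threads with threading.Thread and waits for both with join():

```python
import threading

results = []
results_lock = threading.Lock()


def greet(name):
    """Record a greeting from the thread that runs this function."""
    message = "hello from {0} ({1})".format(name, threading.current_thread().name)
    with results_lock:  # guard the shared list
        results.append(message)


threads = [
    threading.Thread(target=greet, name="t{0}".format(i), args=("worker-{0}".format(i),))
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until the thread has finished

# The threads may finish in either order, so sort before printing.
for line in sorted(results):
    print(line)
```

Because join() is called on every thread, no sleep is needed in the main thread to keep the process alive.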

In Python 3, thread has been renamed to _thread.

thread

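A few takeaways:

  1. Calling start_new_thread starts the thread right away. After that, MainThread has to wait; otherwise the child threads are killed as soon as MainThread exits.
  2. thread.exit() and thread.interrupt_main() both work by raising exceptions (SystemExit in the calling thread and KeyboardInterrupt in the main thread, respectively).
  3. lock.acquire([waitflag]): when waitflag is 0 the acquire is non-blocking, meaning it returns immediately even if the lock could not be acquired. So if lock.acquire(0) in the worker function of the demo below is changed to lock.acquire(1), the lock demo takes about 10s longer than it does now.
  4. A lock does not have to be released by the same thread that acquired it.
  5. In Py3, lock.acquire gained a timeout parameter, whose value must not exceed TIMEOUT_MAX.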
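Points 3 to 5 can be checked with a short sketch. It assumes Python 3 (hence `_thread`), and the helper name `release_from_other_thread` as well as the 0.2s and 5s timeouts are invented for this example.

```python
import _thread
import time

lock = _thread.allocate_lock()

# Point 3: a non-blocking acquire returns immediately with True or False.
first = lock.acquire(False)   # the lock is free, so this succeeds
second = lock.acquire(False)  # the lock is already held, so this fails at once

# Point 5 (Py3 only): a blocking acquire can give up after `timeout` seconds.
start = time.monotonic()
timed_out = lock.acquire(True, 0.2)  # still held: waits about 0.2s, then fails
waited = time.monotonic() - start


# Point 4: a different thread may release a lock it did not acquire.
def release_from_other_thread(held_lock):
    held_lock.release()


_thread.start_new_thread(release_from_other_thread, (lock,))
# Blocks until the helper thread releases the lock (capped at 5s to be safe).
reacquired = lock.acquire(True, 5)

print(first, second, timed_out, reacquired)  # → True False False True
print("timeout must not exceed", _thread.TIMEOUT_MAX)
lock.release()
```

The value of TIMEOUT_MAX is platform dependent, which is why the sketch prints it rather than assuming a number.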

The following demo was tested on Python 2.7 and Python 3.5.

    # coding=utf-8
    """
    Demo running on Py2 and Py3
    """

    import sys
    import time
    from datetime import datetime
    import traceback

    try:
        import thread
    except ImportError:
        import _thread as thread


    def worker(run_num=3, internal=2, max_num=11, lock=None):
        """
        The worker that does the actual work.
        :param run_num: number of loop iterations to run
        :param internal: seconds to sleep after each iteration
        :param max_num: iteration cap; reaching it interrupts the main thread
        :param lock: optional lock to acquire before working
        :return:
        """

        if isinstance(lock, thread.LockType):
            # Py2: lock.acquire([waitflag])
            #   No argument: acquire unconditionally; if the lock is held,
            #   block until it is released.
            #   With an argument: waitflag == 0 acquires the lock only if no
            #   waiting is needed; a nonzero value behaves like no argument.
            #   Returns True if the lock was acquired, False otherwise.
            # Py3: lock.acquire(waitflag=1, timeout=-1)
            #   With a nonzero waitflag, timeout caps how long to block
            #   (the value must not exceed _thread.TIMEOUT_MAX).
            if not lock.acquire(0):
                print(
                    "Thread: {0} failed to acquire the lock!! Current time: {1}".format(
                        thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                    )
                )
                return

        for i in range(run_num):
            if i >= max_num:
                print(
                    "Thread {0} has reached the run limit ({1}) and is about to exit".format(
                        thread.get_ident(), max_num
                    )
                )
                thread.interrupt_main()  # raise KeyboardInterrupt
                thread.exit()  # raise SystemExit
            print(
                "Thread: {0}, current time: {1}".format(
                    thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                )
            )
            time.sleep(internal)

        if isinstance(lock, thread.LockType) and lock.locked():
            # Releasing a lock that is not locked raises thread.error
            # (which is RuntimeError on Py3)
            lock.release()


    def demo_no_lock():
        """
        Demo without locks
        :return:
        """

        work1_ident = thread.start_new_thread(worker, ())
        print("Thread work1 identifier: {0}".format(work1_ident))

        work2_ident = thread.start_new_thread(worker, (), dict(run_num=15, internal=1))
        print("Thread work2 identifier: {0}".format(work2_ident))

        # The sleep must be long enough for the child threads to finish
        time.sleep(20)


    def demo_lock():
        """
        Demo with locks
        :return:
        """

        # Allocate 2 mutex locks (a list comprehension gives a list on both Py2 and Py3)
        locks = [thread.allocate_lock() for _ in range(2)]

        work1_ident = thread.start_new_thread(worker, (), dict(lock=locks[0]))
        print("Thread work1 identifier: {0}".format(work1_ident))

        work2_ident = thread.start_new_thread(
            worker, (), dict(run_num=10, internal=1, lock=locks[1])
        )
        print("Thread work2 identifier: {0}".format(work2_ident))

        # work3 shares a lock with work2. Because worker uses the non-blocking
        # lock.acquire(0), whichever of the two finds the lock taken gives up at
        # once; with lock.acquire(1) it would wait for the holder to release it
        work3_ident = thread.start_new_thread(
            worker, (), dict(run_num=10, internal=1, lock=locks[1])
        )
        print("Thread work3 identifier: {0}".format(work3_ident))

        wait = True
        while wait:
            # lock.locked() returns True if some thread holds the lock, else False
            if any(lock.locked() for lock in locks):
                print("MainThread: some threads have not finished yet, keep waiting")
            else:
                wait = False
            time.sleep(1)


    if __name__ == "__main__":

        begin_time = time.time()

        version = "unlock"
        if len(sys.argv) > 1:
            version = sys.argv[1]
        try:
            # Version without locks
            if version == "unlock":
                demo_no_lock()

            # Version with locks
            elif version == "lock":
                demo_lock()

        except KeyboardInterrupt:
            # Handle the exception raised by thread.interrupt_main()
            print(traceback.format_exc())

        print(
            "MainThread finished, elapsed: {0}(ms)".format(
                round((time.time() - begin_time) * 1000, 2)
            )
        )
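
The `# raise SystemExit` comment in the demo can be verified directly. The sketch below (Python 3; `exiting_worker`, `seen` and `done` are names chosen for this example) calls _thread.exit() inside a thread and records the exception it raises. Left uncaught, that exception would simply end the thread silently.

```python
import _thread

seen = []
done = _thread.allocate_lock()
done.acquire()  # released by the worker once it has finished


def exiting_worker():
    try:
        _thread.exit()              # raises SystemExit in this thread
        seen.append("unreachable")  # never executed
    except SystemExit as exc:
        seen.append(type(exc).__name__)
    finally:
        done.release()              # tell the main thread we are done


_thread.start_new_thread(exiting_worker, ())
done.acquire()  # wait for the worker, using the lock as a simple signal
print(seen)     # → ['SystemExit']
```

Using a lock as a completion signal like this also avoids the fixed time.sleep() that the low-level module otherwise forces on the main thread.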

threading

    # coding=utf-8

    """
    Demo running on Py2
    """

    import sys
    import time
    import traceback
    from datetime import datetime
    import thread
    import threading
    import logging

    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


    def worker(run_num=3, internal=2, max_num=11, lock=None):
        """
        The worker that does the actual work.
        """

        if isinstance(lock, (thread.LockType, threading._RLock)):
            if not lock.acquire(True):
                print(
                    "Thread: {0} failed to acquire the lock!! Current time: {1}".format(
                        thread.get_ident(), datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
                    )
                )
                return

        current_thread = threading.current_thread()
        for i in range(run_num):
            print(
                "Thread: {0}-{1}, current time: {2}".format(
                    current_thread.name,
                    current_thread.ident,
                    datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                )
            )
            time.sleep(internal)

        if isinstance(lock, (thread.LockType, threading._RLock)):
            # Releasing a lock that is not locked raises an exception
            # (thread.error on Py2)
            lock.release()


    def demo_no_lock():
        """
        Demo without locks
        :return:
        """

        work1 = threading.Thread(target=worker, name="work1", args=(5, 1))
        work1.setDaemon(True)

        work2 = threading.Thread(
            target=worker, name="work2", kwargs=dict(run_num=5, internal=3)
        )
        work2.setDaemon(False)

        print("Thread work1 start running...")
        work1.start()

        print("Thread work2 start running...")
        work2.start()

        # A threading thread can be marked as a daemon thread or not.
        # Only daemon threads need the sleep below to give them time to finish;
        # non-daemon threads are waited for when the main thread exits.
        time.sleep(8)


    def demo_lock():
        """
        Demo with locks
        :return:
        """
        # Create 2 mutex locks
        locks = [threading.Lock() for _ in range(2)]

        work3 = threading.Thread(target=worker, name="work3", kwargs=dict(lock=locks[0]))

        work4 = threading.Thread(
            target=worker, name="work4", kwargs=dict(run_num=10, internal=1, lock=locks[1])
        )

        # work4 and work5 share a lock, so work5 only starts working after
        # work4 has finished and released it
        work5 = threading.Thread(
            target=worker, name="work5", kwargs=dict(run_num=10, internal=1, lock=locks[1])
        )

        print("Thread work3 start running...")
        work3.start()

        print("Thread work4 start running...")
        work4.start()

        print("Thread work5 start running...")
        work5.start()

        work3.join()
        work4.join()
        work5.join()


    def demo_monitor():

        work6 = threading.Thread(target=worker, name="work6", kwargs=dict(run_num=5))
        work6.daemon = True
        work6.start()

        while work6.is_alive():
            print("Thread work6 is still running...")
            time.sleep(1)


    class CountDownThread(threading.Thread):

        def __init__(self, *args, **kwargs):
            self.times = kwargs.pop("times", 10)
            super(CountDownThread, self).__init__(*args, **kwargs)

        def run(self):
            while self.times > 0:
                print(
                    "Thread {0} T-minus {1} seconds".format(
                        threading.current_thread().name, self.times
                    )
                )
                self.times -= 1
                time.sleep(1)


    def demo_subclass():
        work7 = CountDownThread(name="work7", times=8)
        work7.start()


    if __name__ == "__main__":

        begin_time = time.time()

        version = "unlock"
        if len(sys.argv) > 1:
            version = sys.argv[1]

        try:
            # Version without locks
            if version == "unlock":
                demo_no_lock()

            # Version with locks
            elif version == "lock":
                demo_lock()

            # Monitor version
            elif version == "monitor":
                demo_monitor()

            # Subclass version
            elif version == "subclass":
                demo_subclass()

        except:
            print(traceback.format_exc())

        print(
            "MainThread finished, elapsed: {0}(ms)".format(
                round((time.time() - begin_time) * 1000, 2)
            )
        )
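
The worker above calls lock.acquire() and lock.release() by hand, so an exception raised in between would leave the lock held. A common alternative is the with statement, which releases the lock on every exit path. This is a minimal Python 3 sketch rather than a rewrite of the demo; `Counter`, `add_many` and the iteration counts are made up for illustration.

```python
import threading


class Counter(object):
    """A counter whose increments are serialized by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:  # acquire() on entry, release() on exit, even on error
            self.value += 1


def add_many(counter, times):
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = [
    threading.Thread(target=add_many, name="adder{0}".format(i), args=(counter, 10000))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread instead of sleeping

print(counter.value)  # → 40000
```

Every increment happens while the lock is held, so the final value is always the number of threads times the number of increments per thread.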

GIL

Anyone using multithreading in Python needs to know about the GIL.

GIL(Global Interpreter Lock)

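Start N threads, where N equals the number of CPU cores, and have each one run an infinite loop. On a 4-core CPU the observed CPU usage is only about 160%, which is less than two cores.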

Even with 100 threads, usage stays at around 170%, still under two cores.
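One way to reproduce this kind of measurement without watching a system monitor is to compare the CPU time the process consumed with the wall-clock time that passed. The sketch below is only a stand-in for the original experiment: it spins for a fixed duration instead of forever, and `spin`, `measure_cpu_usage` and the 1-second duration are invented here. It needs Python 3 for time.process_time(). The percentage depends on the interpreter version, the OS and the machine, so no expected value is shown.

```python
import os
import threading
import time


def spin(deadline):
    """Busy-loop until the wall-clock deadline passes."""
    x = 0
    while time.monotonic() < deadline:
        x += 1


def measure_cpu_usage(num_threads, seconds=1.0):
    """Return CPU usage in percent (100 means one fully busy core)."""
    wall_start = time.monotonic()
    cpu_start = time.process_time()  # CPU time of the whole process, all threads
    deadline = wall_start + seconds

    threads = [threading.Thread(target=spin, args=(deadline,)) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    cpu_used = time.process_time() - cpu_start
    wall_used = time.monotonic() - wall_start
    return 100.0 * cpu_used / wall_used


if __name__ == "__main__":
    cores = os.cpu_count() or 1
    for n in (cores, 100):
        print("{0} threads: {1:.0f}% CPU".format(n, measure_cpu_usage(n)))
```

If threads could run Python code on all cores at once, the figure would approach 100% times the number of cores.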

Rewrite the same infinite loop in C, C++ or Java, however, and it saturates every core right away: 400% on 4 cores, 800% on 8 cores. Why can't Python do the same?

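Python threads are real OS threads, but the interpreter guards the execution of code with the GIL (Global Interpreter Lock). Any Python thread must acquire the GIL before it runs. In Python 2 the interpreter then releases the GIL automatically after every 100 bytecode instructions so that other threads get a chance to run; since Python 3.2 the release is time-based instead, with a default switch interval of 5 ms. The GIL effectively puts a lock around the code executed by all threads, so threads in Python can only take turns: even with 100 threads on a 100-core CPU, only about one core's worth of Python code runs at any moment.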
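The hand-over policy is visible from Python itself. This sketch assumes CPython 3.2 or later, where sys.getswitchinterval() and sys.setswitchinterval() exist; Python 2 exposed the bytecode count through sys.getcheckinterval() instead. The value 0.001 is an arbitrary choice for illustration.

```python
import sys

# How long a thread may keep the GIL before it is asked to hand it over.
original = sys.getswitchinterval()
print("switch interval: {0} s".format(original))

# Ask for more frequent hand-overs, then read the setting back.
sys.setswitchinterval(0.001)
changed = sys.getswitchinterval()
print("after change: {0} s".format(changed))

# Restore the previous setting so the rest of the program is unaffected.
sys.setswitchinterval(original)
```

Changing the interval only affects how often threads take turns; it does not let two threads run Python bytecode at the same time.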

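The GIL is a legacy of the interpreter's original design. The interpreter we normally use is CPython, the official implementation; to truly exploit multiple cores with threads, you would need an interpreter written without a GIL.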

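So in Python you can use multiple threads, just don't expect them to make effective use of multiple cores. If you really must use multiple cores through threads, the only way is via C extensions, though that gives up the simplicity and ease of use that Python is known for.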

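There is no need to worry too much, though: Python cannot run multi-core workloads with threads, but it can with multiple processes. Each Python process has its own independent GIL, so the processes do not affect one another.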
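A minimal sketch of the multi-process route, assuming Python 3 and the standard multiprocessing module; `count_down`, the worker count of 4 and the loop size are placeholders chosen for this example. Each task runs in a separate process with its own interpreter and GIL, so CPU-bound work can occupy several cores at once.

```python
import multiprocessing
import time


def count_down(n):
    """CPU-bound busy work: count n down to zero and return the final value."""
    while n > 0:
        n -= 1
    return n


if __name__ == "__main__":
    tasks = [5000000] * 4

    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(count_down, tasks)  # tasks are spread over the workers
    elapsed = time.time() - start

    print("results: {0}, elapsed: {1:.2f}s".format(results, elapsed))
```

The `if __name__ == "__main__"` guard matters on platforms that start worker processes by re-importing the module, such as Windows.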

