目录


摘要

Condition(条件变量)可以看成是一个具有waiting池的RLock说明1(或Lock)。
使用条件变量的流程通常是:


# coding: utf8

from threading import Condition, Thread


class Storage(object):
    def __init__(self, max_size):
        self._max_size = max_size
        self._storage = []

    def is_full(self):
        return len(self._storage) == self._max_size

    def append(self, element):
        self._storage.append(element)

    def is_empty(self):
        return len(self._storage) == 0

    def pop(self):
        return self._storage.pop(0)


# 条件变量
condition = Condition()
max_count = 100
# 共享数据结构
storage = Storage(10)


def producer():
    global condition, max_count, storage
    current_count = 0

    while current_count < max_count:
        # 1,获取条件变量的底层锁
        condition.acquire()

        # 2,如果条件不满足,则进入waiting池,等待被唤醒
        if storage.is_full():
            # 需要特别说明的是:
            # 1,线程在进入waiting池之后,会释放条件变量的底层锁
            # 2,线程在被唤醒或超时之后,会获取条件变量的底层锁,
            #  + 也就是说,一旦离开wait()方法,那么该线程就持有了底层锁
            print("produce thread enter into waiting pool")
            condition.wait()
            print("produce thread is waken up")
            # 2.1,被唤醒之后,继续进行判断
        # 3,如果条件满足,则更新共享数据
        else:
            print("produce: %d" % current_count)
            storage.append(current_count)
            current_count = current_count + 1
            # 3.1,唤醒waiting池中的线程
            condition.notify()
        # 释放条件变量的底层锁
        condition.release()


def consumer():
    global condition, max_count, storage
    current_count = 0

    while current_count < max_count:
        condition.acquire()
        if storage.is_empty():
            print("consume thread enter into waiting pool")
            condition.wait()
            print("consume thread is waken up")
        else:
            element = storage.pop()
            print("consume: %d" % element)
            current_count = current_count + 1
            condition.notify()
        condition.release()


consumer_thread = Thread(target=consumer)
consumer_thread.start()

producer_thread = Thread(target=producer)
producer_thread.start()

consumer_thread.join()
producer_thread.join()

主要源码

threading模块的源代码在:https://github.com/python/cpython/blob/2.7/Lib/threading.py
接下来看与Condition相关的代码:


def Condition(*args, **kwargs):
    return _Condition(*args, **kwargs)

class _Condition(_Verbose):
    # 条件变量允许一个或多个线程进入到等待状态,直到它们被其他线程唤醒
    """Condition variables allow one or more threads to wait until they are
       notified by another thread.
    """

    def __init__(self, lock=None, verbose=None):
        ...
        if lock is None:
            lock = RLock()
        # Condition的底层锁,默认是RLock
        self.__lock = lock
        # 将Condition的acquire()和release()方法设置为底层锁的acquire()和release()方法
        self.acquire = lock.acquire
        self.release = lock.release

        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass

        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # waiting池
        self.__waiters = []

    def _is_owned(self):
        # 如果当前线程持有锁,则返回True,否则返回False
        # 说明2
        # 其过程是:使用非阻塞的方式获取锁,如果获取成功,则表明当前线程,
        # + 没有持有锁,那么释放获取到的锁,并返回False;
        # + + 否则,认为当前线程持有锁,返回True。

        # 只有当底层锁没有_is_owned()方法时,才会用这种方式判断当前线程是否拥有底层锁
        # 因为RLock具有_is_owned()方法,所以,它的对象不会使用这里的_is_owned()方法
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    # Condition也支持上下文管理。
    # + 当进入到Condtion对象时,会获取底层锁
    # + 当离开到Condtion对象时,会释放底层锁
    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)


    # 对于RLock而言,调用release()方法,并不一定会真正的释放锁。
    # + 因此,它提供了_release_save()方法,该方法会真正的释放锁,
    # + + 并且将RLock内部维护的状态,返回给调用方。
    # 之后,线程再次获取到底层锁之后,再将状态重置回去
    # RLock内部会维护两个状态:owner-拥有锁的线程的id,count-该线程获取了多少次锁

    # 只有底层锁没有_release_save()和_acquire_restore()方法时,才用下面的实现
    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state


    def wait(self, timeout=None):
        # 1,如果当前线程,没有获取到底层锁,那么抛出异常
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        # 2,创建一个锁对象,并获取它,然后把它放到waiting池
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)

        # 3,释放底层锁,并保存锁对象的内部状态
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 3.1,如果timeout是None,那么再次以阻塞的方式获取锁对象
                #  + 因此当前线程已经获取了一次该锁,因此当前线程会阻塞,直到其他线程释放该锁
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # 3.2,如果timeout不是None,那么重复下面的流程:
                #  + 1,以非阻塞方式获取锁
                #  + + 1.1,如果获取成功,说明其他线程释放了该锁,那么退出循环
                #  + + 1.2,如果获取失败,那么看是否达到了超时时间,如果达到了,那么退出循环;否则,继续
                #  + 2,sleep一小段时间,然后回到步骤1
                #  + + 每次循环sleep的时长,是从0.0005秒开始,指数增长,最多增长到0.05秒
                #  + + + 第一次是:0.0005,第二次是:0.001,...
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    ...
                    try:
                        # 3.3,如果因为超时,而不是被唤醒,退出的wait(),那么将锁从waiting池中移除
                        # + 因为当线程被唤醒的时候,调用notify的线程会将锁从waiting池中移除
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    ...
        finally:
            # 4,获取底层锁,并重置锁对象的内部状态
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        # 1,如果当前线程,没有获取到底层锁,那么抛出异常
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")

        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")

        # 2,唤醒waiting池中前n个线程,并将他们从waiting池中移除
        for waiter in waiters:
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        self.notify(len(self.__waiters))

    notify_all = notifyAll

说明