Condition(条件变量)可以看成是一个具有waiting池的RLock说明1(或Lock)。
使用条件变量的流程通常是:
# coding: utf8
from threading import Condition, Thread
class Storage(object):
def __init__(self, max_size):
self._max_size = max_size
self._storage = []
def is_full(self):
return len(self._storage) == self._max_size
def append(self, element):
self._storage.append(element)
def is_empty(self):
return len(self._storage) == 0
def pop(self):
return self._storage.pop(0)
# 条件变量
condition = Condition()
max_count = 100
# 共享数据结构
storage = Storage(10)
def producer():
global condition, max_count, storage
current_count = 0
while current_count < max_count:
# 1,获取条件变量的底层锁
condition.acquire()
# 2,如果条件不满足,则进入waiting池,等待被唤醒
if storage.is_full():
# 需要特别说明的是:
# 1,线程在进入waiting池之后,会释放条件变量的底层锁
# 2,线程在被唤醒或超时之后,会获取条件变量的底层锁,
# + 也就是说,一旦离开wait()方法,那么该线程就持有了底层锁
print("produce thread enter into waiting pool")
condition.wait()
print("produce thread is waken up")
# 2.1,被唤醒之后,继续进行判断
# 3,如果条件满足,则更新共享数据
else:
print("produce: %d" % current_count)
storage.append(current_count)
current_count = current_count + 1
# 3.1,唤醒waiting池中的线程
condition.notify()
# 释放条件变量的底层锁
condition.release()
def consumer():
global condition, max_count, storage
current_count = 0
while current_count < max_count:
condition.acquire()
if storage.is_empty():
print("consume thread enter into waiting pool")
condition.wait()
print("consume thread is waken up")
else:
element = storage.pop()
print("consume: %d" % element)
current_count = current_count + 1
condition.notify()
condition.release()
consumer_thread = Thread(target=consumer)
consumer_thread.start()
producer_thread = Thread(target=producer)
producer_thread.start()
consumer_thread.join()
producer_thread.join()
threading模块的源代码在:https://github.com/python/cpython/blob/2.7/Lib/threading.py。
接下来看与Condition相关的代码:
def Condition(*args, **kwargs):
return _Condition(*args, **kwargs)
class _Condition(_Verbose):
# 条件变量允许一个或多个线程进入到等待状态,直到它们被其他线程唤醒
"""Condition variables allow one or more threads to wait until they are
notified by another thread.
"""
def __init__(self, lock=None, verbose=None):
...
if lock is None:
lock = RLock()
# Condition的底层锁,默认是RLock
self.__lock = lock
# 将Condition的acquire()和release()方法设置为底层锁的acquire()和release()方法
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# waiting池
self.__waiters = []
def _is_owned(self):
# 如果当前线程持有锁,则返回True,否则返回False
# 说明2
# 其过程是:使用非阻塞的方式获取锁,如果获取成功,则表明当前线程,
# + 没有持有锁,那么释放获取到的锁,并返回False;
# + + 否则,认为当前线程持有锁,返回True。
# 只有当底层锁没有_is_owned()方法时,才会用这种方式判断当前线程是否拥有底层锁
# 因为RLock具有_is_owned()方法,所以,它的对象不会使用这里的_is_owned()方法
if self.__lock.acquire(0):
self.__lock.release()
return False
else:
return True
# Condition也支持上下文管理。
# + 当进入到Condtion对象时,会获取底层锁
# + 当离开到Condtion对象时,会释放底层锁
def __enter__(self):
return self.__lock.__enter__()
def __exit__(self, *args):
return self.__lock.__exit__(*args)
# 对于RLock而言,调用release()方法,并不一定会真正的释放锁。
# + 因此,它提供了_release_save()方法,该方法会真正的释放锁,
# + + 并且将RLock内部维护的状态,返回给调用方。
# 之后,线程再次获取到底层锁之后,再将状态重置回去
# RLock内部会维护两个状态:owner-拥有锁的线程的id,count-该线程获取了多少次锁
# 只有底层锁没有_release_save()和_acquire_restore()方法时,才用下面的实现
def _release_save(self):
self.__lock.release() # No state to save
def _acquire_restore(self, x):
self.__lock.acquire() # Ignore saved state
def wait(self, timeout=None):
# 1,如果当前线程,没有获取到底层锁,那么抛出异常
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 2,创建一个锁对象,并获取它,然后把它放到waiting池
waiter = _allocate_lock()
waiter.acquire()
self.__waiters.append(waiter)
# 3,释放底层锁,并保存锁对象的内部状态
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 3.1,如果timeout是None,那么再次以阻塞的方式获取锁对象
# + 因此当前线程已经获取了一次该锁,因此当前线程会阻塞,直到其他线程释放该锁
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
# 3.2,如果timeout不是None,那么重复下面的流程:
# + 1,以非阻塞方式获取锁
# + + 1.1,如果获取成功,说明其他线程释放了该锁,那么退出循环
# + + 1.2,如果获取失败,那么看是否达到了超时时间,如果达到了,那么退出循环;否则,继续
# + 2,sleep一小段时间,然后回到步骤1
# + + 每次循环sleep的时长,是从0.0005秒开始,指数增长,最多增长到0.05秒
# + + + 第一次是:0.0005,第二次是:0.001,...
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if not gotit:
...
try:
# 3.3,如果因为超时,而不是被唤醒,退出的wait(),那么将锁从waiting池中移除
# + 因为当线程被唤醒的时候,调用notify的线程会将锁从waiting池中移除
self.__waiters.remove(waiter)
except ValueError:
pass
else:
...
finally:
# 4,获取底层锁,并重置锁对象的内部状态
self._acquire_restore(saved_state)
def notify(self, n=1):
# 1,如果当前线程,没有获取到底层锁,那么抛出异常
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
waiters = __waiters[:n]
if not waiters:
if __debug__:
self._note("%s.notify(): no waiters", self)
return
self._note("%s.notify(): notifying %d waiter%s", self, n,
n!=1 and "s" or "")
# 2,唤醒waiting池中前n个线程,并将他们从waiting池中移除
for waiter in waiters:
waiter.release()
try:
__waiters.remove(waiter)
except ValueError:
pass
def notifyAll(self):
self.notify(len(self.__waiters))
notify_all = notifyAll