目录


概述

队列是用于多个生产者线程多个消费者线程之间同步数据的。
本文将对Python2.7的标准库提供的Queue模块进行源码解析,并且实现一个延迟队列(所谓的延迟队列就是把元素放进队列之后,并不能立刻从队列中将其取出来,而是需要等待一段时间之后,该元素才变得可用)。


源码解析

Queue模块的源代码在:https://github.com/python/cpython/blob/2.7/Lib/Queue.py
如果对条件变量不了解,可以先看:Python Condition源码解析
接下来,我们看主要代码:


class Queue:
    """使用给定的最大大小创建一个队列对象。
    如果maxsize小于等于0,那么队列是无限大的
    """
    def __init__(self, maxsize=0):
        # 队列的最大容量
        self.maxsize = maxsize
        # 初始化队列对象的底层数据结构,该数据结构用来保存put到队列中的元素
        # 在Queue中,底层数据结构是collections.dequeue(双端队列)
        self._init(maxsize)

        # 在使用队列对象暴漏出的所有方法时,都应该先获取到这个互斥锁
        # 所有的获取了该互斥锁的方法,在它返回之前,都应该先释放该锁
        # 这个互斥锁会在三个条件变量之间共享(也就是说,这三个条件变量
        # + 的底层锁都是该互斥锁)。因此,获取这些条件变量时,会先获取到该锁;
        # + 释放这些条件变量时,会先释放该锁。
        self.mutex = _threading.Lock()
        # 当成功地向队列中put一个元素时,会通知该条件变量;
        # 当以阻塞的方式从队列中get元素的时候,
        #  + 如果队列中没有可用的元素,那么线程会进入到该条件变量的waiting池,等待到超时或被唤醒
        self.not_empty = _threading.Condition(self.mutex)
        # 当成功地从队列中get一个元素时,会通知该条件变量。
        # 当以阻塞的方式向队列中put元素的时候,
        #  + 如果队列中没有剩余的空间,那么线程会进入到该条件变量的waiting池,等待到超时或被唤醒
        self.not_full = _threading.Condition(self.mutex)
        # 当未完成任务数归零的时候,会通知该条件变量,
        #  + join()操作会从阻塞状态中退出
        self.all_tasks_done = _threading.Condition(self.mutex)
        # 未完成的任务数
        self.unfinished_tasks = 0

    # 初始化底层数据结构
    def _init(self, maxsize):
        self.queue = deque()

    # 把元素保存到底层的数据结构中
    def _put(self, item):
        self.queue.append(item)

    def put(self, item, block=True, timeout=None):
        """往队列中加入一个元素
        如果block参数是True,并且timeout参数是None,
        那么put操作会阻塞,直到队列中有空闲的空间。
        如果block参数是True,timeout参数是非负数,
        并且,在这个期间,队列中一直没有空间,那么put操作会阻塞
        到超时,然后抛出Full异常。
        如果block参数是False,并且队列中有空闲空间,
        那么会立即把元素保存到底层数据结构中;否则,会忽略掉timeout参数,
        抛出Full异常。
        """

        # 1,获取条件变量not_full
        self.not_full.acquire()
        try:
            # 2,如果maxsize大于0
            if self.maxsize > 0:
                if not block:
                    # 2.1.1,如果block是False,并且队列已满,那么抛出Full异常
                    if self._qsize() == self.maxsize:
                        raise Full
                    # 2.1.2,如果block是False,并且队列未满,那么,走步骤4
                elif timeout is None:
                    # 2.2.1,如果block是True,timeout为None,并且队列已满,那么
                    #  + 线程进入到not_full的waiting池,等待被唤醒;
                    #  + 如果队列未满,那么,走步骤4
                    # 2.2.2,线程被唤醒之后,回到2.2.1
                    #  + (当有其他线程从队列中消费消息时,会通知not_full)
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    # 如果block为True,但是timeout小于0,那么抛出ValueError
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # 2.3,计算等待的结束时间
                    endtime = _time() + timeout
                    # 2.3.1,如果队列未满,则走步骤4
                    # 2.3.2,如果队列已满,那么计算出需要等待的时间,如果已经超时,那么,抛出Full异常;
                    #  + 否则,并进入到not_full的waiting池,等待到超时或被唤醒
                    # 2.3.3,在线程被唤醒或等待超时之后,回到2.3.1

                    # 也就是说,如果在timeout指定的时间内,队列中,一直没有空闲空间,
                    # + 那么线程在等待到超时之后,会抛出Full异常
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)

            # 3,如果maxsize小于等于0,则走步骤4

            # 4,将元素保存到底层数据结构中,
            # + 并递增unfinished_tasks,同时通知not_empty,唤醒在其中等待数据的线程
            self._put(item)
            self.unfinished_tasks += 1
            # 值得再次提及是,not_full、not_empty、all_tasks_done底层的锁是同一个
            self.not_empty.notify()
        finally:
            # 4,释放条件变量not_full
            self.not_full.release()

    # 从底层数据结构中,移除并返回一个元素
    def _get(self):
        return self.queue.popleft()

    def get(self, block=True, timeout=None):
        """从队列中移除,并返回一个元素。
        如果block参数是True,并且timeout参数是None,
        那么get操作会一直阻塞到队列中有元素可用。
        如果timeout参数是非负数,那么get操作最多等待timeout秒,
        如果在这个时间内,队列中一直没有元素可用,那么get操作会等待timeout秒,并抛出Empty异常。
        否则,(如果blocking参数是False,)那么会忽略timeout参数,
        如果队列中,有元素可用,那么立即返回一个;否则,抛出Empty异常
        """
        # 1,获取not_empty条件变量
        self.not_empty.acquire()
        try:
            # 2.1.1,如果block是False,并且队列中没有元素,那么抛出Empty异常
            # 2.1.2,如果block是False,并且队列中有元素,那么走步骤3
            if not block:
                if not self._qsize():
                    raise Empty
            # 2.2,如果block是True,并且timeout是None,那么:
            # 2.2.1,如果队列中有元素,那么走步骤3;否则,线程进入到not_empty的waiting池,等待被唤醒
            # 2.2.2,线程被唤醒之后,回到2.2.1
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            # 如果timeout小于0,那么抛出ValueError
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # 2.3,计算等待的结束时间
                endtime = _time() + timeout
                # 2.3.1,如果队列中有元素,那么走步骤3;否则,计算等待的超时时间,
                #  + 如果达到了超时时间,那么抛出Empty错误;否则,进入到not_empty的waiting池,
                #  + 等待到超时,或被唤醒 
                # 2.3.2,当线程等待到超时或被唤醒之后,回到2.3.1

                # 也就是说,如果在timeout指定的时间之内,队列中一直没有元素可用,
                # + 那么,会等待timeout秒,并抛出Empty异常
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

            # 3,从队列中移除并返回一个元素;通知not_full,也就是唤醒一个在not_full中等待的线程
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            # 4,释放not_empty条件变量
            self.not_empty.release()

    def task_done(self):
        """调用该方法意味着,之前放到队列中的一个任务被完成了。
        该方法是被队列的消费者线程使用的。消费者线程在调用get()方法,从队列中获取到一个任务,
        并处理之后,需要调用该方法,告诉队列该任务处理完成了。
        每次,成功向队列中put一个元素的时候,都会将unfinished_tasks增加1;
        每次,调用task_done()方法,都会将unfinished_tasks减少1
        该方法的作用是唤醒正在阻塞的join()操作。
        """
        # 获取条件变量
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                # 当unfinished小于0时,
                #  + 也就是成功调用put()的次数小于调用task_done()的次数时,会抛出异常
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                # 当unfinished为0时,会通知all_tasks_done
                self.all_tasks_done.notify_all()
            # 递减unfinished_tasks
            self.unfinished_tasks = unfinished
        finally:
            # 释放条件变量
            self.all_tasks_done.release()

    def join(self):
        """该方法会一直阻塞,直到,队列中所有的元素都被取出,并被处理了。
        当成功向队列中put元素的时候,unfinished_tasks会增加。
        当消费者线程调用task_done()方法时,unfinished_tasks会减少。
        当unfinished_tasks降为0时,join()方法才会退出阻塞状态
        """
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    # 返回队列中的元素数
    def _qsize(self, len=len):
        return len(self.queue)

    def qsize(self):
        """返回队列中的元素数"""
        self.mutex.acquire()
        n = self._qsize()
        self.mutex.release()
        return n

    def empty(self):
        """如果队列为空,则返回True,否则返回False"""
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n

    def full(self):
        """如果队列满了,则返回True,否则返回False"""
        self.mutex.acquire()
        n = 0 < self.maxsize == self._qsize()
        self.mutex.release()
        return n

    # 非阻塞的put
    def put_nowait(self, item):
        return self.put(item, False)

    # 非阻塞的get
    def get_nowait(self):
        return self.get(False)


例子


# coding: utf8

from Queue import Queue
from threading import Thread
import logging
import time

logging.basicConfig(level=logging.INFO,
    format="%(thread)d [%(asctime)s] %(msg)s",
    datefmt="%F %T")
LOGGER = logging.getLogger(__name__)

# 创建一个队列,并向其中添加100个元素
queue = Queue()
for i in range(100):
    queue.put("this is %2d" % i)

# 开始10个线程处理任务
def consume():
    global queue

    while not queue.empty():
        item = queue.get()
        LOGGER.info("get '%s' from queue" % item)
        time.sleep(1)
        # 处理完任务后,调用task_done()方法,告诉队列,任务处理完了
        queue.task_done()
for i in range(10):
    thread = Thread(target=consume)
    thread.start()

# 等待,直到队列中所有任务都被处理了
queue.join()


延迟队列DelayQueue

通常,创建一种特定的队列,只需要继承Queue类,并在子类中重写_init()_put()_get()_qsize()方法即可。比如Queue模块中的PriorityQueue和LifoQueue,就是这么实现的。
但是延迟队列比较特别,因为即使队列中有元素,但是这些元素可能因为还没有到达延迟时间,而导致这些元素其实是不可用的,也就是不能被get出来。因此,下面的实现中对get()方法进行了重写。
注意:看下面的代码之前,需要对最小堆以及heapq模块有一定的了解。


# coding: utf8

from Queue import Queue, Empty
import heapq
from time import time as _time


class _Item(object):
    def __init__(self, value, delay):
        assert delay >= 0, "delay can not be less than 0"
        self._value = value
        self._available_at = _time() + delay

    @property
    def value(self):
        return self._value

    @property
    def available_at(self):
        return self._available_at

    def __cmp__(self, another):
        if not isinstance(another, self.__class__):
            raise TypeError("expect %s, not %s" % 
                (self.__class__.__name__, type(another).__name__))
        if self.available_at < another.available_at:
            return -1
        elif self.available_at == another.available_at:
            return 0
        else:
            return 1


class DelayQueue(Queue):
    def _init(self, max_size):
        self._underlying = []

    def _put(self, item):
        assert isinstance(item, (list, tuple)) and \
            len(item) == 2 and \
            isinstance(item[1], (int, long, float)), \
            "item should be tuple, and the second " \
                    "element should be float"
        heapq.heappush(self._underlying, _Item(item[0], item[1]))

    def _get(self):
        item = heapq.heappop(self._underlying)
        return item.value

    def _qsize(self):
        return len(self._underlying)

    def _has_available_item(self):
        if len(self._underlying) == 0:
            return False
        item = self._underlying[0]
        if item.available_at <= _time():
            return True
        else:
            return False

    def _at_least_wait_for(self):
        if len(self._underlying) == 0:
            return None
        item = self._underlying[0]
        if item.available_at <= _time():
            return 0
        else:
            return max(0, item.available_at - _time())

    def get(self, block=True, timeout=None):
        # 1,获取not_empty条件变量
        self.not_empty.acquire()

        try:
            # 2.1.1,如果block是False,并且在当前,队列无可用的元素,
            # + 那么抛出Empty异常
            # 2.1.2,如果block是False,并且在当前,队列有可用的元素,
            # + 那么走步骤3
            if not block:
                if not self._has_available_item():
                    raise Empty
            # 2.2,如果block是True,并且timeout是None,那么:
            # 2.2.1,如果队列中有可用的元素,那么走步骤3;
            # + 否则,线程进入到not_empty的waiting池,等待被唤醒或者是
            # + + 堆中的第一个元素变得可用
            # 2.2.2,线程等待到超时或被唤醒之后,回到2.2.1
            elif timeout is None:
                while not self._has_available_item():
                    self.not_empty.wait(self._at_least_wait_for())
            # 如果timeout小于0,那么抛出ValueError
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # 2.3,计算等待的结束时间
                endtime = _time() + timeout
                # 2.3.1,如果队列中有可用的元素,那么走步骤3;
                # + 否则,计算仍需等待的时间,
                # + 如果达到了超时时间,那么抛出Empty错误;
                # + 否则,进入到not_empty的waiting池,
                # + 等待到超时或堆中的第一个元素变得可用,或者被唤醒
                # 2.3.2,当线程等待到超时或被唤醒之后,回到2.3.1

                # 也就是说,如果在timeout指定的时间之内,队列中一直没有元素可用,
                # + 那么,会等待timeout秒,并抛出Empty异常
                while not self._has_available_item():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    wait_for = min(self._at_least_wait_for, remaining)
                    self.not_empty.wait(wait_for)

            # 3,从队列中移除并返回一个元素;通知not_full,
            # + 也就是唤醒一个在not_full中等待的线程
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            # 4,释放not_empty条件变量
            self.not_empty.release()


if __name__ == "__main__":
    from threading import Thread
    delay_queue = DelayQueue(1)

    def consume():
        t1 = _time()
        print(delay_queue.get())
        t2 = _time()
        print("time used: %f" % (t2 - t1))
        print(delay_queue.get())
        t3 = _time()
        print("time used: %f" % (t3 - t2))

    t = Thread(target=consume)
    t.setDaemon(False)
    t.start()
    delay_queue.put(("value1", 3))
    delay_queue.put(("value2", 6))