可以使用滑动窗口进行速率限制。比如TCP的滑动窗口机制:http://timd.cn/tcp-window/。
使用滑动窗口限速的好处是其比较平滑。
滑动窗口用来“框住”一段连续的元素序列。比如,一个时间序列,或一个环行缓冲区的子序列等。滑动窗口具有左右两个边缘,并且每个边缘都可以左移或右移。比如时间或环行缓冲区,可以被认为是向右无限延伸的,因此左边缘左移一般是一种不合规的行为。
下面看hystrix中是如何实现的:
在hystrix中,一个滑动窗口,包含若干个桶(默认是10个),每个桶保存一定时间间隔内的统计数据(默认是1s)。下面看一个官方的例子:
上图中,每个矩形框代表一个桶,每个桶记录着1秒内的4个指标数据:成功量、失败量、超时量、拒绝量。这10个桶合起来就是一个完整的滑动窗口。
从业务上讲,值得注意的是:桶对象有一个不可变的属性-windowStart,它表明该桶对象用来保存[windowStart, windowStart + bucketSizeInMillseconds)时间段内的统计信息。
从技术来讲,值得注意的是:因为每个桶都会被多个线程并发地更新指标数据,所以桶对象需要提供一些线程安全的数据结构和更新方法。为此,hystrix大量使用了CAS,而不是锁。
hystrix使用一个环形数组来维护这些桶,并且它的环形数组的实现类似于一个FIFO的队列。该数组实现有一个叫addLast(Bucket o)的方法,用于向环形数组的末尾追加新的桶对象,当数组中的元素个数没超过最大大小时,只是简单的维护尾指针;否则,在维护尾指针时,还要通过维护首指针,将第一个位置上元素剔除掉。可以看出,该环形数组的表现类似于FIFO队列。
更新统计数据时,都是向当前最新的bucket中更新的,因此hystrix的滑动窗口类HystrixRollingNumber中提供了getCurrentBucket()方法,获取当前最新的bucket。其执行流程如下:
可以看出:滑动窗口每次只向前滑动一个bucket,因此非常的平滑。
如果想要了解hystrix,请自行查阅资料,参考文档中有hystrix的滑动窗口类HystrixRollingNumber的实现。
通俗点讲,滑动窗口是向前滑的,而滚动窗口是向前滚的。滑动窗口每次向前滑的距离可以小于窗口的长度,假设有两个相邻的时刻t1、t2,t1时刻的滑动窗口和t2时刻的滑动窗口是可以有交叉的。而滚动窗口向前滚的距离至少是窗口的长度,因此 ,两个相邻时刻的滚动窗口,不会有交叉。
相比之下,滚动窗口没有滑动窗口那么平滑,因为:滑动窗口会继承前一个窗口的部分信息,而滚动窗口不会继承前一个窗口的任何信息。下面是博主基于滚动窗口实现的熔断器:
x
import threading
from typing import Optional
class WindowStatus:
"""
窗口状态:
开放
半开
关闭
"""
OPEN: int = 0x1
HALF_OPEN: int = 0x10
CLOSED: int = 0x100
class Window:
"""
窗口对象,其中包含:
起始位置
窗口长度
窗口状态
窗口期内的总请求数
窗口期内的失败请求数
"""
def __init__(self,
start_position: float,
status: int,
open_length: float,
closed_length: float,
half_open_length: float,
failure_ratio_threshold: float,
failure_count_threshold: float,
half_failure_count_threshold: float) -> None:
"""
:param start_position: 起始位置
:param status: 状态
:param open_length: 开放状态的窗口长度
:param closed_length: 关闭状态的窗口长度
:param half_open_length: 半开状态的窗口长度
:param failure_ratio_threshold: 失败比例阈值
:param failure_count_threshold: 失败数量阈值
:param half_failure_count_threshold: 半失败数量阈值
"""
self._start_position: float = start_position
self._status: int = status
self._open_length: float = open_length
self._closed_length: float = closed_length
self._half_open_length: float = half_open_length
self._failure_ratio_threshold: float = failure_ratio_threshold
self._failure_count_threshold: float = failure_count_threshold
self._half_failure_count_threshold: float = half_failure_count_threshold
self._lock = threading.RLock()
# 统计信息
self._total_count: int = 0
self._failure_count: int = 0
def _fetch(self, position: float) -> Optional[int]:
"""
使用指定的位置"推动"窗口前进
:param position: 位置
:return: 窗口的状态
 nbsp; """
with self._lock:
# 位置在窗口左边缘的左侧时,忽略它
if position < self._start_position:
return
# 位置在窗口右边缘的右侧时,分以下情况:
if position > self._get_end_position():
# 1,如果窗口是半开或打开的,直接右移
if self._status & WindowStatus.HALF_OPEN or self._status & WindowStatus.OPEN:
self._enter_into_open_status(position)
return self._status
# 2,如果当前窗口是关闭的,先进入到下一阶段的半开状态,然后递归处理
elif self._status & WindowStatus.CLOSED:
self._enter_into_half_open_status(self._get_end_position())
return self._fetch(position)
# 位置在窗口内时,直接返回
else:
return self._status
def _get_end_position(self) -> float:
"""
获取窗口的结束位置
:return: 窗口的结束位置
"""
if self._status & WindowStatus.OPEN:
return self._start_position + self._open_length
elif self._status & WindowStatus.CLOSED:
return self._start_position + self._closed_length
else:
return self._start_position + self._half_open_length
def get_status(self, position: float) -> int:
"""
使用指定的位置获取窗口的状态
:param position: 位置
:return: 窗口的状态
"""
return self._fetch(position)
def update_status(self, position: float, success_count: int, failure_count: int) -> None:
"""
更新窗口状态
:param position: 位置
:param success_count: 成功请求数
:param failure_count: 失败请求数
"""
with self._lock:
# 推进窗口
status: int = self._fetch(position)
if status is None:
return
# 当窗口处于关闭状态时,不允许更新窗口的统计
if status & WindowStatus.CLOSED:
return
# 更新统计信息
if success_count > 0:
self._total_count = self._total_count + success_count
if failure_count > 0:
self._total_count = self._total_count + failure_count
self._failure_count = self._failure_count + failure_count
if self._total_count == 0:
current_failure_ratio: float = 0
else:
current_failure_ratio: float = self._failure_count / (self._total_count + 0.)
# 当失败率达到阈值的时候,窗口进入到 CLOSED 状态
if status == WindowStatus.OPEN:
if current_failure_ratio >= self._failure_ratio_threshold:
if self._failure_count_threshold is None or \
self._failure_count >= self._failure_count_threshold:
self._enter_into_close_status(position)
return
if status == WindowStatus.HALF_OPEN:
if current_failure_ratio >= self._failure_ratio_threshold:
if self._half_failure_count_threshold is None or \
self._failure_count >= self._half_failure_count_threshold:
self._enter_into_close_status(position)
return
def _reset_counts(self) -> None:
"""
重置统计数据
"""
self._failure_count = 0
self._total_count = 0
def _enter_into_close_status(self, position: float) -> None:
"""
进入关闭状态
:param position: 窗口的起始位置
"""
self._start_position = position
self._status = WindowStatus.CLOSED
self._reset_counts()
def _enter_into_open_status(self, position: float) -> None:
"""
进入打开状态
:param position: 窗口的起始位置
"""
self._start_position = position
self._status = WindowStatus.OPEN
self._reset_counts()
def _enter_into_half_open_status(self, position: float) -> None:
"""
进入半开状态
:param position: 窗口的起始位置
"""
self._start_position = position
self._status = WindowStatus.HALF_OPEN
self._reset_counts()
def get_failure_count(self) -> int:
"""
获取失败的请求数
:return: 失败的请求数
"""
return self._failure_count
def get_success_count(self) -> int:
"""
获取成功的请求数
:return: 成功的请求数
"""
return self._total_count - self._failure_count