Throttle的Python实现

参考文章和实现有:

throttle实现依赖于FlowRate来计算rate,根据计算出来的rate和设置的rate相比较,来决定delay多长时间。

下面是一种FlowRate的原始实现。这种实现的缺陷是没有办法对流量做到平滑处理:如果一段时间没有流量,相当于积累了比较多的时间片, 然后突然来了很大的流量的话,计算出来的rate会被低估。

class NaiveFlowRate:
    def __init__(self):
        self.cnt = 0
        self.ts = int(time.time())

    def update(self, inc=1):
        self.cnt += inc
        now_ts = int(time.time())
        return self.cnt / (now_ts - self.ts)

文章里面给出来的FlowRate实现则解决了这个问题,它会估算过去1分钟的rate,而对更早时间的流量会忽略掉。 当然这里1分钟也可以配置,可以实现成为5分钟,10分钟等等,只不过1分钟的实现会更加简单(可以在一个redis pipeline下3条指令完成)。 下面代码中,RedisFlowRate是用redis做存储可以用来做跨进程限速,而SmoothFlowRate则只能用于单进程限速

class RedisFlowRate:
    def __init__(self, client: redis.StrictRedis, key_prefix):
        self.key_prefix = key_prefix
        self.client: redis.StrictRedis = client

    def update(self, key, value=1):
        value = int(value)
        cur_ts = int(date_util.current_timestamp())
        cur_sec, cur_min = cur_ts % 60, (cur_ts // 60) % 60
        past_min = (cur_min + 59) % 60
        past_key = '{}{}.{}'.format(self.key_prefix, key, past_min)
        cur_key = '{}{}.{}'.format(self.key_prefix, key, cur_min)

        pipe = self.client.pipeline(transaction=False)
        pipe.get(past_key)
        pipe.incr(cur_key, value)
        pipe.expire(cur_key, 2 * 60 - cur_sec)

        resp = pipe.execute()
        past_counter = int(resp[0]) if resp[0] else 0
        current_counter = max(0, int(resp[1]) - value)
        current_rate = past_counter * ((60 - cur_sec) / 60) + current_counter
        return current_rate / 60

class SmoothFlowRate:
    def __init__(self, unit=60):
        self.last_count = 0
        self.cur_count = 0
        self.cur_ts = 0
        self.cur_bucket = 0
        self.unit = unit

    def __str__(self):
        return '{}(ts = {}, bucket = {}, last = {}, curr = {})'.format(
            __class__.__name__, self.cur_ts, self.cur_bucket,
            self.last_count, self.cur_count)

    def update(self, inc=1):
        now = int(date_util.current_timestamp())
        bucket = now // self.unit

        if self.cur_bucket == 0:
            self.cur_bucket = bucket
        elif self.cur_bucket < (bucket - 1):
            self.last_count = 0
            self.cur_count = 0
        elif self.cur_bucket == (bucket - 1):
            self.last_count = self.cur_count
            self.cur_count = 0

        self.cur_ts = now
        self.cur_bucket = bucket
        self.cur_count += inc

        last_ratio = (self.cur_ts % self.unit) / self.unit
        rate = (1 - last_ratio) * self.last_count + self.cur_count
        rate = rate / self.unit  # per seconds.
        return rate

实现完了FlowRate之后就可以实现Throttle了:

class SmoothThrottle(object):
    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.flow_rate = SmoothFlowRate(unit=60)

    def set_rate_limit(self, v):
        self.rate_limit = v

    def run(self, inc=1):
        rate = self.flow_rate.update(inc)
        if not self.rate_limit or rate < self.rate_limit:
            return 0
        delay = self.flow_rate.unit * (rate - self.rate_limit) / self.rate_limit
        if delay > 0.1:
            date_util.delay_seconds(delay)
            return delay
        return 0

为了确保rate计算正确以及throttle可以正常限速,我做了如下测试:

throttle = SmoothThrottle(rate_limit=50)
flow_rate = throttle.flow_rate

class Worker:
    def __init__(self, Q):
        self.Q: Queue = Q
        self.rates = []
        self._exit = False

    def run(self):
        Q = self.Q
        while True:
            try:
                v = self.Q.get(timeout=0.1)
            except queue.Empty:
                v = None
            if self._exit:
                break
            if v is not None:
                throttle.run(v) # 测试限速是否工作
                # flow_rate.update(v) # 测试rate是否估算正确
            rate = flow_rate.update(0)
            self.rates.append(rate)

    def exit(self):
        self._exit = True


class Producer:
    def __init__(self, Q, mean, std):
        self.Q: Queue = Q
        self.mean = mean
        self.std = std

    def run(self):
        size = 1000

        for x in range(5):
            values = np.random.normal(loc= self.mean, scale= self.std, size=size)
            for v in values:
                self.Q.put(v)
                date_util.delay_seconds(0.1)
            date_util.delay_seconds(100)


def collect():
    Q = queue.Queue(maxsize=1)
    worker = Worker(Q)
    t1 = threading.Thread(target=worker.run)
    producer = Producer(Q, mean=10, std=5)
    t2 = threading.Thread(target=producer.run)
    t1.start()
    t2.start()

    t2.join()
    worker.exit()
    t1.join()

    rates = worker.rates
    print('# of pts = {}'.format(len(rates)))
    return rates

产生的数字序列符合高斯分布N(10, 5),所以估计出来的rate应该是100.

Pasted-Image-20231225103921.png

throttle的rate_limit设置为50, 所以在50上会被削平.

Pasted-Image-20231225103953.png


UPDATE@202002 最近发现这个限速器在生产环境有点问题,没有对突发流量做到限速,而且实际速度和限制速度差别还蛮大的。

同事推荐给了我两个实现让我看看:

其实这些实现都是使用固定窗口(有1s, 10s, 60s)的,在这个时间窗口内设置一个次数上限。而我上面的rate limiter是60s的近似滑动窗口,在限速上会更加平滑一些。

滑动窗口比固定窗口有个好处是,没有办法预支流量。比如10s窗口的开始,固定窗口允许在第1s内请求10 * limit_qps的,这就是一个尖峰,但是滑动窗口会考虑到当前窗口大小,不会出现这样的情况。

滑动窗口和固定窗口都会存在窗口结束流量突增的问题。比如在一个10s窗口,前面9s没有请求,最后1s请求理论上是可以达到 10 * limit_qps的。1s固定窗口没有这个问题,窗口足够小了。

不管任何窗口以及窗口大小的rate limiter, 等待是个问题。等待时间还相对比较好算,即使是随机一个也没有问题。我觉得等待区分乐观等待和悲观等待:

我实现的限速器使用了乐观等待,但是并不是正确的实现,在超量的情况下面并没有撤销。在并发程度比较小的时候没有什么问题,当并发度非常高的时候,误差程度非常大。 考虑到撤销在实现上比较复杂,所以我还是改成了悲观等待的实现方式,虽然这样在基本不超量情况下会多一次redis交互,但是先照顾正确性吧。

class SmoothThrottle(object):
    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.flow_rate = SmoothFlowRate(unit=60)

    def set_rate_limit(self, v):
        self.rate_limit = v

    def _wait_once(self, rate):
        if not self.rate_limit or rate < self.rate_limit:
            return 0
        delay = self.flow_rate.unit * (rate - self.rate_limit) / self.rate_limit
        if delay > 0.1:
            date_util.delay_seconds(delay + random.random() * 0.5)
            return delay
        return 0

    def get_rate(self):
        return self.flow_rate.update(0)

    def _wait(self):
        res = 0
        while True:
            rate = self.get_rate()
            v = self._wait_once(rate)
            res += v
            if v == 0:
                break
        return res

    def run(self, inc=1):
        res = self._wait()
        rate = self.flow_rate.update(inc)
        return res + self._wait_once(rate)

不过不管是乐观还是悲观等待,在最后一次提交的时候还是可能会出现超量的情况,毕竟整个操作并不是原子的。如果所有这些逻辑都写在lua脚本提交到redis的话,是可以保证原子性的。