本文参考两个使用Redis做限流的项目,分析分布式系统限流服务的实现,包括固定窗口以及滚动窗口的限流。

参考项目

固定窗口

fixed-window

go-redis/redis_rate

redis_rate以窗口标识作为字符串类型的key,用INCRBY累加窗口内已使用的流量n,且key在窗口结束后自动过期。

窗口标识的生成:

  • slot = utime ÷ udur(整除),其中utime为Unix时间戳(单位s),udur为窗口大小(单位s,要求udur ≥ 1,即dur ≥ 1s)
  • key = name + slot

utime与系统时钟相关。考虑一种较为极端的情况:窗口大小为1s,A、B两个系统时钟相差2s,key的过期时间与窗口大小相同(1s)。假设key在窗口的最后时刻仍有更新,则key的生存周期最长约为2s;而两个系统时钟恰好相差2s,当B进入该窗口时,A写入的key已经过期,A、B不再共享同一个计数,分布式限流就退化成了各自的单机限流。
虽然这种情况比较极端,但使用时仍需注意:可以根据需要适当加大key的过期时间,让多个窗口的key同时存活,使存活窗口的范围覆盖系统间的时钟误差。

// AllowN reports whether n more events for name fit into the fixed window
// of length dur that contains the current time, allowing at most maxn events
// per window.
//
// The window is identified by slot = unix_seconds / window_seconds, so only
// the whole seconds of dur position the window. It returns:
//   - count: the window's counter as returned by l.incr after adding n
//   - delay: the time remaining until the current window ends
//   - allow: count <= maxn when l.incr succeeds; otherwise the answer of
//     l.Fallback (or false when no fallback is configured)
//
// A dur shorter than one second is treated as a one-second window. Previously
// int64(dur/time.Second) evaluated to 0 in that case and the slot computation
// panicked with an integer divide by zero.
func (l *Limiter) AllowN(
	name string, maxn int64, dur time.Duration, n int64,
) (count int64, delay time.Duration, allow bool) {
	udur := int64(dur / time.Second)
	if udur < 1 {
		// Slots have second granularity, so a sub-second window cannot be
		// represented. Widen it to 1s and pass the same duration to incr so
		// that the counter's lifetime matches the slot it belongs to.
		udur = 1
		dur = time.Second
	}
	utime := time.Now().Unix()
	slot := utime / udur
	delay = time.Duration((slot+1)*udur-utime) * time.Second

	// NOTE(review): the fallback is consulted on every call, even when incr
	// succeeds and overrides its answer, presumably so that the local
	// limiter keeps tracking traffic. Confirm this before skipping it on success.
	if l.Fallback != nil {
		allow = l.Fallback.Allow()
	}

	name = allowName(name, slot)
	count, err := l.incr(name, dur, n)
	if err == nil {
		allow = count <= maxn
	}

	return count, delay, allow
}

adapter/redisquota/fixed_window

redisquota使用Lua脚本实现,设计上使用一个包含token和expire两个字段的哈希表,key的过期时间为windowLength,窗口在key过期或timestamp ≥ expire时重置。另外,redisquota根据场景做了幂等设计:同一deduplicationid在窗口有效期内不会被重新分配token,而是直接返回上一次的分配结果。

因为timestamp与系统时钟相关,同样假设A、B两个系统时钟相差2s(A快于B),窗口大小为10s。如果窗口①由A系统的时间戳触发,而窗口②由B系统的时间戳触发(A在窗口①本应结束后的2s内没有流量),那么①的实际窗口大小就是12s;继续下去,如果窗口③又由A系统的时间戳触发,②的实际窗口大小就是8s。所以在分布式环境下,限流效果同样受系统时钟误差的影响。

-- Fixed-window quota script (redisquota adapter).
--
-- KEYS[1] key_meta: hash holding the current window
--           token   tokens already granted in the window
--           expire  timestamp at which the window ends
-- ARGV[1] credit:          maximum tokens per window
-- ARGV[2] windowLength:    window size, same unit as timestamp; it is divided
--                          by 1e6 to obtain PEXPIRE milliseconds, i.e.
--                          nanoseconds (TODO confirm with the caller)
-- ARGV[3] bucketLength:    not used by the fixed window
-- ARGV[4] bestEffort:      1 = grant a partial amount when the full request
--                          does not fit; any other value = all-or-nothing
-- ARGV[5] token:           tokens requested
-- ARGV[6] timestamp:       current time supplied by the caller
-- ARGV[7] deduplicationid: optional; while its record is valid, a repeated id
--                          gets the previous answer instead of new tokens
--
-- Returns {granted_tokens, time_until_window_end}, or {0, 0} when nothing is
-- granted.
--
-- NOTE(review): the deduplication key is derived from key_meta instead of
-- being passed through KEYS, so the script is not Redis Cluster safe.

local key_meta = KEYS[1]

local credit = tonumber(ARGV[1])
local windowLength = tonumber(ARGV[2])
local bestEffort = tonumber(ARGV[4])
local token = tonumber(ARGV[5])
local timestamp = tonumber(ARGV[6])
local deduplicationid = ARGV[7]

-- PEXPIRE only accepts integers. Redis passes Lua numbers on as decimal text,
-- so a fractional millisecond value (e.g. "1.5") makes the command fail.
-- Rounding up also guarantees a key never expires before the window it
-- describes and never gets a TTL of 0, which would delete it immediately.
local function ns_to_ms(ns)
    return math.ceil(ns / 1000000)
end

local dedup_key = nil
if (deduplicationid or '') ~= '' then
    dedup_key = deduplicationid .. "-" .. key_meta
end

-- Record the answer given to this deduplicationid until `expire`.
local function remember(granted, expire)
    if dedup_key then
        redis.call("HMSET", dedup_key, "token", granted, "expire", expire)
        redis.call("PEXPIRE", dedup_key, ns_to_ms(expire - timestamp))
    end
end

-- replay the previous answer for this deduplicationid while it is still valid
--------------------------------------------------------------------------------
if dedup_key then
    local previous_token = tonumber(redis.call("HGET", dedup_key, "token"))
    local previous_expire = tonumber(redis.call("HGET", dedup_key, "expire"))

    if previous_token and previous_expire and timestamp < previous_expire then
        return {previous_token, previous_expire - timestamp}
    end
end

-- read the current window, or open a new one when it is missing or elapsed
--------------------------------------------------------------------------------
local info_token = tonumber(redis.call("HGET", key_meta, "token"))
local info_expire = tonumber(redis.call("HGET", key_meta, "expire"))

if not info_token or not info_expire or timestamp >= info_expire then
    info_token = 0
    info_expire = timestamp + windowLength

    redis.call("HMSET", key_meta, "token", info_token, "expire", info_expire)
    -- let Redis drop the window once it is over
    redis.call("PEXPIRE", key_meta, ns_to_ms(windowLength))
end

local remaining = info_expire - timestamp

-- the whole request fits into the window
--------------------------------------------------------------------------------
if info_token + token <= credit then
    redis.call("HMSET", key_meta, "token", info_token + token)
    remember(token, info_expire)
    return {token, remaining}
end

-- the request does not fit: optionally grant what is left
--------------------------------------------------------------------------------
if bestEffort == 1 then
    local exceeded = info_token + token - credit

    if exceeded < token then
        -- grant the remaining credit and mark the window as exhausted
        local granted = token - exceeded
        redis.call("HMSET", key_meta, "token", credit)
        remember(granted, info_expire)
        return {granted, remaining}
    end
end

-- not enough available credit
return {0, 0}

Lua脚本获取时间戳

既然分布式环境下系统时钟可能存在误差,自然会考虑改为从Redis获取时间戳,以消除时钟误差对窗口的影响。这里有一个限制条件:Lua脚本需要支持在调用TIME这类随机(非确定性)命令之后再执行写入,即通过redis.replicate_commands()开启脚本效果复制(Redis ≥ 3.2)。有关Redis Lua脚本随机写入,请参考《redis4.0之Lua脚本新姿势》。结合这个特性,可以将两个方案的时间戳由外部传入改为在Lua脚本中获取Redis系统时间。我在hb-go/pkg/rate包中分别基于SET和HSET做了实现,可供参考。

-- Take the timestamp from the Redis server instead of ARGV[6], so that every
-- client shares a single clock. Requires Redis version >= 3.2.
--
-- TIME is non-deterministic. On Redis 3.2-4.x, effects replication must be
-- switched on before the script performs any write. From Redis 5 it is the
-- default, and the call is harmless.
redis.replicate_commands()
local now = redis.call('TIME')
-- TIME returns {seconds, microseconds} as strings; the result below is in
-- nanoseconds, matching the scripts' division by 1e6 for PEXPIRE milliseconds.
-- `local` is required: Redis rejects scripts that create globals. When this is
-- pasted after `local timestamp = tonumber(ARGV[6])`, it simply shadows it.
-- NOTE(review): values around 1.7e18 exceed 2^53, so the nanosecond value is
-- only accurate to a few hundred ns. That is fine for rate limiting.
local timestamp = (tonumber(now[1]) * 1e6 + tonumber(now[2])) * 1e3

滚动窗口

rolling-window

滚动窗口在固定窗口的基础上,将一个window拆成多个bucket,窗口按bucket的大小向前滚动。Redis中的存储结构为一个哈希表(包含token、bucket.token、bucket.timestamp、key四个字段),以及一个存储bucket历史数据的有序集合(按bucket的时间戳排序):

  • token:窗口内已使用token
  • bucket.token:当前桶内已使用token
  • bucket.timestamp:当前桶的时间戳
  • key:累计归档的bucket数量,同时作为有序集合成员的唯一序号
-- Rolling-window quota script (redisquota adapter).
--
-- The window is split into buckets of bucketLength. Tokens granted in the
-- current bucket accumulate in key_meta's "bucket.token". Once the bucket is
-- older than bucketLength, it is archived into the sorted set key_data, with
-- score = bucket start timestamp and member = "<seq>.<tokens>". Archived
-- buckets older than windowLength give their tokens back to the window.
--
-- KEYS[1] key_meta: hash
--           token            tokens granted in the whole window
--           bucket.token     tokens granted in the current bucket
--           bucket.timestamp start of the current bucket
--           key              sequence number that keeps key_data members unique
-- KEYS[2] key_data: sorted set of archived buckets
-- ARGV[1] credit:          maximum tokens per window
-- ARGV[2] windowLength:    window size, same unit as timestamp; it is divided
--                          by 1e6 to obtain PEXPIRE milliseconds, i.e.
--                          nanoseconds (TODO confirm with the caller)
-- ARGV[3] bucketLength:    bucket size, same unit as timestamp
-- ARGV[4] bestEffort:      0 = all-or-nothing; any other value = grant a
--                          partial amount when the full request does not fit
-- ARGV[5] token:           tokens requested
-- ARGV[6] timestamp:       current time supplied by the caller
-- ARGV[7] deduplicationid: optional; while its record is valid, a repeated id
--                          gets the previous answer instead of new tokens
--
-- Returns {granted_tokens, windowLength}, or {0, 0} when nothing is granted.

local key_meta = KEYS[1]
local key_data = KEYS[2]

local credit = tonumber(ARGV[1])
local windowLength = tonumber(ARGV[2])
local bucketLength = tonumber(ARGV[3])
local bestEffort = tonumber(ARGV[4])
local token = tonumber(ARGV[5])
local timestamp = tonumber(ARGV[6])
local deduplicationid = ARGV[7]

-- PEXPIRE only accepts integers. Redis passes Lua numbers on as decimal text,
-- so a fractional millisecond value (e.g. "1.5") makes the command fail.
-- Rounding up also avoids a TTL of 0, which deletes the key immediately.
local function ns_to_ms(ns)
    return math.ceil(ns / 1000000)
end

-- NOTE(review): this key is not passed through KEYS (not Redis Cluster safe).
local dedup_key = nil
if (deduplicationid or '') ~= '' then
    dedup_key = deduplicationid .. "-" .. key_meta
end

-- Record the answer given to this deduplicationid for one window length.
local function remember(granted)
    if dedup_key then
        redis.call("HMSET", dedup_key, "token", granted, "expire", timestamp + windowLength)
        redis.call("PEXPIRE", dedup_key, ns_to_ms(windowLength))
    end
end

-- replay the previous answer for this deduplicationid while it is still valid
--------------------------------------------------------------------------------
if dedup_key then
    local previous_token = tonumber(redis.call("HGET", dedup_key, "token"))
    local previous_expire = tonumber(redis.call("HGET", dedup_key, "expire"))

    if previous_token and previous_expire and timestamp < previous_expire then
        return {previous_token, previous_expire - timestamp}
    end
end

-- read meta information, initializing it when any field is missing
--------------------------------------------------------------------------------
local info_token = tonumber(redis.call("HGET", key_meta, "token"))
local info_bucket_token = tonumber(redis.call("HGET", key_meta, "bucket.token"))
local info_bucket_timestamp = tonumber(redis.call("HGET", key_meta, "bucket.timestamp"))

if not info_token or not info_bucket_token or not info_bucket_timestamp then
    info_token = 0
    info_bucket_token = 0
    info_bucket_timestamp = timestamp

    redis.call("HMSET", key_meta,
        "token", info_token,
        "bucket.token", info_bucket_token,
        "bucket.timestamp", info_bucket_timestamp,
        "key", 0)
end

-- archive the current bucket once it is older than bucketLength
--------------------------------------------------------------------------------
if (timestamp - info_bucket_timestamp + 1) > bucketLength then
    if info_bucket_token > 0 then
        -- the sequence number keeps members unique when token counts repeat
        local nextKey = redis.call("HINCRBY", key_meta, "key", 1)
        local member = tostring(nextKey) .. "." .. tostring(info_bucket_token)
        redis.call("ZADD", key_data, info_bucket_timestamp, member)
    end
    redis.call("HMSET", key_meta,
        "bucket.token", 0,
        "bucket.timestamp", timestamp)
end

-- reclaim tokens of archived buckets that fell out of the window
--------------------------------------------------------------------------------
local time_to_expire = timestamp - windowLength
local reclaimed = 0

for _, member in ipairs(redis.call("ZRANGEBYSCORE", key_data, 0, time_to_expire)) do
    -- member is "<seq>.<tokens>"; take everything after the first dot
    reclaimed = reclaimed + tonumber(string.sub(member, string.find(member, "%.") + 1))
end

redis.call("ZREMRANGEBYSCORE", key_data, 0, time_to_expire)

if reclaimed > 0 then
    info_token = math.max(info_token - reclaimed, 0)
    redis.call("HSET", key_meta, "token", info_token)
end

-- refresh the TTL of BOTH keys so they expire together. The script used to
-- refresh key_meta twice and never key_data. That leaked key_data, and after
-- key_meta expired it let stale buckets be reclaimed against a fresh counter.
--------------------------------------------------------------------------------
redis.call("PEXPIRE", key_meta, ns_to_ms(windowLength))
redis.call("PEXPIRE", key_data, ns_to_ms(windowLength))

-- grant the request (or, in best-effort mode, whatever is left)
--------------------------------------------------------------------------------
local available_token = credit - info_token

if available_token <= 0 then
    -- credit exhausted
    return {0, 0}
end

local granted = token
if available_token < token then
    if bestEffort == 0 then
        -- not enough token
        return {0, 0}
    end
    granted = available_token
end

redis.call("HINCRBY", key_meta, "token", granted)
redis.call("HINCRBY", key_meta, "bucket.token", granted)
remember(granted)

return {granted, windowLength}

限流的窗口与Flink的Window类似。对于固定窗口,结合使用场景的需求,可以借鉴Flink Tumbling Window的接口设计,提供offset参数来调整窗口的起始位置。