面试官:如何实现分布式系统的限流?
限流的概念以及作用我前一篇文章已经做了介绍:并发限流算法的实践
目录
限流的几种算法 :
1、令牌桶算法
2、漏桶算法
3. 滑动时间窗口计数器算法
5. 全局限流
6. 客户端限流
7. API网关限流
8. 熔断与降级
本篇重点:
具体实现:
限流的几种算法 :
这里主要讲在分布式系统中,限流是控制系统负载、防止过载的重要手段。以下是一些常见的限流实现方法:
1、令牌桶算法
-
原理:系统以固定速率向桶中添加令牌,请求需要获取一个令牌才能执行。桶有最大容量,当桶满时,新令牌会被丢弃。
-
优点:能够平滑流量,同时允许突发流量。
2、漏桶算法
-
原理:请求像水一样流入桶,桶以固定速率“漏”出水,超出桶容量的请求将被丢弃。
-
优点:流量稳定且不受突发流量影响。
3. 滑动时间窗口计数器算法
-
原理:
1、当有新的请求来临时将窗口滑动到该请求来临的时刻
2、判断窗口内的请求数是否超过了限制, 超过则拒绝服务, 否则请求通过
3、丢弃滑动窗口以外的请求
4、使用一个滑动窗口来动态计算请求数,可以更精确地控制流量。
-
实现:使用Redis等工具维护计数器,并设定过期时间。
5. 全局限流
-
原理:在服务端实现全局限流,通过数据库或分布式缓存(如Redis)来保持全局请求计数。
-
注意:需考虑性能和一致性问题。
6. 客户端限流
-
原理:在客户端实现限流逻辑,控制发送请求的频率。
-
优点:减轻服务器负担,但易受篡改。
7. API网关限流
-
原理:通过API网关实施限流策略,统一管理和监控流量。
-
优点:集中管理和配置,能够灵活调整策略。
8. 熔断与降级
-
原理:在流量过大或服务异常时,暂时中止某些请求,保护后端服务。
-
实现:可以结合Hystrix等框架实现。
本篇重点:
本来在单服务器部署系统中,滑动时间窗口计数器限流方式由于要计算上一次和下一次的请求事件间隔,记录每次请求的时间这是比较麻烦的。(相对令牌桶算法有RateLimiter-API直接支持,自己去实现缓存也比较麻烦)而且没有令牌桶算法平滑,但是在分布式系统中,因为不可能只用一个本地API就能解决多台服务器进程的请求限流,因为没办法保持一致性。利用集中缓存redis来实现滑动时间窗口计数器算法就会简单很多。且不需要太复杂的分布式限流设计的话,相对Hystrix这类框架,更加轻量。关键点在于,相同的接口,每次请求都会覆盖前一次的请求时刻,这就很好的解决了滑动窗口记录请求时间的麻烦之处。
具体实现:
1、先自定义一个注解,对需要限流的接口作唯一标记、并携带时间窗口大小、限流的最大值等。
import java.lang.annotation.*;/*** 限流注解*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {/*** 限流key*/public String key() default CacheConstants.RATE_LIMIT_KEY;/*** 限流时间,单位秒*/public int time() default 60;/*** 限流次数*/public int count() default 100;/*** 限流类型*/public LimitType limitType() default LimitType.DEFAULT;
}
注解定义好了,直接利用AOP注解来实现:
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;/*** 限流处理** @author txzn*/
@Aspect
@Component
@Slf4j
public class RateLimiterAspect {private RedisTemplate<Object, Object> redisTemplate;private RedisScript<Long> limitScript;@Autowiredpublic void setRedisTemplate1(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Autowiredpublic void setLimitScript(RedisScript<Long> limitScript) {this.limitScript = limitScript;}@Before("@annotation(rateLimiter)")public void doBefore(JoinPoint point, RateLimiter rateLimiter) throws Throwable {int time = rateLimiter.time();int count = rateLimiter.count();String combineKey = getCombineKey(rateLimiter, point);List<Object> keys = Collections.singletonList(combineKey);try {Long number = redisTemplate.execute(limitScript, keys, count, time);if (StringUtils.isNull(number) || number.intValue() > count) {throw new ServiceException("访问过于频繁,请稍候再试");}log.info("限制请求'{}',当前请求'{}',缓存key'{}'", count, number.intValue(), combineKey);} catch (ServiceException e) {throw e;} catch (Exception e) {throw new RuntimeException("服务器限流异常,请稍候再试");}}public String getCombineKey(RateLimiter rateLimiter, JoinPoint point) {StringBuffer stringBuffer = new StringBuffer(rateLimiter.key());if (rateLimiter.limitType() == LimitType.IP) {stringBuffer.append(IpUtils.getIpAddr()).append("-");}MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> targetClass = method.getDeclaringClass();stringBuffer.append(targetClass.getName()).append("-").append(method.getName());return stringBuffer.toString();}
}
还有python代码的实现:
import time
import redis# 初始化Redis连接
r = redis.StrictRedis(host='localhost', port=6379, db=0)def rate_limiter(user_id, limit, period):current_time = int(time.time())key = f"rate_limiter:{user_id}:{current_time // period}"# 在Redis中增加请求计数request_count = r.incr(key)# 设置过期时间if request_count == 1:r.expire(key, period)if request_count > limit:return False # 超过限制return True # 允许请求# 使用示例
if rate_limiter("user123", 5, 60):print("请求被允许")
else:print("请求被拒绝,超出限流")