14.4 限流算法实现
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式: "设计API限流系统" "令牌桶vs漏桶算法" "分布式限流实现"
预计阅读时间:40分钟
🎯 限流算法基础
常见限流算法
限流是保护系统稳定性的重要手段,常见算法包括:
- 固定窗口计数器:简单但有突刺问题
- 滑动窗口计数器:平滑限流,内存开销大
- 令牌桶算法:允许突发流量,平滑限流
- 漏桶算法:强制限制输出速率
🪙 令牌桶算法
令牌桶实现
/** * 令牌桶限流算法 */ public class TokenBucketRateLimiter { private final int capacity; // 桶容量 private final double refillRate; // 令牌补充速率(个/秒) private final Map<String, TokenBucket> buckets; public TokenBucketRateLimiter(int capacity, double refillRate) { this.capacity = capacity; this.refillRate = refillRate; this.buckets = new ConcurrentHashMap<>(); } /** * 令牌桶 */ static class TokenBucket { private volatile double tokens; private volatile long lastRefillTime; private final int capacity; private final double refillRate; public TokenBucket(int capacity, double refillRate) { this.capacity = capacity; this.refillRate = refillRate; this.tokens = capacity; this.lastRefillTime = System.currentTimeMillis(); } public synchronized boolean tryAcquire(int tokensRequested) { refill(); if (tokens >= tokensRequested) { tokens -= tokensRequested; return true; } return false; } private void refill() { long currentTime = System.currentTimeMillis(); double timePassed = (currentTime - lastRefillTime) / 1000.0; if (timePassed > 0) { double tokensToAdd = timePassed * refillRate; tokens = Math.min(capacity, tokens + tokensToAdd); lastRefillTime = currentTime; } } public double getAvailableTokens() { refill(); return tokens; } } /** * 尝试获取许可 */ public boolean tryAcquire(String key) { return tryAcquire(key, 1); } /** * 尝试获取指定数量的许可 */ public boolean tryAcquire(String key, int permits) { TokenBucket bucket = buckets.computeIfAbsent(key, k -> new TokenBucket(capacity, refillRate)); return bucket.tryAcquire(permits); } }
🌐 分布式限流
基于Redis的分布式限流
/** * 基于Redis的分布式限流器 */ @Component public class DistributedRateLimiter { private final RedisTemplate<String, Object> redisTemplate; private final String keyPrefix = "rate_limiter:"; public DistributedRateLimiter(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 固定窗口限流 */ public boolean tryAcquireFixedWindow(String key, int maxRequests, long windowSizeInSeconds) { String redisKey = keyPrefix + "fixed:" + key; long currentWindow = System.currentTimeMillis() / (windowSizeInSeconds * 1000); String windowKey = redisKey + ":" + currentWindow; // 使用Lua脚本保证原子性 String luaScript = "local current = redis.call('GET', KEYS[1]) " + "if current == false then " + " redis.call('SET', KEYS[1], 1) " + " redis.call('EXPIRE', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " if tonumber(current) < tonumber(ARGV[1]) then " + " return redis.call('INCR', KEYS[1]) " + " else " + " return -1 " + " end " + "end"; DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class); Long result = redisTemplate.execute(script, Collections.singletonList(windowKey), maxRequests, windowSizeInSeconds); return result != null && result != -1; } /** * 令牌桶限流 */ public boolean tryAcquireTokenBucket(String key, int capacity, double refillRate) { String redisKey = keyPrefix + "token:" + key; long currentTime = System.currentTimeMillis(); String luaScript = "local bucket = redis.call('HMGET', KEYS[1], 'tokens', 'lastRefill') " + "local tokens = tonumber(bucket[1]) or tonumber(ARGV[1]) " + "local lastRefill = tonumber(bucket[2]) or tonumber(ARGV[4]) " + "local timePassed = (tonumber(ARGV[4]) - lastRefill) / 1000.0 " + "if timePassed > 0 then " + " tokens = math.min(tonumber(ARGV[1]), tokens + timePassed * tonumber(ARGV[2])) " + "end " + "if tokens >= tonumber(ARGV[3]) then " + " tokens = tokens - tonumber(ARGV[3]) " + " redis.call('HMSET', KEYS[1], 'tokens', tokens, 'lastRefill', ARGV[4]) " + " redis.call('EXPIRE', KEYS[1], 3600) " + " return 1 " + "else " + " redis.call('HMSET', KEYS[1], 'tokens', tokens, 'lastRefill', ARGV[4]) " + " redis.call('EXPIRE', KEYS[1], 3600) " + " return 0 " + "end"; DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class); Long result = redisTemplate.execute(script, Collections.singletonList(redisKey), capacity, refillRate, 1, currentTime); return result != null && result == 1; } }
🎯 限流注解和AOP
限流注解
/** * 限流注解 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimit { /** * 限流key,支持SpEL表达式 */ String key() default ""; /** * 限流算法类型 */ Algorithm algorithm() default Algorithm.TOKEN_BUCKET; /** * 最大请求数 */ int maxRequests() default 100; /** * 时间窗口大小(秒) */ long windowSize() default 60; /** * 令牌补充速率(个/秒) */ double refillRate() default 1.0; /** * 限流失败时的错误消息 */ String message() default "请求过于频繁,请稍后再试"; enum Algorithm { FIXED_WINDOW, SLIDING_WINDOW, TOKEN_BUCKET, LEAKY_BUCKET } }
限流AOP切面
/** * 限流AOP切面 */ @Aspect @Component public class RateLimitAspect { private final DistributedRateLimiter distributedRateLimiter; private final SpelExpressionParser parser; public RateLimitAspect(DistributedRateLimiter distributedRateLimiter) { this.distributedRateLimiter = distributedRateLimiter; this.parser = new SpelExpressionParser(); } @Around("@annotation(rateLimit)") public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable { String key = generateKey(joinPoint, rateLimit); // 检查限流 boolean allowed = false; switch (rateLimit.algorithm()) { case FIXED_WINDOW: allowed = distributedRateLimiter.tryAcquireFixedWindow( key, rateLimit.maxRequests(), rateLimit.windowSize()); break; case TOKEN_BUCKET: allowed = distributedRateLimiter.tryAcquireTokenBucket( key, rateLimit.maxRequests(), rateLimit.refillRate()); break; } if (!allowed) { throw new RateLimitException(rateLimit.message()); } return joinPoint.proceed(); } /** * 生成限流key */ private String generateKey(ProceedingJoinPoint joinPoint, RateLimit rateLimit) { if (StringUtils.hasText(rateLimit.key())) { // 解析SpEL表达式 Expression expression = parser.parseExpression(rateLimit.key()); EvaluationContext context = new StandardEvaluationContext(); // 添加方法参数到上下文 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); String[] paramNames = signature.getParameterNames(); Object[] args = joinPoint.getArgs(); for (int i = 0; i < paramNames.length; i++) { context.setVariable(paramNames[i], args[i]); } return expression.getValue(context, String.class); } // 默认使用方法签名 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); return signature.getDeclaringType().getSimpleName() + "." + signature.getName(); } } /** * 限流异常 */ public class RateLimitException extends RuntimeException { public RateLimitException(String message) { super(message); } }
🔧 实际应用示例
API限流示例
/** * API控制器示例 */ @RestController @RequestMapping("/api") public class UserController { /** * 用户登录 - 按IP限流 */ @PostMapping("/login") @RateLimit( key = "#request.remoteAddr", algorithm = RateLimit.Algorithm.FIXED_WINDOW, maxRequests = 5, windowSize = 300, message = "登录尝试过于频繁,请5分钟后再试" ) public ResponseEntity<?> login(@RequestBody LoginRequest request, HttpServletRequest httpRequest) { // 登录逻辑 return ResponseEntity.ok("登录成功"); } /** * 发送短信验证码 - 按手机号限流 */ @PostMapping("/sms/send") @RateLimit( key = "#request.phoneNumber", algorithm = RateLimit.Algorithm.TOKEN_BUCKET, maxRequests = 3, refillRate = 1.0/60, message = "短信发送过于频繁,请稍后再试" ) public ResponseEntity<?> sendSms(@RequestBody SmsRequest request) { // 发送短信逻辑 return ResponseEntity.ok("短信发送成功"); } /** * 文件上传 - 按用户ID限流 */ @PostMapping("/upload") @RateLimit( key = "#userId", algorithm = RateLimit.Algorithm.TOKEN_BUCKET, maxRequests = 10, refillRate = 0.1, message = "上传过于频繁,请稍后再试" ) public ResponseEntity<?> uploadFile(@RequestParam String userId, @RequestParam MultipartFile file) { // 文件上传逻辑 return ResponseEntity.ok("上传成功"); } }
限流监控和统计
/** * 限流监控服务 */ @Service public class RateLimitMonitorService { private final RedisTemplate<String, Object> redisTemplate; private final String statsKeyPrefix = "rate_limit_stats:"; public RateLimitMonitorService(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 记录限流事件 */ public void recordRateLimitEvent(String key, boolean allowed) { String statsKey = statsKeyPrefix + getCurrentHour(); String field = key + (allowed ? ":allowed" : ":blocked"); redisTemplate.opsForHash().increment(statsKey, field, 1); redisTemplate.expire(statsKey, Duration.ofDays(7)); } /** * 获取限流统计 */ public Map<String, Object> getRateLimitStats(String key, int hours) { Map<String, Object> stats = new HashMap<>(); long totalAllowed = 0; long totalBlocked = 0; for (int i = 0; i < hours; i++) { String statsKey = statsKeyPrefix + (getCurrentHour() - i); Object allowed = redisTemplate.opsForHash().get(statsKey, key + ":allowed"); Object blocked = redisTemplate.opsForHash().get(statsKey, key + ":blocked"); if (allowed != null) { totalAllowed += Long.parseLong(allowed.toString()); } if (blocked != null) { totalBlocked += Long.parseLong(blocked.toString()); } } stats.put("totalAllowed", totalAllowed); stats.put("totalBlocked", totalBlocked); stats.put("totalRequests", totalAllowed + totalBlocked); stats.put("blockRate", totalAllowed + totalBlocked > 0 ? (double) totalBlocked / (totalAllowed + totalBlocked) : 0); return stats; } private long getCurrentHour() { return System.currentTimeMillis() / (60 * 60 * 1000); } }
💡 面试常见问题解答
Q1: 令牌桶和漏桶算法的区别?
标准回答:
令牌桶 vs 漏桶算法: 1. 令牌桶算法 - 允许突发流量:桶中有令牌就能处理 - 平滑限流:令牌按固定速率补充 - 适用场景:API限流、突发请求处理 2. 漏桶算法 - 强制限制输出速率:匀速处理请求 - 平滑输出:无论输入如何,输出恒定 - 适用场景:流量整形、消息队列 3. 主要区别 - 突发处理:令牌桶允许,漏桶不允许 - 实现复杂度:令牌桶较简单 - 应用场景:令牌桶更适合API限流 根据业务需求选择合适的算法。
Q2: 分布式限流如何实现?
标准回答:
分布式限流实现方案: 1. 基于Redis - 使用Lua脚本保证原子性 - 支持多种限流算法 - 性能好,易扩展 2. 基于数据库 - 使用数据库锁机制 - 一致性强但性能较差 - 适用于对一致性要求高的场景 3. 实现要点 - 原子性操作:避免并发问题 - 高可用性:Redis集群部署 - 性能优化:Lua脚本减少网络开销 - 监控告警:实时监控限流效果 Redis + Lua脚本是最常用的方案。
Q3: 如何设计一个高性能的限流系统?
标准回答:
高性能限流系统设计: 1. 算法选择 - 根据场景选择合适算法 - 令牌桶:API限流 - 滑动窗口:精确控制 2. 存储优化 - Redis集群:提高并发能力 - 本地缓存:减少网络开销 - 数据分片:避免热点问题 3. 性能优化 - Lua脚本:减少网络往返 - 批量操作:提高吞吐量 - 异步处理:避免阻塞 4. 监控运维 - 实时监控限流效果 - 动态调整限流参数 - 告警机制:及时发现问题 关键是在准确性和性能间找到平衡。
核心要点总结:
- ✅ 掌握各种限流算法的原理和实现
- ✅ 理解分布式限流的技术方案
- ✅ 能够设计基于注解的限流框架
- ✅ 了解限流系统的监控和运维要点
Java面试圣经 文章被收录于专栏
Java面试圣经