10.2 Load Balancing and Rate Limiting
Interview importance: ⭐⭐⭐⭐⭐
Common interview questions: How do you design a load balancing strategy? Which rate limiting algorithms are there? How do you implement circuit breaking and degradation?
Estimated reading time: 40 minutes
🎯 Load Balancing Strategy Design
Nginx Load Balancing Configuration
```nginx
# nginx.conf load balancing configuration
upstream backend_servers {
    # Round robin (default), with weights
    server 192.168.1.10:8080 weight=3;
    server 192.168.1.11:8080 weight=2;
    server 192.168.1.12:8080 weight=1;

    # Passive failover: backup server used only when the others are unavailable
    server 192.168.1.13:8080 backup;

    # Connection parameters
    keepalive 32;               # number of idle keepalive connections
    keepalive_requests 100;     # max requests per connection
    keepalive_timeout 60s;      # idle connection timeout
}

# IP hash load balancing
upstream ip_hash_backend {
    ip_hash;                    # hash on the client IP
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;
}

# Least-connections load balancing
upstream least_conn_backend {
    least_conn;                 # pick the server with the fewest active connections
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;
}

server {
    listen 80;
    server_name api.example.com;

    location /api/ {
        proxy_pass http://backend_servers;

        # Proxy headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        # Retry on failure
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
        proxy_next_upstream_tries 3;
        proxy_next_upstream_timeout 10s;
    }
}
```
Application-Layer Load Balancer Implementation
```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.stereotype.Component;

/**
 * Application-layer load balancer.
 */
@Component
public class ApplicationLoadBalancer {

    /**
     * Load balancing strategy interface.
     */
    public interface LoadBalanceStrategy {
        ServerInstance select(List<ServerInstance> servers, String clientInfo);
    }

    /**
     * Round robin strategy.
     */
    @Component
    public static class RoundRobinStrategy implements LoadBalanceStrategy {

        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty()) {
                return null;
            }
            // floorMod keeps the index non-negative even after the counter overflows
            int index = Math.floorMod(counter.getAndIncrement(), servers.size());
            return servers.get(index);
        }
    }

    /**
     * Weighted round robin strategy (smooth weighted round robin).
     */
    @Component
    public static class WeightedRoundRobinStrategy implements LoadBalanceStrategy {

        private final ConcurrentHashMap<String, AtomicInteger> serverWeights = new ConcurrentHashMap<>();

        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty()) {
                return null;
            }

            // Total weight of all servers
            int totalWeight = servers.stream().mapToInt(ServerInstance::getWeight).sum();

            // Increase each server's current weight by its configured weight
            servers.forEach(server -> {
                String serverId = server.getId();
                AtomicInteger currentWeight =
                        serverWeights.computeIfAbsent(serverId, k -> new AtomicInteger(0));
                currentWeight.addAndGet(server.getWeight());
            });

            // Pick the server with the highest current weight
            ServerInstance selected = servers.stream()
                    .max(Comparator.comparing(server -> serverWeights.get(server.getId()).get()))
                    .orElse(null);

            if (selected != null) {
                // Subtract the total weight from the selected server's current weight
                serverWeights.get(selected.getId()).addAndGet(-totalWeight);
            }
            return selected;
        }
    }

    /**
     * Consistent hashing strategy.
     */
    @Component
    public static class ConsistentHashStrategy implements LoadBalanceStrategy {

        private final TreeMap<Long, ServerInstance> hashRing = new TreeMap<>();
        private final int virtualNodes = 150;   // virtual nodes per physical server

        public void addServer(ServerInstance server) {
            for (int i = 0; i < virtualNodes; i++) {
                String virtualNodeName = server.getId() + "&&VN" + i;
                long hash = hash(virtualNodeName);
                hashRing.put(hash, server);
            }
        }

        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty() || hashRing.isEmpty()) {
                return null;
            }
            long hash = hash(clientInfo);
            // Find the first node whose hash is >= the request hash
            Map.Entry<Long, ServerInstance> entry = hashRing.ceilingEntry(hash);
            if (entry == null) {
                // Wrap around to the first node on the ring
                entry = hashRing.firstEntry();
            }
            return entry != null ? entry.getValue() : null;
        }

        private long hash(String key) {
            // FNV1_32_HASH
            final int p = 16777619;
            int hash = (int) 2166136261L;
            for (int i = 0; i < key.length(); i++) {
                hash = (hash ^ key.charAt(i)) * p;
            }
            hash += hash << 13;
            hash ^= hash >> 7;
            hash += hash << 3;
            hash ^= hash >> 17;
            hash += hash << 5;
            return hash < 0 ? Math.abs(hash) : hash;
        }
    }
}
```
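The `ServerInstance` type used above is not defined in this section. A minimal sketch of what it might look like; the `host`/`port` fields and constructor are assumptions, only `getId()` and `getWeight()` are actually required by the strategies:

```java
/**
 * Hypothetical ServerInstance referenced by the strategies above.
 */
public class ServerInstance {
    private final String id;        // unique instance ID, e.g. "192.168.1.10:8080"
    private final String host;      // assumed field
    private final int port;         // assumed field
    private final int weight;       // relative weight for weighted strategies

    public ServerInstance(String id, String host, int port, int weight) {
        this.id = id;
        this.host = host;
        this.port = port;
        this.weight = weight;
    }

    public String getId() { return id; }
    public String getHost() { return host; }
    public int getPort() { return port; }
    public int getWeight() { return weight; }
}
```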
🚦 Rate Limiting Algorithm Implementations
Token Bucket Algorithm
```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

/**
 * Token bucket rate limiting.
 */
@Component
public class TokenBucketRateLimiter {

    /**
     * In-process token bucket.
     */
    public static class TokenBucket {
        private final long capacity;            // bucket capacity
        private final long refillRate;          // tokens added per second
        private final AtomicLong tokens;        // current token count
        private volatile long lastRefillTime;   // last refill timestamp
        private final ReentrantLock lock = new ReentrantLock();

        public TokenBucket(long capacity, long refillRate) {
            this.capacity = capacity;
            this.refillRate = refillRate;
            this.tokens = new AtomicLong(capacity);
            this.lastRefillTime = System.currentTimeMillis();
        }

        /** Try to acquire a single token. */
        public boolean tryAcquire() {
            return tryAcquire(1);
        }

        /** Try to acquire the requested number of tokens. */
        public boolean tryAcquire(long tokensRequested) {
            lock.lock();
            try {
                refill();
                if (tokens.get() >= tokensRequested) {
                    tokens.addAndGet(-tokensRequested);
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }

        /** Refill tokens based on elapsed time. */
        private void refill() {
            long currentTime = System.currentTimeMillis();
            long timePassed = currentTime - lastRefillTime;
            if (timePassed > 0) {
                long tokensToAdd = (timePassed * refillRate) / 1000;
                long newTokens = Math.min(capacity, tokens.get() + tokensToAdd);
                tokens.set(newTokens);
                lastRefillTime = currentTime;
            }
        }
    }

    /**
     * Distributed token bucket backed by Redis (made atomic via a Lua script).
     */
    @Component
    public static class DistributedTokenBucket {

        @Autowired
        private RedisTemplate<String, String> redisTemplate;

        private final String luaScript =
                "local key = KEYS[1]\n" +
                "local capacity = tonumber(ARGV[1])\n" +
                "local refillRate = tonumber(ARGV[2])\n" +
                "local tokensRequested = tonumber(ARGV[3])\n" +
                "local currentTime = tonumber(ARGV[4])\n" +
                "\n" +
                "local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')\n" +
                "local tokens = tonumber(bucket[1]) or capacity\n" +
                "local lastRefillTime = tonumber(bucket[2]) or currentTime\n" +
                "\n" +
                "-- refill tokens based on elapsed time\n" +
                "local timePassed = math.max(0, currentTime - lastRefillTime)\n" +
                "local tokensToAdd = math.floor((timePassed * refillRate) / 1000)\n" +
                "tokens = math.min(capacity, tokens + tokensToAdd)\n" +
                "\n" +
                "-- check whether enough tokens are available\n" +
                "if tokens >= tokensRequested then\n" +
                "    tokens = tokens - tokensRequested\n" +
                "    redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', currentTime)\n" +
                "    redis.call('EXPIRE', key, 3600)\n" +
                "    return 1\n" +
                "else\n" +
                "    redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', currentTime)\n" +
                "    redis.call('EXPIRE', key, 3600)\n" +
                "    return 0\n" +
                "end";

        /** Try to acquire tokens for the given key. */
        public boolean tryAcquire(String key, long capacity, long refillRate, long tokensRequested) {
            Long result = redisTemplate.execute(
                    new DefaultRedisScript<>(luaScript, Long.class),
                    Collections.singletonList(key),
                    String.valueOf(capacity),
                    String.valueOf(refillRate),
                    String.valueOf(tokensRequested),
                    String.valueOf(System.currentTimeMillis())
            );
            return result != null && result == 1;
        }
    }

    /**
     * Rate limit annotation.
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RateLimit {
        String key() default "";
        long capacity() default 100;
        long refillRate() default 10;
        long tokens() default 1;
        String fallback() default "";
    }

    /**
     * Rate limit aspect.
     */
    @Aspect
    @Component
    public static class RateLimitAspect {

        @Autowired
        private DistributedTokenBucket tokenBucket;

        @Around("@annotation(rateLimit)")
        public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
            // Key generation helper (implementation omitted in this excerpt)
            String key = generateKey(joinPoint, rateLimit.key());

            boolean acquired = tokenBucket.tryAcquire(
                    key,
                    rateLimit.capacity(),
                    rateLimit.refillRate(),
                    rateLimit.tokens()
            );

            if (acquired) {
                return joinPoint.proceed();
            } else {
                if (!rateLimit.fallback().isEmpty()) {
                    // Invoke the configured fallback method (helper omitted in this excerpt)
                    return executeFallback(joinPoint, rateLimit.fallback());
                } else {
                    // RateLimitException is a custom runtime exception
                    throw new RateLimitException("Rate limit exceeded for key: " + key);
                }
            }
        }
    }
}
```
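As a usage sketch of the annotation (the controller, endpoint path, and fallback method below are hypothetical, not part of the original):

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    // Hypothetical endpoint: bucket of 100 tokens, refilled at 10 tokens per second.
    // When the bucket is exhausted, the aspect would invoke the named fallback method.
    @TokenBucketRateLimiter.RateLimit(key = "demo:hello", capacity = 100, refillRate = 10,
            fallback = "helloFallback")
    @GetMapping("/hello")
    public String hello() {
        return "hello";
    }

    // Degraded response returned when the rate limit is hit
    public String helloFallback() {
        return "System busy, please try again later";
    }
}
```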
Leaky Bucket Algorithm
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.stereotype.Component;

/**
 * Leaky bucket rate limiting.
 */
@Component
public class LeakyBucketRateLimiter {

    /**
     * Leaky bucket implementation.
     */
    public static class LeakyBucket {
        private final long capacity;           // bucket capacity
        private final long leakRate;           // leak rate (requests per second)
        private final Queue<Long> requests;    // queued request timestamps
        private volatile long lastLeakTime;    // last leak timestamp
        private final ReentrantLock lock = new ReentrantLock();

        public LeakyBucket(long capacity, long leakRate) {
            this.capacity = capacity;
            this.leakRate = leakRate;
            this.requests = new LinkedList<>();
            this.lastLeakTime = System.currentTimeMillis();
        }

        /** Try to add a request to the bucket. */
        public boolean tryAdd() {
            lock.lock();
            try {
                leak();
                if (requests.size() < capacity) {
                    requests.offer(System.currentTimeMillis());
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }

        /** Leak requests at a constant rate based on elapsed time. */
        private void leak() {
            long currentTime = System.currentTimeMillis();
            long timePassed = currentTime - lastLeakTime;
            if (timePassed > 0) {
                long requestsToLeak = (timePassed * leakRate) / 1000;
                for (int i = 0; i < requestsToLeak && !requests.isEmpty(); i++) {
                    requests.poll();
                }
                lastLeakTime = currentTime;
            }
        }
    }
}
```
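A quick usage sketch of the leaky bucket with illustrative parameters (capacity 10, leaking 5 requests per second):

```java
public class LeakyBucketDemo {
    public static void main(String[] args) {
        // Capacity 10, leaks 5 requests per second (illustrative numbers)
        LeakyBucketRateLimiter.LeakyBucket bucket =
                new LeakyBucketRateLimiter.LeakyBucket(10, 5);

        for (int i = 0; i < 20; i++) {
            boolean accepted = bucket.tryAdd();
            System.out.println("request " + i + " accepted=" + accepted);
        }
        // The first 10 requests fill the bucket; later ones are rejected until
        // earlier ones leak out, which is the flow-shaping behavior of the algorithm.
    }
}
```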
🔥 Circuit Breaking and Degradation
Circuit Breaker Implementation
```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Circuit breaker implementation.
 */
@Component
public class CircuitBreakerImplementation {

    /**
     * Circuit breaker states.
     */
    public enum CircuitBreakerState {
        CLOSED,     // normal operation, requests pass through
        OPEN,       // circuit tripped, requests fail fast
        HALF_OPEN   // trial state, a limited number of requests pass through
    }

    /**
     * Circuit breaker.
     */
    public static class CircuitBreaker {

        private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class);

        private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;
        private final AtomicInteger failureCount = new AtomicInteger(0);
        private final AtomicInteger successCount = new AtomicInteger(0);
        private final AtomicInteger requestCount = new AtomicInteger(0);
        private volatile long lastFailureTime = 0;

        // Configuration
        private final int failureThreshold;     // failure rate threshold, in percent
        private final int successThreshold;     // successes needed in HALF_OPEN to close again
        private final long timeout;             // how long to stay OPEN before probing, in ms
        private final int minimumRequests;      // minimum requests before evaluating the failure rate

        public CircuitBreaker(int failureThreshold, int successThreshold,
                              long timeout, int minimumRequests) {
            this.failureThreshold = failureThreshold;
            this.successThreshold = successThreshold;
            this.timeout = timeout;
            this.minimumRequests = minimumRequests;
        }

        /** Execute the protected operation, falling back when the circuit is open or the call fails. */
        public <T> T execute(Supplier<T> operation, Supplier<T> fallback) {
            if (!allowRequest()) {
                return fallback.get();
            }
            try {
                T result = operation.get();
                onSuccess();
                return result;
            } catch (Exception e) {
                onFailure();
                return fallback.get();
            }
        }

        /** Whether the request is allowed to pass through. */
        private boolean allowRequest() {
            switch (state) {
                case CLOSED:
                    return true;
                case OPEN:
                    if (shouldAttemptReset()) {
                        // Timeout elapsed: move to HALF_OPEN and let a trial request through
                        state = CircuitBreakerState.HALF_OPEN;
                        return true;
                    }
                    return false;
                case HALF_OPEN:
                    return true;
                default:
                    return false;
            }
        }

        /** Whether the OPEN timeout has elapsed. */
        private boolean shouldAttemptReset() {
            return System.currentTimeMillis() - lastFailureTime >= timeout;
        }

        /** Record a successful call. */
        private void onSuccess() {
            if (state == CircuitBreakerState.HALF_OPEN) {
                int currentSuccessCount = successCount.incrementAndGet();
                if (currentSuccessCount >= successThreshold) {
                    reset();
                }
            } else {
                reset();
            }
        }

        /** Record a failed call. */
        private void onFailure() {
            int currentFailureCount = failureCount.incrementAndGet();
            int currentRequestCount = requestCount.incrementAndGet();
            lastFailureTime = System.currentTimeMillis();

            if (state == CircuitBreakerState.HALF_OPEN) {
                // A failure during the trial phase reopens the circuit immediately
                tripCircuit();
                return;
            }

            if (currentRequestCount >= minimumRequests) {
                double failureRate = (double) currentFailureCount / currentRequestCount;
                if (failureRate >= (double) failureThreshold / 100) {
                    tripCircuit();
                }
            }
        }

        /** Trip the circuit. */
        private void tripCircuit() {
            state = CircuitBreakerState.OPEN;
            lastFailureTime = System.currentTimeMillis();
            log.warn("Circuit breaker opened due to high failure rate");
        }

        /** Reset the circuit breaker to CLOSED and clear the counters. */
        private void reset() {
            state = CircuitBreakerState.CLOSED;
            failureCount.set(0);
            successCount.set(0);
            requestCount.set(0);
            log.info("Circuit breaker reset to closed state");
        }
    }
}
```
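A usage sketch with assumed parameters (50% failure threshold, 3 successes to close, 10 s open timeout, at least 20 requests); the downstream call and fallback value are hypothetical:

```java
public class CircuitBreakerDemo {

    // Trip at a 50% failure rate (after at least 20 requests), stay open for 10 s,
    // then close again after 3 successful trial calls.
    private final CircuitBreakerImplementation.CircuitBreaker breaker =
            new CircuitBreakerImplementation.CircuitBreaker(50, 3, 10_000, 20);

    public String queryUserName(long userId) {
        return breaker.execute(
                () -> callRemoteUserService(userId),   // protected downstream call
                () -> "default-user"                   // degraded fallback value
        );
    }

    // Hypothetical downstream call that may throw on timeout or error
    private String callRemoteUserService(long userId) {
        return "user-" + userId;
    }
}
```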
💡 Interview Answer Key Points
Standard Answer Template
Part 1: Load Balancing Strategies
"There are four main load balancing strategies:
1. Round robin: simple, even distribution; suitable when servers have similar capacity
2. Weighted round robin: weights are assigned by server capacity, so more capable servers receive more requests
3. Least connections: requests go to the server with the fewest current connections; suitable for long-lived connections
4. Consistent hashing: requests are hashed on a request attribute, so the same request always reaches the same server

At the Nginx layer this is configured with upstream blocks; at the application layer it can be implemented on top of a service registry, as in the sketch after this answer."
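A minimal sketch of application-layer selection on top of a registry; the `discoverInstances` lookup is a stand-in for a real registry client (Nacos, Eureka, Consul, etc.) and reuses the hypothetical `ServerInstance` shown earlier:

```java
import java.util.Arrays;
import java.util.List;

public class LoadBalancedClient {

    private final ApplicationLoadBalancer.LoadBalanceStrategy strategy =
            new ApplicationLoadBalancer.RoundRobinStrategy();

    // Hypothetical registry lookup; in practice the list comes from the registry client
    private List<ServerInstance> discoverInstances(String serviceName) {
        return Arrays.asList(
                new ServerInstance("192.168.1.10:8080", "192.168.1.10", 8080, 3),
                new ServerInstance("192.168.1.11:8080", "192.168.1.11", 8080, 2)
        );
    }

    public String chooseTarget(String serviceName, String clientIp) {
        List<ServerInstance> instances = discoverInstances(serviceName);
        ServerInstance selected = strategy.select(instances, clientIp);
        return selected == null ? null : selected.getHost() + ":" + selected.getPort();
    }
}
```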
Part 2: Rate Limiting Algorithm Comparison
"There are three commonly used rate limiting algorithms:
1. Token bucket: tolerates bursts while smoothing the overall rate; suitable for most scenarios
2. Leaky bucket: strictly controls the output rate (traffic shaping); suitable when a stable output rate is required
3. Sliding window: precisely limits the number of requests within a time window, but uses more memory

For production, the token bucket algorithm is recommended, combined with Redis for distributed rate limiting; a sliding-window sketch follows below."
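The sliding window algorithm is mentioned above but not implemented in this section; a minimal in-process sketch (timestamp-queue variant, parameter names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal sliding-window limiter: at most maxRequests per windowMillis. */
public class SlidingWindowRateLimiter {

    private final int maxRequests;          // allowed requests per window
    private final long windowMillis;        // window length in milliseconds
    private final Deque<Long> timestamps = new ArrayDeque<>();  // request timestamps in the window

    public SlidingWindowRateLimiter(int maxRequests, long windowMillis) {
        this.maxRequests = maxRequests;
        this.windowMillis = windowMillis;
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // Drop timestamps that have slid out of the window
        while (!timestamps.isEmpty() && now - timestamps.peekFirst() >= windowMillis) {
            timestamps.pollFirst();
        }
        if (timestamps.size() < maxRequests) {
            timestamps.addLast(now);
            return true;
        }
        return false;
    }
}
```

Keeping one timestamp per request is where the higher memory cost mentioned above comes from; for distributed limiting a Redis ZSET can play the same role as the in-memory deque.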
Part 3: Circuit Breaking and Degradation
"A circuit breaker has three states:
1. Closed: requests are processed normally while the failure rate is tracked
2. Open: calls fail fast without reaching the downstream service
3. Half-open: a small number of requests are allowed through to test whether the service has recovered

Key parameters: failure threshold, open timeout, minimum request count, success threshold.
Combined with a degradation strategy, the fallback provides bottom-line data or a default response, as in the sketch below."
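A short sketch of pairing the circuit breaker with a degradation fallback that serves the last known good data (the cache and product-service call are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProductQueryService {

    private final CircuitBreakerImplementation.CircuitBreaker breaker =
            new CircuitBreakerImplementation.CircuitBreaker(50, 3, 10_000, 20);

    // Local cache of the last known good responses, used as degraded data
    private final Map<Long, String> lastKnownGood = new ConcurrentHashMap<>();

    public String getProductDetail(long productId) {
        return breaker.execute(
                () -> {
                    String detail = callProductService(productId);   // protected downstream call
                    lastKnownGood.put(productId, detail);            // refresh the degraded copy
                    return detail;
                },
                () -> lastKnownGood.getOrDefault(productId, "{}")    // fallback: cached or empty default
        );
    }

    // Hypothetical remote call that may throw when the product service is unhealthy
    private String callProductService(long productId) {
        return "{\"id\":" + productId + "}";
    }
}
```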
Key takeaways:
- ✅ Understand how the major load balancing strategies work and where each one applies
- ✅ Understand how the rate limiting algorithms are implemented and their performance characteristics
- ✅ Be able to design and implement circuit breaking and degradation
- ✅ Know the traffic control options available in a distributed environment