10.2 负载均衡与限流

面试重要程度:⭐⭐⭐⭐⭐

常见提问方式:如何设计负载均衡策略?限流算法有哪些?如何实现熔断降级?

预计阅读时间:40分钟

🎯 负载均衡策略设计

Nginx负载均衡配置

# nginx.conf 负载均衡配置
upstream backend_servers {
    # 轮询策略(默认)
    server 192.168.1.10:8080 weight=3;
    server 192.168.1.11:8080 weight=2;
    server 192.168.1.12:8080 weight=1;
    
    # 健康检查
    server 192.168.1.13:8080 backup;  # 备用服务器
    
    # 连接参数
    keepalive 32;                      # 保持连接数
    keepalive_requests 100;            # 每个连接最大请求数
    keepalive_timeout 60s;             # 连接超时时间
}

# IP Hash负载均衡
upstream ip_hash_backend {
    ip_hash;  # 基于客户端IP的哈希分配
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;
}

# 最少连接数负载均衡
upstream least_conn_backend {
    least_conn;  # 分配给连接数最少的服务器
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;
}

server {
    listen 80;
    server_name api.example.com;
    
    location /api/ {
        proxy_pass http://backend_servers;
        
        # 代理设置
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # 超时设置
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
        
        # 重试设置
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
        proxy_next_upstream_tries 3;
        proxy_next_upstream_timeout 10s;
    }
}

应用层负载均衡实现

/**
 * 应用层负载均衡器
 */
@Component
public class ApplicationLoadBalancer {
    
    /**
     * 负载均衡策略接口
     */
    public interface LoadBalanceStrategy {
        ServerInstance select(List<ServerInstance> servers, String clientInfo);
    }
    
    /**
     * 轮询策略
     */
    @Component
    public static class RoundRobinStrategy implements LoadBalanceStrategy {
        
        private final AtomicInteger counter = new AtomicInteger(0);
        
        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty()) {
                return null;
            }
            
            int index = counter.getAndIncrement() % servers.size();
            return servers.get(index);
        }
    }
    
    /**
     * 加权轮询策略
     */
    @Component
    public static class WeightedRoundRobinStrategy implements LoadBalanceStrategy {
        
        private final ConcurrentHashMap<String, AtomicInteger> serverWeights = new ConcurrentHashMap<>();
        
        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty()) {
                return null;
            }
            
            // 计算总权重
            int totalWeight = servers.stream().mapToInt(ServerInstance::getWeight).sum();
            
            // 为每个服务器增加当前权重
            servers.forEach(server -> {
                String serverId = server.getId();
                AtomicInteger currentWeight = serverWeights.computeIfAbsent(serverId, 
                    k -> new AtomicInteger(0));
                currentWeight.addAndGet(server.getWeight());
            });
            
            // 选择当前权重最大的服务器
            ServerInstance selected = servers.stream()
                .max(Comparator.comparing(server -> 
                    serverWeights.get(server.getId()).get()))
                .orElse(null);
            
            if (selected != null) {
                // 减去总权重
                serverWeights.get(selected.getId()).addAndGet(-totalWeight);
            }
            
            return selected;
        }
    }
    
    /**
     * 一致性哈希策略
     */
    @Component
    public static class ConsistentHashStrategy implements LoadBalanceStrategy {
        
        private final TreeMap<Long, ServerInstance> hashRing = new TreeMap<>();
        private final int virtualNodes = 150; // 虚拟节点数
        
        public void addServer(ServerInstance server) {
            for (int i = 0; i < virtualNodes; i++) {
                String virtualNodeName = server.getId() + "&&VN" + i;
                long hash = hash(virtualNodeName);
                hashRing.put(hash, server);
            }
        }
        
        @Override
        public ServerInstance select(List<ServerInstance> servers, String clientInfo) {
            if (servers.isEmpty()) {
                return null;
            }
            
            long hash = hash(clientInfo);
            
            // 找到第一个大于等于该hash值的节点
            Map.Entry<Long, ServerInstance> entry = hashRing.ceilingEntry(hash);
            if (entry == null) {
                // 如果没有找到,返回第一个节点(环形)
                entry = hashRing.firstEntry();
            }
            
            return entry != null ? entry.getValue() : null;
        }
        
        private long hash(String key) {
            // 使用FNV1_32_HASH算法
            final int p = 16777619;
            int hash = (int) 2166136261L;
            for (int i = 0; i < key.length(); i++) {
                hash = (hash ^ key.charAt(i)) * p;
            }
            hash += hash << 13;
            hash ^= hash >> 7;
            hash += hash << 3;
            hash ^= hash >> 17;
            hash += hash << 5;
            return hash < 0 ? Math.abs(hash) : hash;
        }
    }
}

🚦 限流算法实现

令牌桶算法

/**
 * 令牌桶限流算法
 */
@Component
public class TokenBucketRateLimiter {
    
    /**
     * 令牌桶实现
     */
    public static class TokenBucket {
        
        private final long capacity;           // 桶容量
        private final long refillRate;         // 令牌生成速率(每秒)
        private final AtomicLong tokens;       // 当前令牌数
        private volatile long lastRefillTime;  // 上次补充时间
        private final ReentrantLock lock = new ReentrantLock();
        
        public TokenBucket(long capacity, long refillRate) {
            this.capacity = capacity;
            this.refillRate = refillRate;
            this.tokens = new AtomicLong(capacity);
            this.lastRefillTime = System.currentTimeMillis();
        }
        
        /**
         * 尝试获取令牌
         */
        public boolean tryAcquire() {
            return tryAcquire(1);
        }
        
        /**
         * 尝试获取指定数量的令牌
         */
        public boolean tryAcquire(long tokensRequested) {
            lock.lock();
            try {
                refill();
                
                if (tokens.get() >= tokensRequested) {
                    tokens.addAndGet(-tokensRequested);
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
        
        /**
         * 补充令牌
         */
        private void refill() {
            long currentTime = System.currentTimeMillis();
            long timePassed = currentTime - lastRefillTime;
            
            if (timePassed > 0) {
                long tokensToAdd = (timePassed * refillRate) / 1000;
                long newTokens = Math.min(capacity, tokens.get() + tokensToAdd);
                tokens.set(newTokens);
                lastRefillTime = currentTime;
            }
        }
    }
    
    /**
     * 分布式令牌桶(基于Redis)
     */
    @Component
    public static class DistributedTokenBucket {
        
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
        
        private final String luaScript = 
            "local key = KEYS[1]\n" +
            "local capacity = tonumber(ARGV[1])\n" +
            "local refillRate = tonumber(ARGV[2])\n" +
            "local tokensRequested = tonumber(ARGV[3])\n" +
            "local currentTime = tonumber(ARGV[4])\n" +
            "\n" +
            "local bucket = redis.call('HMGET', key, 'tokens', 'lastRefillTime')\n" +
            "local tokens = tonumber(bucket[1]) or capacity\n" +
            "local lastRefillTime = tonumber(bucket[2]) or currentTime\n" +
            "\n" +
            "-- 计算需要补充的令牌数\n" +
            "local timePassed = math.max(0, currentTime - lastRefillTime)\n" +
            "local tokensToAdd = math.floor((timePassed * refillRate) / 1000)\n" +
            "tokens = math.min(capacity, tokens + tokensToAdd)\n" +
            "\n" +
            "-- 检查是否有足够的令牌\n" +
            "if tokens >= tokensRequested then\n" +
            "    tokens = tokens - tokensRequested\n" +
            "    redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', currentTime)\n" +
            "    redis.call('EXPIRE', key, 3600)\n" +
            "    return 1\n" +
            "else\n" +
            "    redis.call('HMSET', key, 'tokens', tokens, 'lastRefillTime', currentTime)\n" +
            "    redis.call('EXPIRE', key, 3600)\n" +
            "    return 0\n" +
            "end";
        
        /**
         * 尝试获取令牌
         */
        public boolean tryAcquire(String key, long capacity, long refillRate, long tokensRequested) {
            Long result = redisTemplate.execute(
                new DefaultRedisScript<>(luaScript, Long.class),
                Collections.singletonList(key),
                String.valueOf(capacity),
                String.valueOf(refillRate),
                String.valueOf(tokensRequested),
                String.valueOf(System.currentTimeMillis())
            );
            
            return result != null && result == 1;
        }
    }
    
    /**
     * 限流注解
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RateLimit {
        String key() default "";
        long capacity() default 100;
        long refillRate() default 10;
        long tokens() default 1;
        String fallback() default "";
    }
    
    /**
     * 限流切面
     */
    @Aspect
    @Component
    public static class RateLimitAspect {
        
        @Autowired
        private DistributedTokenBucket tokenBucket;
        
        @Around("@annotation(rateLimit)")
        public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
            String key = generateKey(joinPoint, rateLimit.key());
            
            boolean acquired = tokenBucket.tryAcquire(
                key, 
                rateLimit.capacity(), 
                rateLimit.refillRate(), 
                rateLimit.tokens()
            );
            
            if (acquired) {
                return joinPoint.proceed();
            } else {
                if (!rateLimit.fallback().isEmpty()) {
                    return executeFallback(joinPoint, rateLimit.fallback());
                } else {
                    throw new RateLimitException("Rate limit exceeded for key: " + key);
                }
            }
        }
    }
}

漏桶算法

/**
 * 漏桶限流算法
 */
@Component
public class LeakyBucketRateLimiter {
    
    /**
     * 漏桶实现
     */
    public static class LeakyBucket {
        
        private final long capacity;           // 桶容量
        private final long leakRate;           // 漏出速率(每秒)
        private final Queue<Long> requests;    // 请求队列
        private volatile long lastLeakTime;    // 上次漏出时间
        private final ReentrantLock lock = new ReentrantLock();
        
        public LeakyBucket(long capacity, long leakRate) {
            this.capacity = capacity;
            this.leakRate = leakRate;
            this.requests = new LinkedList<>();
            this.lastLeakTime = System.currentTimeMillis();
        }
        
        /**
         * 尝试添加请求
         */
        public boolean tryAdd() {
            lock.lock();
            try {
                leak();
                
                if (requests.size() < capacity) {
                    requests.offer(System.currentTimeMillis());
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
        
        /**
         * 漏出请求
         */
        private void leak() {
            long currentTime = System.currentTimeMillis();
            long timePassed = currentTime - lastLeakTime;
            
            if (timePassed > 0) {
                long requestsToLeak = (timePassed * leakRate) / 1000;
                
                for (int i = 0; i < requestsToLeak && !requests.isEmpty(); i++) {
                    requests.poll();
                }
                
                lastLeakTime = currentTime;
            }
        }
    }
}

🔥 熔断降级机制

熔断器实现

/**
 * 熔断器实现
 */
@Component
public class CircuitBreakerImplementation {
    
    /**
     * 熔断器状态
     */
    public enum CircuitBreakerState {
        CLOSED,    // 关闭状态,正常处理请求
        OPEN,      // 开启状态,直接返回失败
        HALF_OPEN  // 半开状态,允许少量请求通过
    }
    
    /**
     * 熔断器实现
     */
    public static class CircuitBreaker {
        
        private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;
        private final AtomicInteger failureCount = new AtomicInteger(0);
        private final AtomicInteger successCount = new AtomicInteger(0);
        private final AtomicInteger requestCount = new AtomicInteger(0);
        private volatile long lastFailureTime = 0;
        
        // 配置参数
        private final int failureThreshold;      // 失败阈值
        private final int successThreshold;      // 成功阈值
        private final long timeout;              // 超时时间
        private final int minimumRequests;       // 最小请求数
        
        public CircuitBreaker(int failureThreshold, int successThreshold, 
                            long timeout, int minimumRequests) {
            this.failureThreshold = failureThreshold;
            this.successThreshold = successThreshold;
            this.timeout = timeout;
            this.minimumRequests = minimumRequests;
        }
        
        /**
         * 执行被保护的方法
         */
        public <T> T execute(Supplier<T> operation, Supplier<T> fallback) {
            if (!allowRequest()) {
                return fallback.get();
            }
            
            try {
                T result = operation.get();
                onSuccess();
                return result;
            } catch (Exception e) {
                onFailure();
                return fallback.get();
            }
        }
        
        /**
         * 是否允许请求通过
         */
        private boolean allowRequest() {
            switch (state) {
                case CLOSED:
                    return true;
                case OPEN:
                    return shouldAttemptReset();
                case HALF_OPEN:
                    return true;
                default:
                    return false;
            }
        }
        
        /**
         * 是否应该尝试重置
         */
        private boolean shouldAttemptReset() {
            return System.currentTimeMillis() - lastFailureTime >= timeout;
        }
        
        /**
         * 处理成功
         */
        private void onSuccess() {
            if (state == CircuitBreakerState.HALF_OPEN) {
                int currentSuccessCount = successCount.incrementAndGet();
                if (currentSuccessCount >= successThreshold) {
                    reset();
                }
            } else {
                reset();
            }
        }
        
        /**
         * 处理失败
         */
        private void onFailure() {
            int currentFailureCount = failureCount.incrementAndGet();
            int currentRequestCount = requestCount.incrementAndGet();
            
            lastFailureTime = System.currentTimeMillis();
            
            if (currentRequestCount >= minimumRequests) {
                double failureRate = (double) currentFailureCount / currentRequestCount;
                if (failureRate >= (double) failureThreshold / 100) {
                    tripCircuit();
                }
            }
        }
        
        /**
         * 触发熔断
         */
        private void tripCircuit() {
            state = CircuitBreakerState.OPEN;
            lastFailureTime = System.currentTimeMillis();
            log.warn("Circuit breaker opened due to high failure rate");
        }
        
        /**
         * 重置熔断器
         */
        private void reset() {
            state = CircuitBreakerState.CLOSED;
            failureCount.set(0);
            successCount.set(0);
            requestCount.set(0);
            log.info("Circuit breaker reset to closed state");
        }
    }
}

💡 面试回答要点

标准回答模板

第一部分:负载均衡策略

"负载均衡主要有四种策略:
1. 轮询:简单均匀分配,适合服务器性能相近的场景
2. 加权轮询:根据服务器性能分配权重,处理能力强的分配更多请求
3. 最少连接:分配给当前连接数最少的服务器,适合长连接场景
4. 一致性哈希:根据请求特征哈希分配,保证相同请求到同一服务器

Nginx层面配置upstream,应用层面可以基于注册中心实现。"

第二部分:限流算法对比

"常用限流算法有三种:
1. 令牌桶:允许突发流量,平滑限流,适合大部分场景
2. 漏桶:严格控制出口速率,流量整形,适合需要稳定输出的场景
3. 滑动窗口:精确控制时间窗口内的请求数,但内存占用较大

生产环境推荐令牌桶算法,结合Redis实现分布式限流。"

第三部分:熔断降级机制

"熔断器有三种状态:
1. 关闭状态:正常处理请求,统计失败率
2. 开启状态:直接返回失败,不调用下游服务
3. 半开状态:允许少量请求通过,测试服务是否恢复

关键参数:失败阈值、超时时间、最小请求数、成功阈值。
配合降级策略,提供兜底数据或默认响应。"

核心要点总结:

  • ✅ 掌握多种负载均衡策略的原理和适用场景
  • ✅ 理解限流算法的实现原理和性能特点
  • ✅ 具备熔断降级机制的设计和实现能力
  • ✅ 了解分布式环境下的流量控制方案
Java面试圣经 文章被收录于专栏

Java面试圣经

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务