18.8.4 消息队列削峰填谷应用场景

1. 削峰填谷原理与场景

1.1 削峰填谷基本概念

public class PeakShavingAndValleyFilling {
    
    /*
     * 削峰填谷原理:
     * 
     * 1. 削峰 (Peak Shaving)
     *    - 将瞬时高并发请求缓存到消息队列
     *    - 后端系统按自身处理能力消费
     *    - 避免系统过载崩溃
     * 
     * 2. 填谷 (Valley Filling)
     *    - 在系统空闲时处理积压消息
     *    - 充分利用系统资源
     *    - 提高整体吞吐量
     * 
     * 3. 应用场景
     *    - 秒杀活动
     *    - 促销抢购
     *    - 批量数据处理
     *    - 流量突发场景
     */
    
    public void demonstratePeakShaving() {
        System.out.println("=== 削峰填谷应用场景演示 ===");
        
        demonstrateFlashSaleScenario();
        demonstrateBatchProcessingScenario();
        demonstrateTrafficSpikeScenario();
    }
    
    private void demonstrateFlashSaleScenario() {
        System.out.println("--- 秒杀活动削峰场景 ---");
        
        FlashSaleSystem flashSaleSystem = new FlashSaleSystem();
        
        System.out.println("1. 秒杀活动开始,瞬时大量请求:");
        
        // 模拟瞬时大量秒杀请求
        for (int i = 1; i <= 1000; i++) {
            FlashSaleRequest request = new FlashSaleRequest("user-" + i, "product-seckill", 1);
            flashSaleSystem.handleFlashSaleRequest(request);
            
            if (i % 100 == 0) {
                System.out.println("  已接收请求数: " + i);
            }
        }
        
        System.out.println("\n2. 后端系统按能力处理:");
        flashSaleSystem.processQueuedRequests();
        
        System.out.println("\n3. 削峰效果:");
        System.out.println("   - 前端快速响应用户");
        System.out.println("   - 后端稳定处理订单");
        System.out.println("   - 避免系统崩溃");
    }
    
    private void demonstrateBatchProcessingScenario() {
        System.out.println("\n--- 批量数据处理填谷场景 ---");
        
        BatchProcessingSystem batchSystem = new BatchProcessingSystem();
        
        System.out.println("1. 白天业务高峰期,数据写入队列:");
        for (int i = 1; i <= 500; i++) {
            DataProcessingTask task = new DataProcessingTask("task-" + i, "用户行为数据-" + i);
            batchSystem.submitTask(task);
        }
        
        System.out.println("当前队列任务数: " + batchSystem.getQueueSize());
        
        System.out.println("\n2. 夜间低峰期,批量处理数据:");
        batchSystem.processTasksInBatch(50); // 批量处理50个任务
        
        System.out.println("\n3. 填谷效果:");
        System.out.println("   - 白天快速接收数据");
        System.out.println("   - 夜间充分利用资源");
        System.out.println("   - 提高整体处理效率");
    }
    
    private void demonstrateTrafficSpikeScenario() {
        System.out.println("\n--- 流量突发削峰场景 ---");
        
        TrafficSpikeHandler spikeHandler = new TrafficSpikeHandler();
        
        System.out.println("1. 正常流量处理:");
        spikeHandler.handleNormalTraffic(100); // 正常100 QPS
        
        System.out.println("\n2. 流量突发:");
        spikeHandler.handleTrafficSpike(5000); // 突发5000 QPS
        
        System.out.println("\n3. 流量恢复:");
        spikeHandler.handleNormalTraffic(100); // 恢复正常
        
        System.out.println("\n4. 突发处理效果:");
        System.out.println("   - 快速接收突发请求");
        System.out.println("   - 平滑处理流量峰值");
        System.out.println("   - 保护后端系统稳定");
    }
}

// 秒杀请求
class FlashSaleRequest {
    private String userId;
    private String productId;
    private int quantity;
    private long timestamp;
    
    public FlashSaleRequest(String userId, String productId, int quantity) {
        this.userId = userId;
        this.productId = productId;
        this.quantity = quantity;
        this.timestamp = System.currentTimeMillis();
    }
    
    public String getUserId() { return userId; }
    public String getProductId() { return productId; }
    public int getQuantity() { return quantity; }
    public long getTimestamp() { return timestamp; }
}

// 秒杀系统
class FlashSaleSystem {
    private java.util.Queue<FlashSaleRequest> requestQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
    private int maxProcessingRate = 100; // 每秒最大处理100个请求
    
    public void handleFlashSaleRequest(FlashSaleRequest request) {
        // 快速将请求放入队列
        requestQueue.offer(request);
        
        // 立即返回响应给用户
        if (requestQueue.size() % 100 == 1) {
            System.out.println("  请求已接收,排队处理中... 队列长度: " + requestQueue.size());
        }
    }
    
    public void processQueuedRequests() {
        System.out.println("  开始处理队列中的请求,当前队列长度: " + requestQueue.size());
        
        int processedCount = 0;
        long startTime = System.currentTimeMillis();
        
        while (!requestQueue.isEmpty() && processedCount < maxProcessingRate) {
            FlashSaleRequest request = requestQueue.poll();
            if (request != null) {
                processFlashSaleOrder(request);
                processedCount++;
            }
        }
        
        long endTime = System.currentTimeMillis();
        System.out.println("  本轮处理完成: " + processedCount + "个请求,耗时: " + (endTime - startTime) + "ms");
        System.out.println("  剩余队列长度: " + requestQueue.size());
    }
    
    private void processFlashSaleOrder(FlashSaleRequest request) {
        // 模拟订单处理逻辑
        // 1. 检查库存
        // 2. 创建订单
        // 3. 扣减库存
        // 4. 发送通知
        
        try {
            Thread.sleep(10); // 模拟处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 数据处理任务
class DataProcessingTask {
    private String taskId;
    private String data;
    private long createTime;
    
    public DataProcessingTask(String taskId, String data) {
        this.taskId = taskId;
        this.data = data;
        this.createTime = System.currentTimeMillis();
    }
    
    public String getTaskId() { return taskId; }
    public String getData() { return data; }
    public long getCreateTime() { return createTime; }
}

// 批量处理系统
class BatchProcessingSystem {
    private java.util.Queue<DataProcessingTask> taskQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
    
    public void submitTask(DataProcessingTask task) {
        taskQueue.offer(task);
        
        if (taskQueue.size() % 100 == 1) {
            System.out.println("  任务已提交: " + task.getTaskId() + " 队列长度: " + taskQueue.size());
        }
    }
    
    public int getQueueSize() {
        return taskQueue.size();
    }
    
    public void processTasksInBatch(int batchSize) {
        System.out.println("  开始批量处理任务,批次大小: " + batchSize);
        
        int processedCount = 0;
        java.util.List<DataProcessingTask> batch = new java.util.ArrayList<>();
        
        // 收集一批任务
        while (!taskQueue.isEmpty() && batch.size() < batchSize) {
            DataProcessingTask task = taskQueue.poll();
            if (task != null) {
                batch.add(task);
            }
        }
        
        // 批量处理
        if (!batch.isEmpty()) {
            processBatch(batch);
            

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论

相关推荐

面我面我面我_秋招版:不是戈门,干哪来了,这就是java嘛
点赞 评论 收藏
分享
今天 19:25
已编辑
太原理工大学 游戏测试
叁六玖:公司名发我,我要这个HR带我打瓦
我的秋招日记
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务