18.8.4 消息队列削峰填谷应用场景
1. 削峰填谷原理与场景
1.1 削峰填谷基本概念
public class PeakShavingAndValleyFilling { /* * 削峰填谷原理: * * 1. 削峰 (Peak Shaving) * - 将瞬时高并发请求缓存到消息队列 * - 后端系统按自身处理能力消费 * - 避免系统过载崩溃 * * 2. 填谷 (Valley Filling) * - 在系统空闲时处理积压消息 * - 充分利用系统资源 * - 提高整体吞吐量 * * 3. 应用场景 * - 秒杀活动 * - 促销抢购 * - 批量数据处理 * - 流量突发场景 */ public void demonstratePeakShaving() { System.out.println("=== 削峰填谷应用场景演示 ==="); demonstrateFlashSaleScenario(); demonstrateBatchProcessingScenario(); demonstrateTrafficSpikeScenario(); } private void demonstrateFlashSaleScenario() { System.out.println("--- 秒杀活动削峰场景 ---"); FlashSaleSystem flashSaleSystem = new FlashSaleSystem(); System.out.println("1. 秒杀活动开始,瞬时大量请求:"); // 模拟瞬时大量秒杀请求 for (int i = 1; i <= 1000; i++) { FlashSaleRequest request = new FlashSaleRequest("user-" + i, "product-seckill", 1); flashSaleSystem.handleFlashSaleRequest(request); if (i % 100 == 0) { System.out.println(" 已接收请求数: " + i); } } System.out.println("\n2. 后端系统按能力处理:"); flashSaleSystem.processQueuedRequests(); System.out.println("\n3. 削峰效果:"); System.out.println(" - 前端快速响应用户"); System.out.println(" - 后端稳定处理订单"); System.out.println(" - 避免系统崩溃"); } private void demonstrateBatchProcessingScenario() { System.out.println("\n--- 批量数据处理填谷场景 ---"); BatchProcessingSystem batchSystem = new BatchProcessingSystem(); System.out.println("1. 白天业务高峰期,数据写入队列:"); for (int i = 1; i <= 500; i++) { DataProcessingTask task = new DataProcessingTask("task-" + i, "用户行为数据-" + i); batchSystem.submitTask(task); } System.out.println("当前队列任务数: " + batchSystem.getQueueSize()); System.out.println("\n2. 夜间低峰期,批量处理数据:"); batchSystem.processTasksInBatch(50); // 批量处理50个任务 System.out.println("\n3. 填谷效果:"); System.out.println(" - 白天快速接收数据"); System.out.println(" - 夜间充分利用资源"); System.out.println(" - 提高整体处理效率"); } private void demonstrateTrafficSpikeScenario() { System.out.println("\n--- 流量突发削峰场景 ---"); TrafficSpikeHandler spikeHandler = new TrafficSpikeHandler(); System.out.println("1. 正常流量处理:"); spikeHandler.handleNormalTraffic(100); // 正常100 QPS System.out.println("\n2. 流量突发:"); spikeHandler.handleTrafficSpike(5000); // 突发5000 QPS System.out.println("\n3. 流量恢复:"); spikeHandler.handleNormalTraffic(100); // 恢复正常 System.out.println("\n4. 突发处理效果:"); System.out.println(" - 快速接收突发请求"); System.out.println(" - 平滑处理流量峰值"); System.out.println(" - 保护后端系统稳定"); } } // 秒杀请求 class FlashSaleRequest { private String userId; private String productId; private int quantity; private long timestamp; public FlashSaleRequest(String userId, String productId, int quantity) { this.userId = userId; this.productId = productId; this.quantity = quantity; this.timestamp = System.currentTimeMillis(); } public String getUserId() { return userId; } public String getProductId() { return productId; } public int getQuantity() { return quantity; } public long getTimestamp() { return timestamp; } } // 秒杀系统 class FlashSaleSystem { private java.util.Queue<FlashSaleRequest> requestQueue = new java.util.concurrent.ConcurrentLinkedQueue<>(); private int maxProcessingRate = 100; // 每秒最大处理100个请求 public void handleFlashSaleRequest(FlashSaleRequest request) { // 快速将请求放入队列 requestQueue.offer(request); // 立即返回响应给用户 if (requestQueue.size() % 100 == 1) { System.out.println(" 请求已接收,排队处理中... 队列长度: " + requestQueue.size()); } } public void processQueuedRequests() { System.out.println(" 开始处理队列中的请求,当前队列长度: " + requestQueue.size()); int processedCount = 0; long startTime = System.currentTimeMillis(); while (!requestQueue.isEmpty() && processedCount < maxProcessingRate) { FlashSaleRequest request = requestQueue.poll(); if (request != null) { processFlashSaleOrder(request); processedCount++; } } long endTime = System.currentTimeMillis(); System.out.println(" 本轮处理完成: " + processedCount + "个请求,耗时: " + (endTime - startTime) + "ms"); System.out.println(" 剩余队列长度: " + requestQueue.size()); } private void processFlashSaleOrder(FlashSaleRequest request) { // 模拟订单处理逻辑 // 1. 检查库存 // 2. 创建订单 // 3. 扣减库存 // 4. 发送通知 try { Thread.sleep(10); // 模拟处理时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } // 数据处理任务 class DataProcessingTask { private String taskId; private String data; private long createTime; public DataProcessingTask(String taskId, String data) { this.taskId = taskId; this.data = data; this.createTime = System.currentTimeMillis(); } public String getTaskId() { return taskId; } public String getData() { return data; } public long getCreateTime() { return createTime; } } // 批量处理系统 class BatchProcessingSystem { private java.util.Queue<DataProcessingTask> taskQueue = new java.util.concurrent.ConcurrentLinkedQueue<>(); public void submitTask(DataProcessingTask task) { taskQueue.offer(task); if (taskQueue.size() % 100 == 1) { System.out.println(" 任务已提交: " + task.getTaskId() + " 队列长度: " + taskQueue.size()); } } public int getQueueSize() { return taskQueue.size(); } public void processTasksInBatch(int batchSize) { System.out.println(" 开始批量处理任务,批次大小: " + batchSize); int processedCount = 0; java.util.List<DataProcessingTask> batch = new java.util.ArrayList<>(); // 收集一批任务 while (!taskQueue.isEmpty() && batch.size() < batchSize) { DataProcessingTask task = taskQueue.poll(); if (task != null) { batch.add(task); } } // 批量处理 if (!batch.isEmpty()) { processBatch(batch);
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经