18.8.4 消息队列削峰填谷应用场景
1. 削峰填谷原理与场景
1.1 削峰填谷基本概念
public class PeakShavingAndValleyFilling {
/*
* 削峰填谷原理:
*
* 1. 削峰 (Peak Shaving)
* - 将瞬时高并发请求缓存到消息队列
* - 后端系统按自身处理能力消费
* - 避免系统过载崩溃
*
* 2. 填谷 (Valley Filling)
* - 在系统空闲时处理积压消息
* - 充分利用系统资源
* - 提高整体吞吐量
*
* 3. 应用场景
* - 秒杀活动
* - 促销抢购
* - 批量数据处理
* - 流量突发场景
*/
public void demonstratePeakShaving() {
System.out.println("=== 削峰填谷应用场景演示 ===");
demonstrateFlashSaleScenario();
demonstrateBatchProcessingScenario();
demonstrateTrafficSpikeScenario();
}
private void demonstrateFlashSaleScenario() {
System.out.println("--- 秒杀活动削峰场景 ---");
FlashSaleSystem flashSaleSystem = new FlashSaleSystem();
System.out.println("1. 秒杀活动开始,瞬时大量请求:");
// 模拟瞬时大量秒杀请求
for (int i = 1; i <= 1000; i++) {
FlashSaleRequest request = new FlashSaleRequest("user-" + i, "product-seckill", 1);
flashSaleSystem.handleFlashSaleRequest(request);
if (i % 100 == 0) {
System.out.println(" 已接收请求数: " + i);
}
}
System.out.println("\n2. 后端系统按能力处理:");
flashSaleSystem.processQueuedRequests();
System.out.println("\n3. 削峰效果:");
System.out.println(" - 前端快速响应用户");
System.out.println(" - 后端稳定处理订单");
System.out.println(" - 避免系统崩溃");
}
private void demonstrateBatchProcessingScenario() {
System.out.println("\n--- 批量数据处理填谷场景 ---");
BatchProcessingSystem batchSystem = new BatchProcessingSystem();
System.out.println("1. 白天业务高峰期,数据写入队列:");
for (int i = 1; i <= 500; i++) {
DataProcessingTask task = new DataProcessingTask("task-" + i, "用户行为数据-" + i);
batchSystem.submitTask(task);
}
System.out.println("当前队列任务数: " + batchSystem.getQueueSize());
System.out.println("\n2. 夜间低峰期,批量处理数据:");
batchSystem.processTasksInBatch(50); // 批量处理50个任务
System.out.println("\n3. 填谷效果:");
System.out.println(" - 白天快速接收数据");
System.out.println(" - 夜间充分利用资源");
System.out.println(" - 提高整体处理效率");
}
private void demonstrateTrafficSpikeScenario() {
System.out.println("\n--- 流量突发削峰场景 ---");
TrafficSpikeHandler spikeHandler = new TrafficSpikeHandler();
System.out.println("1. 正常流量处理:");
spikeHandler.handleNormalTraffic(100); // 正常100 QPS
System.out.println("\n2. 流量突发:");
spikeHandler.handleTrafficSpike(5000); // 突发5000 QPS
System.out.println("\n3. 流量恢复:");
spikeHandler.handleNormalTraffic(100); // 恢复正常
System.out.println("\n4. 突发处理效果:");
System.out.println(" - 快速接收突发请求");
System.out.println(" - 平滑处理流量峰值");
System.out.println(" - 保护后端系统稳定");
}
}
// 秒杀请求
class FlashSaleRequest {
private String userId;
private String productId;
private int quantity;
private long timestamp;
public FlashSaleRequest(String userId, String productId, int quantity) {
this.userId = userId;
this.productId = productId;
this.quantity = quantity;
this.timestamp = System.currentTimeMillis();
}
public String getUserId() { return userId; }
public String getProductId() { return productId; }
public int getQuantity() { return quantity; }
public long getTimestamp() { return timestamp; }
}
// 秒杀系统
class FlashSaleSystem {
private java.util.Queue<FlashSaleRequest> requestQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
private int maxProcessingRate = 100; // 每秒最大处理100个请求
public void handleFlashSaleRequest(FlashSaleRequest request) {
// 快速将请求放入队列
requestQueue.offer(request);
// 立即返回响应给用户
if (requestQueue.size() % 100 == 1) {
System.out.println(" 请求已接收,排队处理中... 队列长度: " + requestQueue.size());
}
}
public void processQueuedRequests() {
System.out.println(" 开始处理队列中的请求,当前队列长度: " + requestQueue.size());
int processedCount = 0;
long startTime = System.currentTimeMillis();
while (!requestQueue.isEmpty() && processedCount < maxProcessingRate) {
FlashSaleRequest request = requestQueue.poll();
if (request != null) {
processFlashSaleOrder(request);
processedCount++;
}
}
long endTime = System.currentTimeMillis();
System.out.println(" 本轮处理完成: " + processedCount + "个请求,耗时: " + (endTime - startTime) + "ms");
System.out.println(" 剩余队列长度: " + requestQueue.size());
}
private void processFlashSaleOrder(FlashSaleRequest request) {
// 模拟订单处理逻辑
// 1. 检查库存
// 2. 创建订单
// 3. 扣减库存
// 4. 发送通知
try {
Thread.sleep(10); // 模拟处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 数据处理任务
class DataProcessingTask {
private String taskId;
private String data;
private long createTime;
public DataProcessingTask(String taskId, String data) {
this.taskId = taskId;
this.data = data;
this.createTime = System.currentTimeMillis();
}
public String getTaskId() { return taskId; }
public String getData() { return data; }
public long getCreateTime() { return createTime; }
}
// 批量处理系统
class BatchProcessingSystem {
private java.util.Queue<DataProcessingTask> taskQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
public void submitTask(DataProcessingTask task) {
taskQueue.offer(task);
if (taskQueue.size() % 100 == 1) {
System.out.println(" 任务已提交: " + task.getTaskId() + " 队列长度: " + taskQueue.size());
}
}
public int getQueueSize() {
return taskQueue.size();
}
public void processTasksInBatch(int batchSize) {
System.out.println(" 开始批量处理任务,批次大小: " + batchSize);
int processedCount = 0;
java.util.List<DataProcessingTask> batch = new java.util.ArrayList<>();
// 收集一批任务
while (!taskQueue.isEmpty() && batch.size() < batchSize) {
DataProcessingTask task = taskQueue.poll();
if (task != null) {
batch.add(task);
}
}
// 批量处理
if (!batch.isEmpty()) {
processBatch(batch);
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
查看3道真题和解析