字节真题:高并发消息队列设计
面试重要程度:⭐⭐⭐⭐⭐
真题来源:字节跳动2024校招技术面试
考察重点:高并发架构设计、消息队列原理、系统优化
预计阅读时间:45分钟
真题背景
面试官: "我们需要设计一个高并发的消息队列系统,支持每秒百万级消息处理,要求消息不丢失、有序性保证、支持多种消费模式。请详细设计这个系统的架构,包括存储方案、网络通信、性能优化等方面。如果消息堆积严重怎么处理?"
考察意图:
- 高并发系统架构设计能力
- 消息队列核心原理的深度理解
- 性能优化和问题解决思路
- 分布式系统设计经验
🎯 系统架构设计
整体架构
/** * 高并发消息队列架构 */ @Component public class HighPerformanceMessageQueue { // 性能指标 public static final int MAX_TPS = 1_000_000; // 百万级TPS public static final int MAX_LATENCY_MS = 10; // 延迟<10ms public static final int REPLICATION_FACTOR = 3; // 3副本 /** * Broker节点 */ public static class BrokerNode { private String brokerId; private BrokerRole role; // MASTER, SLAVE private StorageEngine storageEngine; private NetworkServer networkServer; public SendResult handleSendMessage(SendMessageRequest request) { try { // 1. 验证消息 validateMessage(request.getMessage()); // 2. 选择队列 TopicQueue queue = selectQueue(request.getTopic()); // 3. 存储消息 MessageStoreResult result = storageEngine.storeMessage( queue.getQueueId(), request.getMessage()); // 4. 同步到从节点 if (role == BrokerRole.MASTER) { syncToSlaves(result); } return SendResult.success(result.getMessageId()); } catch (Exception e) { return SendResult.failed(e.getMessage()); } } } }
🚀 存储引擎设计
高性能存储方案
/** * 基于文件的存储引擎 */ @Component public class FileBasedStorageEngine { /** * CommitLog - 顺序写入的消息日志 */ public static class CommitLog { private final MappedByteBuffer mappedByteBuffer; private final AtomicLong wrotePosition; private final int fileSize = 1024 * 1024 * 1024; // 1GB /** * 追加消息到CommitLog */ public AppendMessageResult appendMessage(Message message) { try { ByteBuffer messageBuffer = serializeMessage(message); int messageSize = messageBuffer.remaining(); // 检查空间 if (wrotePosition.get() + messageSize > fileSize) { return AppendMessageResult.END_OF_FILE; } // 写入消息 long currentPos = wrotePosition.get(); mappedByteBuffer.position((int) currentPos); mappedByteBuffer.put(messageBuffer); wrotePosition.addAndGet(messageSize); return AppendMessageResult.success(currentPos, messageSize); } catch (Exception e) { return AppendMessageResult.failed(e.getMessage()); } } /** * 异步刷盘 */ public void flush() { if (mappedByteBuffer != null) { mappedByteBuffer.force(); } } } /** * ConsumeQueue - 消费队列索引 */ public static class ConsumeQueue { private final MappedByteBuffer mappedByteBuffer; private final AtomicLong maxOffset; // 每个索引项20字节:8字节偏移量 + 4字节大小 + 8字节Tag private static final int CQ_STORE_UNIT_SIZE = 20; public void putMessagePositionInfo(long offset, int size, long tagsCode) { ByteBuffer buffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); buffer.putLong(offset); buffer.putInt(size); buffer.putLong(tagsCode); buffer.flip(); long currentOffset = maxOffset.get(); mappedByteBuffer.position((int) (currentOffset * CQ_STORE_UNIT_SIZE)); mappedByteBuffer.put(buffer); maxOffset.incrementAndGet(); } } }
🌐 网络通信优化
高性能网络层
/** * 基于Netty的网络服务器 */ @Component public class NettyRemotingServer { private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; @PostConstruct public void start() { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(16); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, false) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast("decoder", new NettyDecoder()) .addLast("encoder", new NettyEncoder()) .addLast("handler", new NettyServerHandler()); } }); bootstrap.bind(8080).sync(); } /** * 消息处理器 */ public static class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { private final ExecutorService messageExecutor = new ThreadPoolExecutor( 16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000) ); @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand request) { messageExecutor.submit(() -> { try { RemotingCommand response = processRequest(ctx, request); if (response != null) { ctx.writeAndFlush(response); } } catch (Exception e) { ctx.writeAndFlush(createErrorResponse(request, e)); } }); } private RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { switch (request.getCode()) { case RequestCode.SEND_MESSAGE: return processSendMessage(ctx, request); case RequestCode.PULL_MESSAGE: return processPullMessage(ctx, request); default: return createErrorResponse(request, new UnsupportedOperationException()); } } } }
⚡ 性能优化策略
批量处理优化
/** * 批量处理优化 */ @Component public class BatchProcessingOptimization { /** * 批量发送优化 */ public static class BatchSendOptimizer { private final BlockingQueue<Message> messageBuffer = new LinkedBlockingQueue<>(); private final int batchSize = 100; private final long batchTimeoutMs = 10; public CompletableFuture<SendResult> sendMessage(Message message) { CompletableFuture<SendResult> future = new CompletableFuture<>(); BatchMessage batchMessage = new BatchMessage(message, future); try { messageBuffer.put(batchMessage); } catch (InterruptedException e) { future.completeExceptionally(e); } return future; } @Scheduled(fixedDelay = 10) public void processBatch() { List<BatchMessage> batch = new ArrayList<>(); messageBuffer.drainTo(batch, batchSize); if (!batch.isEmpty()) { try { List<Message> messages = batch.stream() .map(BatchMessage::getMessage) .collect(Collectors.toList()); List<SendResult> results = brokerService.sendMessageBatch(messages); for (int i = 0; i < batch.size(); i++) { batch.get(i).getFuture().complete(results.get(i)); } } catch (Exception e) { batch.forEach(bm -> bm.getFuture().completeExceptionally(e)); } } } } /** * 内存优化 - 对象池 */ public static class ObjectPoolOptimization { private final ObjectPool<ByteBuffer> byteBufferPool; public ObjectPoolOptimization() { this.byteBufferPool = new GenericObjectPool<>( new BasePooledObjectFactory<ByteBuffer>() { @Override public ByteBuffer create() { return ByteBuffer.allocateDirect(1024 * 1024); } @Override public PooledObject<ByteBuffer> wrap(ByteBuffer buffer) { return new DefaultPooledObject<>(buffer); } @Override public void passivateObject(PooledObject<ByteBuffer> p) { p.getObject().clear(); } } ); } public ByteBuffer borrowByteBuffer() { try { return byteBufferPool.borrowObject(); } catch (Exception e) { return ByteBuffer.allocateDirect(1024 * 1024); } } } }
🔥 消息堆积处理方案
堆积监控与处理
/** * 消息堆积处理 */ @Component public class MessageAccumulationHandler { private static final long ACCUMULATION_THRESHOLD = 10000; private static final long CRITICAL_THRESHOLD = 100000; /** * 堆积监控 */ @Scheduled(fixedRate = 30000) public void checkAccumulation() { for (TopicQueue queue : getAllQueues()) { long accumulation = queue.getMaxOffset() - queue.getConsumeOffset(); if (accumulation > ACCUMULATION_THRESHOLD) { handleAccumulation(queue, accumulation); } } } /** * 处理堆积 */ private void handleAccumulation(TopicQueue queue, long accumulation) { log.warn("Message accumulation detected: topic={}, accumulation={}", queue.getTopic(), accumulation); // 1. 发送告警 sendAccumulationAlert(queue, accumulation); // 2. 自动扩容消费者 if (accumulation > CRITICAL_THRESHOLD) { scaleUpConsumers(queue); } // 3. 启用快速消费模式 enableFastConsumeMode(queue); } /** * 自动扩容消费者 */ private void scaleUpConsumers(TopicQueue queue) { String consumerGroup = queue.getConsumerGroup(); int currentCount = getCurrentConsumerCount(consumerGroup); int targetCount = calculateTargetConsumerCount(queue); if (targetCount > currentCount) { int newConsumers = targetCount - currentCount; for (int i = 0; i < newConsumers; i++) { startNewConsumer(consumerGroup, queue.getTopic()); } } } /** * 快速消费模式 */ private void enableFastConsumeMode(TopicQueue queue) { // 增大批量拉取大小 queue.setBatchPullSize(1000); // 减少拉取间隔 queue.setPullInterval(1); // 启用并行消费 queue.setParallelConsume(true); } }
💡 面试回答要点
标准回答模板
第一部分:架构设计
"设计百万级TPS消息队列需要考虑: 1. 存储:基于文件的CommitLog + ConsumeQueue索引结构 2. 网络:Netty + 零拷贝 + 批量处理 3. 分布式:主从复制 + 分片 + 负载均衡 4. 可靠性:同步刷盘 + 多副本 + 故障转移"
第二部分:性能优化
"关键优化点: 1. 顺序写入:CommitLog顺序追加,避免随机IO 2. 内存映射:mmap减少用户态内核态切换 3. 批量处理:批量发送和消费提升吞吐量 4. 对象池:复用ByteBuffer等对象减少GC"
第三部分:堆积处理
"消息堆积处理策略: 1. 监控告警:实时监控堆积量,超阈值告警 2. 自动扩容:动态增加消费者实例 3. 快速消费:增大批量大小,启用并行消费 4. 限流降级:必要时对生产者限流"
核心要点总结:
- ✅ 深入理解高并发消息队列架构设计
- ✅ 掌握存储引擎和网络通信优化技术
- ✅ 具备性能调优和问题解决能力
- ✅ 理解分布式系统的可靠性保证机制
Java面试圣经 文章被收录于专栏
Java面试圣经