Java秋招面试之消息队列与分布式话题
第9章 消息队列与分布式
面试重要程度:⭐⭐⭐⭐
常见提问方式:消息可靠性保证、分布式事务、服务治理
预计阅读时间:40分钟
开场白
兄弟,消息队列和分布式系统是现代后端架构的核心!随着微服务架构的普及,这块知识在面试中的重要性越来越高。不管是RabbitMQ、Kafka,还是分布式事务、服务治理,都是面试官爱考的重点。
今天我们就把这些分布式核心技术搞透,让你在面试中展现出对大型系统架构的深度理解。
📨 9.1 消息队列选型对比
RabbitMQ vs Kafka vs RocketMQ
面试必问:
面试官:"常见的消息队列有哪些?各自的特点和适用场景是什么?"
三大消息队列对比:
开发语言 |
Erlang |
Scala/Java |
Java |
协议支持 |
AMQP, STOMP, MQTT |
自定义协议 |
自定义协议 |
消息顺序 |
支持 |
分区内有序 |
支持 |
消息可靠性 |
高 |
高 |
高 |
性能 |
万级QPS |
十万级QPS |
十万级QPS |
延时 |
微秒级 |
毫秒级 |
毫秒级 |
可用性 |
高(镜像队列) |
高(副本机制) |
高(主从架构) |
消息回溯 |
不支持 |
支持 |
支持 |
消息过滤 |
不支持 |
不支持 |
支持 |
事务消息 |
支持 |
支持 |
支持 |
定时消息 |
插件支持 |
不支持 |
支持 |
使用场景分析:
// RabbitMQ适用场景 /* 1. 业务逻辑复杂,需要多种消息模式 2. 对消息可靠性要求极高 3. 需要复杂的路由规则 4. 系统规模中等,QPS要求不是特别高 */ // Kafka适用场景 /* 1. 大数据处理,日志收集 2. 高吞吐量场景 3. 流式数据处理 4. 需要消息回溯功能 */ // RocketMQ适用场景 /* 1. 电商、金融等对可靠性要求高的场景 2. 需要事务消息 3. 需要定时消息、延时消息 4. 需要消息过滤功能 */
RabbitMQ核心概念
AMQP模型:
@Configuration @EnableRabbit public class RabbitMQConfig { // 1. 直连交换机(Direct Exchange) @Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange", true, false); } // 2. 主题交换机(Topic Exchange) @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange", true, false); } // 3. 扇形交换机(Fanout Exchange) @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.exchange", true, false); } // 4. 头部交换机(Headers Exchange) @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange", true, false); } // 队列定义 @Bean public Queue orderQueue() { return QueueBuilder.durable("order.queue") .withArgument("x-message-ttl", 60000) // 消息TTL .withArgument("x-max-length", 1000) // 队列最大长度 .build(); } @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("order.dlq").build(); } // 绑定关系 @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(directExchange()) .with("order.create"); } // 死信队列绑定 @Bean public Queue orderQueueWithDLX() { return QueueBuilder.durable("order.queue.dlx") .withArgument("x-dead-letter-exchange", "dlx.exchange") .withArgument("x-dead-letter-routing-key", "order.dead") .build(); } }
消息生产者:
@Service public class OrderMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; // 发送普通消息 public void sendOrderMessage(Order order) { rabbitTemplate.convertAndSend("direct.exchange", "order.create", order); } // 发送延时消息 public void sendDelayMessage(Order order, int delaySeconds) { rabbitTemplate.convertAndSend("direct.exchange", "order.delay", order, message -> { message.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000)); return message; }); } // 发送事务消息 @Transactional public void sendTransactionalMessage(Order order) { // 本地事务操作 orderService.saveOrder(order); // 发送消息(在同一事务中) rabbitTemplate.convertAndSend("direct.exchange", "order.create", order); } // 发送确认消息 public void sendConfirmMessage(Order order) { // 设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息发送成功: {}", correlationData); } else { log.error("消息发送失败: {}, 原因: {}", correlationData, cause); // 重试或记录失败日志 } }); // 设置返回回调 rabbitTemplate.setReturnsCallback(returned -> { log.error("消息被退回: {}", returned); // 处理被退回的消息 }); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("direct.exchange", "order.create", order, correlationData); } }
消息消费者:
@Component public class OrderMessageConsumer { @Autowired private OrderService orderService; // 基本消费 @RabbitListener(queues = "order.queue") public void handleOrderMessage(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { log.info("收到订单消息: {}", order); orderService.processOrder(order); // 手动确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("处理订单消息失败", e); try { // 拒绝消息,重新入队 channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("拒绝消息失败", ioException); } } } // 批量消费 @RabbitListener(queues = "order.batch.queue", containerFactory = "batchRabbitListenerContainerFactory") public void handleBatchOrderMessages(List<Order> orders) { log.info("批量处理订单消息,数量: {}", orders.size()); orderService.batchProcessOrders(orders); } // 死信队列消费 @RabbitListener(queues = "order.dlq") public void handleDeadLetterMessage(Order order, @Header Map<String, Object> headers) { log.warn("处理死信消息: {}, headers: {}", order, headers); // 记录失败原因 String reason = (String) headers.get("x-first-death-reason"); orderService.recordFailedOrder(order, reason); // 可以选择重试或人工处理 } }
Kafka核心概念
Kafka架构:
@Configuration @EnableKafka public class KafkaConfig { // 生产者配置 @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 可靠性配置 props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性 // 性能配置 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小 props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待时间 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小 return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } // 消费者配置 @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 消费配置 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 每次拉取记录数 return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 并发配置 factory.setConcurrency(3); // 3个消费者线程 // 手动确认模式 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }
Kafka生产者:
@Service public class KafkaOrderProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 public void sendOrderEvent(String topic, Order order) { kafkaTemplate.send(topic, order.getId().toString(), order) .addCallback( result -> log.info("消息发送成功: {}", result), failure -> log.error("消息发送失败", failure) ); } // 发送事务消息 @Transactional public void sendTransactionalOrderEvent(Order order) { kafkaTemplate.executeInTransaction(operations -> { // 发送多个相关消息 operations.send("order-created", order.getId().toString(), order); operations.send("inventory-update", order.getProductId().toString(), new InventoryUpdateEvent(order.getProductId(), -order.getQuantity())); operations.send("payment-request", order.getId().toString(), new PaymentRequestEvent(order.getId(), order.getAmount())); return true; }); } // 发送分区消息 public void sendPartitionedMessage(Order order) { // 根据用户ID分区,保证同一用户的消息有序 int partition = Math.abs(order.getUserId().hashCode()) % 3; kafkaTemplate.send("order-events", partition, order.getId().toString(), order); } }
Kafka消费者:
@Component public class KafkaOrderConsumer { @Autowired private OrderService orderService; // 基本消费 @KafkaListener(topics = "order-created", groupId = "order-service") public void handleOrderCreated(ConsumerRecord<String, Order> record, Acknowledgment ack) { try { Order order = record.value(); log.info("处理订单创建事件: partition={}, offset={}, order={}", record.partition(), record.offset(), order); orderService.handleOrderCreated(order); // 手动确认 ack.acknowledge(); } catch (Exception e) { log.error("处理订单创建事件失败", e); // 不确认,消息会重新消费 } } // 批量消费 @KafkaListener(topics = "order-events", containerFactory = "batchKafkaListenerContainerFactory") public void handleOrderEventsBatch(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) { log.info("批量处理订单事件,数量: {}", records.size()); List<Order> orders = records.stream() .map(ConsumerRecord::value) .collect(Collectors.toList()); orderService.batchProcessOrders(orders); ack.acknowledge(); } // 错误处理 @KafkaListener(topics = "order-created") public void handleOrderCreatedWithRetry(Order order, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { try { orderService.handleOrderCreated(order); } catch (RetryableException e) { log.warn("处理失败,将重试: topic={}, partition={}, offset={}", topic, partition, offset); throw e; // 抛出异常,触发重试 } catch (Exception e) { log.error("处理失败,发送到死信队列: topic={}, partition={}, offset={}", topic, partition, offset); sendToDeadLetterTopic(order, e); } } private void sendToDeadLetterTopic(Order order, Exception e) { // 发送到死信主题 kafkaTemplate.send("order-created-dlt", order.getId().toString(), new DeadLetterMessage(order, e.getMessage())); } }
🔄 9.2 分布式事务解决方案
2PC、3PC、TCC、Saga模式
面试重点:
面试官:"分布式事务有哪些解决方案?各自的优缺点是什么?"
分布式事务对比:
2PC |
强一致 |
低 |
低 |
低 |
小规模系统 |
3PC |
强一致 |
中 |
低 |
中 |
改进2PC |
TCC |
最终一致 |
高 |
中 |
高 |
金融支付 |
Saga |
最终一致 |
高 |
高 |
中 |
长流程业务 |
消息事务 |
最终一致 |
高 |
高 |
中 |
异步场景 |
TCC模式实现
TCC框架实现:
// TCC接口定义 public interface TccTransaction { /** * Try阶段:尝试执行业务,预留资源 */ boolean tryExecute(TccContext context); /** * Confirm阶段:确认执行业务,提交资源 */ boolean confirmExecute(TccContext context); /** * Cancel阶段:取消执行业务,释放资源 */ boolean cancelExecute(TccContext context); } // 账户服务TCC实现 @Service public class AccountTccService implements TccTransaction { @Autowired private AccountMapper accountMapper; @Autowired private AccountFreezeMapper freezeMapper; @Override public boolean tryExecute(TccContext context) { TransferRequest request = context.getRequest(TransferRequest.class); try { // 1. 检查账户余额 Account account = accountMapper.selectById(request.getFromAccountId()); if (account.getBalance().compareTo(request.getAmount()) < 0) { return false; // 余额不足 } // 2. 冻结资金 AccountFreeze freeze = new AccountFreeze(); freeze.setTransactionId(context.getTransactionId()); freeze.setAccountId(request.getFromAccountId()); freeze.setAmount(request.getAmount()); freeze.setStatus(FreezeStatus.FROZEN); freeze.setCreateTime(LocalDateTime.now()); freezeMapper.insert(freeze); // 3. 更新账户余额(预扣) account.setBalance(account.getBalance().subtract(request.getAmount())); account.setFrozenAmount(account.getFrozenAmount().add(request.getAmount())); accountMapper.updateById(account); log.info("TCC Try阶段成功: transactionId={}, amount={}", context.getTransactionId(), request.getAmount()); return true; } catch (Exception e) { log.error("TCC Try阶段失败", e); return false; } } @Override public boolean confirmExecute(TccContext context) { String transactionId = context.getTransactionId(); try { // 1. 查询冻结记录 AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId); if (freeze == null || freeze.getStatus() == FreezeStatus.CONFIRMED) { return true; // 幂等处理 } // 2. 确认扣款,清除冻结金额 Account account = accountMapper.selectById(freeze.getAccountId()); account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount())); accountMapper.updateById(account); // 3. 更新冻结记录状态 freeze.setStatus(FreezeStatus.CONFIRMED); freeze.setUpdateTime(LocalDateTime.now()); freezeMapper.updateById(freeze); log.info("TCC Confirm阶段成功: transactionId={}", transactionId); return true; } catch (Exception e) { log.error("TCC Confirm阶段失败", e); return false; } } @Override public boolean cancelExecute(TccContext context) { String transactionId = context.getTransactionId(); try { // 1. 查询冻结记录 AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId); if (freeze == null || freeze.getStatus() == FreezeStatus
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经