Java秋招面试之消息队列与分布式话题

第9章 消息队列与分布式

面试重要程度:⭐⭐⭐⭐

常见提问方式:消息可靠性保证、分布式事务、服务治理

预计阅读时间:40分钟

开场白

兄弟,消息队列和分布式系统是现代后端架构的核心!随着微服务架构的普及,这块知识在面试中的重要性越来越高。不管是RabbitMQ、Kafka,还是分布式事务、服务治理,都是面试官爱考的重点。

今天我们就把这些分布式核心技术搞透,让你在面试中展现出对大型系统架构的深度理解。

📨 9.1 消息队列选型对比

RabbitMQ vs Kafka vs RocketMQ

面试必问:

面试官:"常见的消息队列有哪些?各自的特点和适用场景是什么?"

三大消息队列对比:

开发语言

Erlang

Scala/Java

Java

协议支持

AMQP, STOMP, MQTT

自定义协议

自定义协议

消息顺序

支持

分区内有序

支持

消息可靠性

性能

万级QPS

十万级QPS

十万级QPS

延时

微秒级

毫秒级

毫秒级

可用性

高(镜像队列)

高(副本机制)

高(主从架构)

消息回溯

不支持

支持

支持

消息过滤

不支持

不支持

支持

事务消息

支持

支持

支持

定时消息

插件支持

不支持

支持

使用场景分析:

// RabbitMQ适用场景
/*
1. 业务逻辑复杂,需要多种消息模式
2. 对消息可靠性要求极高
3. 需要复杂的路由规则
4. 系统规模中等,QPS要求不是特别高
*/

// Kafka适用场景  
/*
1. 大数据处理,日志收集
2. 高吞吐量场景
3. 流式数据处理
4. 需要消息回溯功能
*/

// RocketMQ适用场景
/*
1. 电商、金融等对可靠性要求高的场景
2. 需要事务消息
3. 需要定时消息、延时消息
4. 需要消息过滤功能
*/

RabbitMQ核心概念

AMQP模型:

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    
    // 1. 直连交换机(Direct Exchange)
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange", true, false);
    }
    
    // 2. 主题交换机(Topic Exchange)
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange", true, false);
    }
    
    // 3. 扇形交换机(Fanout Exchange)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange", true, false);
    }
    
    // 4. 头部交换机(Headers Exchange)
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers.exchange", true, false);
    }
    
    // 队列定义
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-message-ttl", 60000)  // 消息TTL
            .withArgument("x-max-length", 1000)    // 队列最大长度
            .build();
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dlq").build();
    }
    
    // 绑定关系
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(directExchange())
            .with("order.create");
    }
    
    // 死信队列绑定
    @Bean
    public Queue orderQueueWithDLX() {
        return QueueBuilder.durable("order.queue.dlx")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "order.dead")
            .build();
    }
}

消息生产者:

@Service
public class OrderMessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 发送普通消息
    public void sendOrderMessage(Order order) {
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
    }
    
    // 发送延时消息
    public void sendDelayMessage(Order order, int delaySeconds) {
        rabbitTemplate.convertAndSend("direct.exchange", "order.delay", order, message -> {
            message.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));
            return message;
        });
    }
    
    // 发送事务消息
    @Transactional
    public void sendTransactionalMessage(Order order) {
        // 本地事务操作
        orderService.saveOrder(order);
        
        // 发送消息(在同一事务中)
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order);
    }
    
    // 发送确认消息
    public void sendConfirmMessage(Order order) {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送成功: {}", correlationData);
            } else {
                log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
                // 重试或记录失败日志
            }
        });
        
        // 设置返回回调
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息被退回: {}", returned);
            // 处理被退回的消息
        });
        
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("direct.exchange", "order.create", order, correlationData);
    }
}

消息消费者:

@Component
public class OrderMessageConsumer {
    
    @Autowired
    private OrderService orderService;
    
    // 基本消费
    @RabbitListener(queues = "order.queue")
    public void handleOrderMessage(Order order, Channel channel, 
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            log.info("收到订单消息: {}", order);
            orderService.processOrder(order);
            
            // 手动确认
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("处理订单消息失败", e);
            try {
                // 拒绝消息,重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("拒绝消息失败", ioException);
            }
        }
    }
    
    // 批量消费
    @RabbitListener(queues = "order.batch.queue", 
                   containerFactory = "batchRabbitListenerContainerFactory")
    public void handleBatchOrderMessages(List<Order> orders) {
        log.info("批量处理订单消息,数量: {}", orders.size());
        orderService.batchProcessOrders(orders);
    }
    
    // 死信队列消费
    @RabbitListener(queues = "order.dlq")
    public void handleDeadLetterMessage(Order order, 
                                       @Header Map<String, Object> headers) {
        log.warn("处理死信消息: {}, headers: {}", order, headers);
        
        // 记录失败原因
        String reason = (String) headers.get("x-first-death-reason");
        orderService.recordFailedOrder(order, reason);
        
        // 可以选择重试或人工处理
    }
}

Kafka核心概念

Kafka架构:

@Configuration
@EnableKafka
public class KafkaConfig {
    
    // 生产者配置
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // 可靠性配置
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // 等待所有副本确认
        props.put(ProducerConfig.RETRIES_CONFIG, 3);   // 重试次数
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
        
        // 性能配置
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 批次大小
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);      // 等待时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    // 消费者配置
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        // 消费配置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);     // 每次拉取记录数
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        // 并发配置
        factory.setConcurrency(3); // 3个消费者线程
        
        // 手动确认模式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        
        return factory;
    }
}

Kafka生产者:

@Service
public class KafkaOrderProducer {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    // 发送消息
    public void sendOrderEvent(String topic, Order order) {
        kafkaTemplate.send(topic, order.getId().toString(), order)
            .addCallback(
                result -> log.info("消息发送成功: {}", result),
                failure -> log.error("消息发送失败", failure)
            );
    }
    
    // 发送事务消息
    @Transactional
    public void sendTransactionalOrderEvent(Order order) {
        kafkaTemplate.executeInTransaction(operations -> {
            // 发送多个相关消息
            operations.send("order-created", order.getId().toString(), order);
            operations.send("inventory-update", order.getProductId().toString(), 
                new InventoryUpdateEvent(order.getProductId(), -order.getQuantity()));
            operations.send("payment-request", order.getId().toString(), 
                new PaymentRequestEvent(order.getId(), order.getAmount()));
            
            return true;
        });
    }
    
    // 发送分区消息
    public void sendPartitionedMessage(Order order) {
        // 根据用户ID分区,保证同一用户的消息有序
        int partition = Math.abs(order.getUserId().hashCode()) % 3;
        kafkaTemplate.send("order-events", partition, order.getId().toString(), order);
    }
}

Kafka消费者:

@Component
public class KafkaOrderConsumer {
    
    @Autowired
    private OrderService orderService;
    
    // 基本消费
    @KafkaListener(topics = "order-created", groupId = "order-service")
    public void handleOrderCreated(ConsumerRecord<String, Order> record, Acknowledgment ack) {
        try {
            Order order = record.value();
            log.info("处理订单创建事件: partition={}, offset={}, order={}", 
                record.partition(), record.offset(), order);
            
            orderService.handleOrderCreated(order);
            
            // 手动确认
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("处理订单创建事件失败", e);
            // 不确认,消息会重新消费
        }
    }
    
    // 批量消费
    @KafkaListener(topics = "order-events", 
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void handleOrderEventsBatch(List<ConsumerRecord<String, Order>> records, 
                                      Acknowledgment ack) {
        log.info("批量处理订单事件,数量: {}", records.size());
        
        List<Order> orders = records.stream()
            .map(ConsumerRecord::value)
            .collect(Collectors.toList());
        
        orderService.batchProcessOrders(orders);
        ack.acknowledge();
    }
    
    // 错误处理
    @KafkaListener(topics = "order-created")
    public void handleOrderCreatedWithRetry(Order order, 
                                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                           @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            orderService.handleOrderCreated(order);
        } catch (RetryableException e) {
            log.warn("处理失败,将重试: topic={}, partition={}, offset={}", topic, partition, offset);
            throw e; // 抛出异常,触发重试
        } catch (Exception e) {
            log.error("处理失败,发送到死信队列: topic={}, partition={}, offset={}", topic, partition, offset);
            sendToDeadLetterTopic(order, e);
        }
    }
    
    private void sendToDeadLetterTopic(Order order, Exception e) {
        // 发送到死信主题
        kafkaTemplate.send("order-created-dlt", order.getId().toString(), 
            new DeadLetterMessage(order, e.getMessage()));
    }
}

🔄 9.2 分布式事务解决方案

2PC、3PC、TCC、Saga模式

面试重点:

面试官:"分布式事务有哪些解决方案?各自的优缺点是什么?"

分布式事务对比:

2PC

强一致

小规模系统

3PC

强一致

改进2PC

TCC

最终一致

金融支付

Saga

最终一致

长流程业务

消息事务

最终一致

异步场景

TCC模式实现

TCC框架实现:

// TCC接口定义
public interface TccTransaction {
    
    /**
     * Try阶段:尝试执行业务,预留资源
     */
    boolean tryExecute(TccContext context);
    
    /**
     * Confirm阶段:确认执行业务,提交资源
     */
    boolean confirmExecute(TccContext context);
    
    /**
     * Cancel阶段:取消执行业务,释放资源
     */
    boolean cancelExecute(TccContext context);
}

// 账户服务TCC实现
@Service
public class AccountTccService implements TccTransaction {
    
    @Autowired
    private AccountMapper accountMapper;
    
    @Autowired
    private AccountFreezeMapper freezeMapper;
    
    @Override
    public boolean tryExecute(TccContext context) {
        TransferRequest request = context.getRequest(TransferRequest.class);
        
        try {
            // 1. 检查账户余额
            Account account = accountMapper.selectById(request.getFromAccountId());
            if (account.getBalance().compareTo(request.getAmount()) < 0) {
                return false; // 余额不足
            }
            
            // 2. 冻结资金
            AccountFreeze freeze = new AccountFreeze();
            freeze.setTransactionId(context.getTransactionId());
            freeze.setAccountId(request.getFromAccountId());
            freeze.setAmount(request.getAmount());
            freeze.setStatus(FreezeStatus.FROZEN);
            freeze.setCreateTime(LocalDateTime.now());
            
            freezeMapper.insert(freeze);
            
            // 3. 更新账户余额(预扣)
            account.setBalance(account.getBalance().subtract(request.getAmount()));
            account.setFrozenAmount(account.getFrozenAmount().add(request.getAmount()));
            accountMapper.updateById(account);
            
            log.info("TCC Try阶段成功: transactionId={}, amount={}", 
                context.getTransactionId(), request.getAmount());
            return true;
            
        } catch (Exception e) {
            log.error("TCC Try阶段失败", e);
            return false;
        }
    }
    
    @Override
    public boolean confirmExecute(TccContext context) {
        String transactionId = context.getTransactionId();
        
        try {
            // 1. 查询冻结记录
            AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
            if (freeze == null || freeze.getStatus() == FreezeStatus.CONFIRMED) {
                return true; // 幂等处理
            }
            
            // 2. 确认扣款,清除冻结金额
            Account account = accountMapper.selectById(freeze.getAccountId());
            account.setFrozenAmount(account.getFrozenAmount().subtract(freeze.getAmount()));
            accountMapper.updateById(account);
            
            // 3. 更新冻结记录状态
            freeze.setStatus(FreezeStatus.CONFIRMED);
            freeze.setUpdateTime(LocalDateTime.now());
            freezeMapper.updateById(freeze);
            
            log.info("TCC Confirm阶段成功: transactionId={}", transactionId);
            return true;
            
        } catch (Exception e) {
            log.error("TCC Confirm阶段失败", e);
            return false;
        }
    }
    
    @Override
    public boolean cancelExecute(TccContext context) {
        String transactionId = context.getTransactionId();
        
        try {
            // 1. 查询冻结记录
            AccountFreeze freeze = freezeMapper.selectByTransactionId(transactionId);
            if (freeze == null || freeze.getStatus() == FreezeStatus

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论

相关推荐

秋招投简历提醒助手:一开始还觉得是正常交流。直到一看薪资4-6😨
点赞 评论 收藏
分享
AC鸽想进大厂:你是我见过最美的牛客女孩
点赞 评论 收藏
分享
09-23 15:16
门头沟学院 Java
点赞 评论 收藏
分享
评论
1
4
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务