RocketMQ
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,现已捐赠给 Apache 软件基金会并成为顶级项目。它具有高吞吐量、高可用性、严格的消息顺序性等特点,在众多大型互联网项目中广泛应用。以下从基础概念、特性、应用场景、架构组成以及 Java 使用示例方面介绍 RocketMQ:
基础概念
- 生产者(Producer):负责产生消息,将业务系统中的数据封装成消息发送到 RocketMQ 服务器。生产者可以是各种业务应用,比如电商系统中的订单生成模块、物流系统中的包裹状态更新模块等。
- 消费者(Consumer):负责消费消息,从 RocketMQ 服务器获取消息并进行业务处理。消费者可以是不同的业务逻辑,如订单处理后的库存更新、物流状态变化后的通知推送等。
- 主题(Topic):是消息的逻辑分类,用于区分不同类型的消息。例如,电商系统中可以有 “order - topic” 用于订单相关消息,“payment - topic” 用于支付相关消息。
- 队列(Queue):也叫消息队列,是 Topic 的物理分区,一个 Topic 可以包含多个 Queue。通过多队列机制,RocketMQ 可以实现消息的并行处理,提高消息的处理效率。
- 标签(Tag):对消息进行再分类,在同一个 Topic 下,可以使用不同的 Tag 来区分不同子类型的消息。比如在 “order - topic” 中,可以有 “create - order - tag” 表示创建订单消息,“cancel - order - tag” 表示取消订单消息。
特性
- 高吞吐量:能够支持海量消息的高并发发送和接收,适用于大数据量的实时处理场景。
- 高可用性:通过主从架构、多副本机制等保证消息服务的高可用性,即使部分节点出现故障,也能确保消息不丢失且系统正常运行。
- 消息顺序性:支持严格的消息顺序,在一些对顺序敏感的场景,如订单处理流程,先下单消息后支付消息,能保证消息按照发送顺序被消费。
- 可靠的消息投递:提供多种消息投递方式,如同步发送、异步发送、单向发送等,并支持消息重试机制,确保消息尽可能可靠地被投递和处理。
应用场景
- 异步处理:同 RabbitMQ 类似,将一些耗时的操作(如发送邮件、短信通知、复杂的业务计算等)封装成消息发送到 RocketMQ,主线程无需等待这些操作完成,提高系统的响应速度。
- 削峰填谷:在高并发场景下,如电商大促活动时,瞬间大量的请求可以先发送到 RocketMQ,后端系统按照自身处理能力从队列中消费消息,避免系统因流量高峰而崩溃。
- 数据分发:将数据从一个系统分发到多个不同的系统或模块。例如,一个数据采集系统采集到数据后,通过 RocketMQ 将数据分发给数据分析模块、数据存储模块等不同的下游系统。
架构组成
- NameServer:是一个轻量级的元数据服务,主要负责存储和管理 Topic、Broker 等元数据信息。生产者和消费者通过 NameServer 获取所需的元数据,从而与相应的 Broker 进行通信。NameServer 可以部署多个实例,相互独立,不存在单点故障问题。
- Broker:负责接收、存储和转发消息。Broker 分为 Master 和 Slave 两种角色,Master 负责处理读写请求,Slave 从 Master 同步数据,用于在 Master 出现故障时提供高可用性保障。多个 Broker 可以组成一个 Broker 集群,共同提供消息服务。
- Producer:生产者实例,负责将消息发送到 Broker。生产者与 NameServer 建立长连接,获取 Topic 对应的 Broker 地址列表,然后与 Broker 建立长连接进行消息发送。
- Consumer:消费者实例,负责从 Broker 拉取消息并进行消费。消费者同样与 NameServer 建立长连接获取元数据,然后与 Broker 建立长连接进行消息拉取。
Java 使用示例
以下是使用 RocketMQ 的 Java 客户端进行简单消息发送和接收的示例:
- 添加依赖
在
pom.xml
中添加 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq - client</artifactId>
<version>4.9.4</version>
</dependency>
- 生产者发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("example - group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定 Topic、Tag 和消息内容
Message message = new Message("example - topic", "example - tag", "Hello, RocketMQ!".getBytes());
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);
// 关闭生产者
producer.shutdown();
}
}
- 消费者接收消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example - group");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic 和 Tag
consumer.subscribe("example - topic", "example - tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
以上示例展示了如何使用 RocketMQ 的 Java 客户端进行基本的消息发送和接收操作。在实际应用中,还可以根据具体需求进行更复杂的配置和功能实现,如事务消息、顺序消息的处理等。
#牛客创作赏金赛#