RocketMQ

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,现已捐赠给 Apache 软件基金会并成为顶级项目。它具有高吞吐量、高可用性、严格的消息顺序性等特点,在众多大型互联网项目中广泛应用。以下从基础概念、特性、应用场景、架构组成以及 Java 使用示例方面介绍 RocketMQ:

基础概念

  1. 生产者(Producer):负责产生消息,将业务系统中的数据封装成消息发送到 RocketMQ 服务器。生产者可以是各种业务应用,比如电商系统中的订单生成模块、物流系统中的包裹状态更新模块等。
  2. 消费者(Consumer):负责消费消息,从 RocketMQ 服务器获取消息并进行业务处理。消费者可以是不同的业务逻辑,如订单处理后的库存更新、物流状态变化后的通知推送等。
  3. 主题(Topic):是消息的逻辑分类,用于区分不同类型的消息。例如,电商系统中可以有 “order - topic” 用于订单相关消息,“payment - topic” 用于支付相关消息。
  4. 队列(Queue):也叫消息队列,是 Topic 的物理分区,一个 Topic 可以包含多个 Queue。通过多队列机制,RocketMQ 可以实现消息的并行处理,提高消息的处理效率。
  5. 标签(Tag):对消息进行再分类,在同一个 Topic 下,可以使用不同的 Tag 来区分不同子类型的消息。比如在 “order - topic” 中,可以有 “create - order - tag” 表示创建订单消息,“cancel - order - tag” 表示取消订单消息。

特性

  1. 高吞吐量:能够支持海量消息的高并发发送和接收,适用于大数据量的实时处理场景。
  2. 高可用性:通过主从架构、多副本机制等保证消息服务的高可用性,即使部分节点出现故障,也能确保消息不丢失且系统正常运行。
  3. 消息顺序性:支持严格的消息顺序,在一些对顺序敏感的场景,如订单处理流程,先下单消息后支付消息,能保证消息按照发送顺序被消费。
  4. 可靠的消息投递:提供多种消息投递方式,如同步发送、异步发送、单向发送等,并支持消息重试机制,确保消息尽可能可靠地被投递和处理。

应用场景

  1. 异步处理:同 RabbitMQ 类似,将一些耗时的操作(如发送邮件、短信通知、复杂的业务计算等)封装成消息发送到 RocketMQ,主线程无需等待这些操作完成,提高系统的响应速度。
  2. 削峰填谷:在高并发场景下,如电商大促活动时,瞬间大量的请求可以先发送到 RocketMQ,后端系统按照自身处理能力从队列中消费消息,避免系统因流量高峰而崩溃。
  3. 数据分发:将数据从一个系统分发到多个不同的系统或模块。例如,一个数据采集系统采集到数据后,通过 RocketMQ 将数据分发给数据分析模块、数据存储模块等不同的下游系统。

架构组成

  1. NameServer:是一个轻量级的元数据服务,主要负责存储和管理 Topic、Broker 等元数据信息。生产者和消费者通过 NameServer 获取所需的元数据,从而与相应的 Broker 进行通信。NameServer 可以部署多个实例,相互独立,不存在单点故障问题。
  2. Broker:负责接收、存储和转发消息。Broker 分为 Master 和 Slave 两种角色,Master 负责处理读写请求,Slave 从 Master 同步数据,用于在 Master 出现故障时提供高可用性保障。多个 Broker 可以组成一个 Broker 集群,共同提供消息服务。
  3. Producer:生产者实例,负责将消息发送到 Broker。生产者与 NameServer 建立长连接,获取 Topic 对应的 Broker 地址列表,然后与 Broker 建立长连接进行消息发送。
  4. Consumer:消费者实例,负责从 Broker 拉取消息并进行消费。消费者同样与 NameServer 建立长连接获取元数据,然后与 Broker 建立长连接进行消息拉取。

Java 使用示例

以下是使用 RocketMQ 的 Java 客户端进行简单消息发送和接收的示例:

  1. 添加依赖pom.xml 中添加 RocketMQ 客户端依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq - client</artifactId>
    <version>4.9.4</version>
</dependency>
  1. 生产者发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("example - group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息实例,指定 Topic、Tag 和消息内容
        Message message = new Message("example - topic", "example - tag", "Hello, RocketMQ!".getBytes());
        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println("SendResult: " + sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}
  1. 消费者接收消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example - group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅 Topic 和 Tag
        consumer.subscribe("example - topic", "example - tag");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

以上示例展示了如何使用 RocketMQ 的 Java 客户端进行基本的消息发送和接收操作。在实际应用中,还可以根据具体需求进行更复杂的配置和功能实现,如事务消息、顺序消息的处理等。

#牛客创作赏金赛#
全部评论

相关推荐

入职也有段时间了,闲来无事,记录记录参与的平台换了好几波人(前后端方向都一样),整体的发展态势便是:接新需求、重构每天坐下,没有需求的日子便是“熟悉历史(屎?)逻辑”,作为一个实习生,我也没啥资格说他到底是不是💩。经常接口一堆字段,逻辑一堆字段,死无对账。是的,没有接口文档,你没听错;也没有readme,意思全靠猜。那种无力感,你懂我意思?起初我还是斗志满满,觉得如果能在实习这段时间跟着mt在平台有所小成就,也算不枉这段实习。但是他们似乎也是在“赌”以及小心翼翼的不想推动。尽管他们确实在过去半年(一年?)已经做了一些动作有时候整理了一些问题,希望能够得到解答,得到的也是“这些历史逻辑,你问我我也只能告诉你我也不清楚,我也只能靠猜”(尬笑😅);不然就是,这部分逻辑你不需要去深究每部分的细节,知道哪个数据在哪里处理的即可...再有激情和热情,说白了也被浇灭了,倒是每天得为日报内容发愁(唯一期待的就剩每天傍晚健个身、吃个饭)我惊叹于为什么有人写的代码根本就没有所谓的单向数据流(常常在意想不到的地方进行了改动)、根本就没有所谓的cr、根本就没有规范性可言,然后留给后来者的只有唏嘘和更多的成本(作为一个实习生而言,能发出这种疑问,那代码真的是无敌了)我无法站在一个旁观者的角度去谴责历代开发者的佳作,毕竟他们当时可能是处在一个高压、短期的环境下完成的,毕竟人和代码总要跑通一个但其实,更多的是,意识到以后我工作了也是接触这些?那我感觉是真的对之后无感我深刻的意识到到了哪都是一个草台班子,是我的期待值过高,于是想逃,但发现根本无处可去,才意识到都tm一个鸟样...
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客企业服务