RocketMQ(九)高级特性-消息重试机制

rocketMq具有消息重试的机制,重试也分为两种重试:producer重试consumer重试

producer重试

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

可以通过以下方式进行配置:
1)使用starter的同学可以直接在yml文件中进行配置,主要属性有超时时间重试次数,另外提供一个在其他broker节点重试的机制,如下所示:

rocketmq:
  name-server: http://101.200.36.168:9876
  producer:
    #指定消息发送者的组,在控制台查询时会用到
    group: test
    #发送失败超时时间
    send-message-timeout: 3000
    #重试次数
    retry-times-when-send-failed: 3
    #在其他broker服务端进行重试默认false,开启设置为on
    retry-next-server: false

2)在java代码中可以对producer进行手动设置:

代码设置

consumer重试(两种:监听、自定义消费者)

两种方式其实监听是对自定义的一个封装,只不过自定义可能更灵活一些,使用监听的形式我还没找到在哪里设置重试次数。

下面先看看监听的形式: 在消费者监听器,自定义抛出异常,会发生重试:

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_reconsume", selectorExpression = "*", consumerGroup = "test_reconsume")
public class RetryConsumerListener implements RocketMQListener<MessageExt> {

    @SneakyThrows
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive sync message:{}", msg);
        throw new Exception("开始重试");
    }
}

重试时的消息属性如下图:

重试消息属性

上图中有一个delay属性,其实是延时等级,还记得我们前面学习的延时消息吗?在borker.conf中配置了了DelayDevel等级(延时等级),重试机制也是按照这个等级来的,默认情况下总共会重试16次,这个等级逐渐加1。下一次的重试属性如下图:

重试属性

下面举例一个自定义消费者代码:

package com.cloud.bssp.message.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 重试consumer
 *
 * @date: 2020/11/30
 * @author weirx
 * @version 3.0
 */
@Component
@Slf4j
public class RetryConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        //设置NameServer地址,替换成自己的ip地址
        consumer.setNamesrvAddr("ip:9876");
        //设置实例名称
        consumer.setInstanceName("consumer");
        //订阅topic
        consumer.subscribe("test_reconsume_1", "");
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取消息
                for (MessageExt messageExt : list) {
                    log.info("消息重试:{} ", messageExt.getMsgId() + "---" + new String(messageExt.getBody()));
                }
                try {
                    //模拟错误
                    int i = 5 / 0;
                } catch (Exception e) {
                    e.printStackTrace();
                    //需要重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //不需要重试
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
        System.out.println("Consumer Started!");
    }
}

两种方式默认情况下都是重试16次,使用延时等级配置的时间。 自定义消费者可以使用如下的方式进行配置最大重试次数:

 //设置重试次数为2
 consumer.setMaxReconsumeTimes(2);
全部评论

相关推荐

积极的小学生不要香菜:你才沟通多少,没500不要说难
点赞 评论 收藏
分享
07-11 22:27
中南大学 Java
程序员牛肉:学历的话没问题。但是没问题的也就只有学历了。 其实你的整体架构是正确的,博客接着干。但是项目有点过于简单了。从后端的角度上讲,你这也就是刚入门的水平,所以肯定约面试够呛。 如果你要应聘后端岗位,那你第一个项目竟然是仿写操作系统。这个你要面试官咋问你。你一定要记住一点,你简历上写的所有的东西,都是为了证明你有能力胜任当前的岗位,而不是为了证明你自己会什么。 如果你只是浅浅的做几个项目,描述也都是烂大街。技术点也都是各种混水类的配置类需求,那你就不要幻想自己能走多远。一定要保持思考,保持学习。
点赞 评论 收藏
分享
07-16 14:10
门头沟学院 Java
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务