商品中心—12.商品溯源系统的技术文档

大纲

1.商品数据变更溯源系统运行流程分析

2.Canal推送到RocketMQ的binlog数据模型

3.Canal推送到RocketMQ的binlog数据解析

4.表的监听配置查询以及关联字段的监听配置查询

5.根据binlog数据对象构建数据变更对象

6.完成binlog数据到数据变更消息的转换

7.数据变更消息推送到多系统多路topic中

8.外部消息数据格式转化以及写入DB中

9.外部消息发送触发以及多路系统Topic推送

10.商品数据变更溯源系统消息处理

1.商品数据变更溯源系统运行流程分析

(1)需求描述

(2)数据库表

(3)整体流程

(1)需求描述

在商品的整个⽣命周期中,商品数据是可以不断变化的。⽐如商品标题、图⽚、价格等相关数据都是可以被修改的。商品数据溯源需求就是要把商品在整个⽣命周期中的数据变更都记录下来,并且能清晰的展示出每次变更发⽣的变化。

(2)数据库表

在溯源配置表中配置的表,当表的记录发⽣变化时会记录对应的溯源数据。

create table if not exists data_trace_listen_config (
    id int unsigned auto_increment comment '主键' primary key,
    table_name varchar(40) null comment '数据表名称',
    del_flag tinyint(1) null comment '是否删除',
    create_user int null comment '创建⼈',
    create_time datetime null comment '创建时间',
    update_user int null comment '更新⼈',
    update_time datetime null comment '更新时间'
) comment '数据溯源监听表';

(3)整体流程

步骤一:Canal监听MySQL的binlog数据

步骤二:Canal发送binlog消息,所有的DDL、DML语句都会被Canal以binlog消息形式发送到RocketMQ

步骤三:消息处理系统消费RocketMQ的binlog消息,此时会过滤DML语句并根据binlog的表名从溯源配置表中查询是否需要记录数据溯源信息,如果需要则把binlog信息解析成多个TraceData对象

步骤四:消息处理系统把从binlog消息解析出来的多个TraceData对象发送到RocketMQ中

步骤五:商品溯源系统消费RocketMQ中的TraceData消息

步骤六:商品溯源系统把消费到的TraceData消息批量写⼊到ES对应的索引中

步骤七:商品运营后台查询商品溯源数据时会向商品溯源系统发起请求

步骤八:商品溯源系统会把请求参数封装成ES的查询请求,然后向ES发起查询并返回结果

整个内部消息和外部消息的流程如下:

流程一:商品中心系统的开发人员会对商品中心的表配置内部消息对象。

流程二:非商品中心系统的开发人员会对商品中心的表配置外部消息对象。

流程三:商品中心的表发生变更后,binlog消息应该先被商品中心系统自己消费。

流程四:binlog消息被商品中心系统自己消费后,再继续被非商品中心系统消费。

流程五:当商品消息处理系统消费binlog消息时,会先获取配置的内部消息对象。然后根据配置的信息,将binlog消息对应的变更数据发送到指定topic。接着获取配置的外部消息对象,把这次发送的变更数据保存到数据库中。此过程中,发送到指定topic和保存到DB的变更数据会由消息编号来关联。

流程六:商品中心的系统消费完binlog消息对应的变更数据消息后,会将消息编号发送到MQ中以外部消息的形式由商品消息系统去消费处理。

流程七:当商品消息系统在消费外部消息时,首先会将消息编号提取出来,然后根据消息编号去数据库查询关联的binlog消息对应的变更数据。由于这些保存的变更数据已经配置好完整的外部消息,包括发到那些topic。所以接着可以把变更数据发送到非商品中心系统的开发人员配置的topic,从而实现非商品中心系统对商品中心的表的binlog变化的监听,而且严格保正了binlog消息的消费顺序:先商品中心->再非商品中心。发送消息后,最后便会把消息编号相关的变更消息数据从数据库中删除。

2.Canal推送到RocketMQ的binlog数据模型

@Configuration
public class ConsumerBeanConfig {
    //配置内容对象
    @Autowired
    private RocketMQProperties rocketMQProperties;

    //数据变更消费者,负责数据变更监控后发送MQ消息通知订阅方
    @Bean("dataChangeConsumer")
    public DefaultMQPushConsumer createItemStageConsumer(DataChangeListener dataChangeListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);
        consumer.setInstanceName("dataChangeListener");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");
        consumer.registerMessageListener(dataChangeListener);
        consumer.start();
        return consumer;
    }
    ...
}

@Component
public class DataChangeListener implements MessageListenerConcurrently {
    @Autowired
    private MessageService messageService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                log.info("DataChangeListener数据变更消息通知,消息内容:{}", msg);
                //获取binlog对象
                BinlogData binlogData = BinlogUtils.getBinlogData(msg);
                if (Objects.isNull(binlogData) || Objects.isNull(binlogData.getDataMap())) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //操作类型不是insert,delete,update的,不作处理
                String operateType = binlogData.getOperateType();
                if (!BinlogType.INSERT.getValue().equals(operateType)
                        && !BinlogType.DELETE.getValue().equals(operateType)
                        && !BinlogType.UPDATE.getValue().equals(operateType)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                messageService.processBinlogMessage(binlogData);
            }
        } catch (Exception e) {
            log.error("consume error, 消费数据变更消息失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

//Canal推送到RocketMQ的binlog数据模型
@Data
public class BinlogData implements Serializable {
    //binlog对应的表名
    private String tableName;

    //操作时间
    private Long operateTime;

    //操作类型
    private String operateType;

    //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
    //一条SQL语句可以更新多条语句,所以使用List
    private List<Map<String, Object>> dataMap;

    //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
    //一条SQL语句可以更新多条语句,所以使用List
    private List<Map<String, Object>> oldMap;
}

3.Canal推送到RocketMQ的binlog数据解析

注意:一条binlog消息可能对应多条数据的变更。

//binlog数据解析
public abstract class BinlogUtils {
    //解析binlog json字符串
    public static BinlogData getBinlogData(String binlogStr) {
        //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断
        if (JSONUtil.isJson(binlogStr)) {
            JSONObject binlogJson = JSONUtil.parseObj(binlogStr);
            //不处理DDL的binlog,只处理数据变更
            if (binlogJson.getBool("isDdl")) {
                return null;
            }
            BinlogData binlogData = new BinlogData();
            //表名
            String tableName = binlogJson.getStr("table");
            binlogData.setTableName(tableName);
            //操作类型
            String operateType = binlogJson.getStr("type");
            binlogData.setOperateType(operateType);
            //操作时间
            Long operateTime = binlogJson.getLong("ts");
            binlogData.setOperateTime(operateTime);

            //data数据,当前binlog对应的sql变更数据可能是多条,比如可能是这样的json数组:[{},{},{}]
            JSONArray dataArray = binlogJson.getJSONArray("data");

            //对binlog数据进行解析,可能会有多条数据,所以使用List
            List<Map<String, Object>> dataMap = jsonArrayToMapList(dataArray);
            binlogData.setDataMap(dataMap);

            if (!binlogJson.isNull("old")) {
                //old数据
                JSONArray oldArray = binlogJson.getJSONArray("old");
                List<Map<String, Object>> oldMap = jsonArrayToMapList(oldArray);
                binlogData.setOldMap(oldMap);
            }
            return binlogData;
        }
        return null;
    }
    
    private static List<Map<String, Object>> jsonArrayToMapList(JSONArray jsonArray) {
        if (null != jsonArray) {
            Iterable<JSONObject> arrayIterator = jsonArray.jsonIter();
            //遍历data节点或old节点并返回Map
            if (null != arrayIterator) {
                //binlog的data数组或old数组里数据的类型为Map
                List<Map<String, Object>> dataMap = new ArrayList<>();
                while (arrayIterator.iterator().hasNext()) {
                    JSONObject jsonObject = arrayIterator.iterator().next();
                    Map<String, Object> data = new HashMap<>(jsonObject.size());
                    jsonObject.keySet().forEach(key -> {
                        data.put(key, jsonObject.get(key));
                    });
                    dataMap.add(data);
                }
                return dataMap;
            }
        }
        return null;
    }
    ...
}

4.表的监听配置查询以及关联字段的监听配置查询

先从缓存查,缓存查不到再去DB查。

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //处理binlog消息
    @Override
    public void processBinlogMessage(BinlogData binlogData) {
        //获取当前表的监听信息
        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
        //未配置监听信息的表,不作处理
        if (Objects.isNull(listenConfigDO)) {
            return;
        }

        //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
        //不需要监听,或者要监听的字段值未变动
        if (CollectionUtils.isEmpty(dataChangeMessages)) {
            return;
        }

        //获取配置的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
        //不需要发送消息
        if (CollectionUtils.isEmpty(messageConfigBOS)) {
            return;
        }

        //封装成需要发送的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
        //待发送的消息为空,无需处理
        if (CollectionUtils.isEmpty(sendDataMessageList)) {
            return;
        }

        //发送消息
        sendDataMessage(sendDataMessageList);
        //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
            //保存消息详细信息
            saveDataMessageDetail(dataChangeMessages, binlogData);
        }
    }

    //获取数据变更对象列表
    public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {
        //获取监听变更字段配置表信息
        List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());
        //要监听的字段为空,不作处理
        if (CollectionUtils.isEmpty(columnConfigDOS)) {
            return null;
        }
        //构建数据变更对象DataChangeMessage并返回
        return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);
    }
    ...
}

@Repository
public class DataChangeRepository {
    @Resource
    private RedisReadWriteManager redisReadWriteManager;
    ...

    //根据表名获取监听配置信息
    public DataChangeListenConfigDO getListenConfigByTable(String tableName) {
        Optional<DataChangeListenConfigDO> optional = redisReadWriteManager.getRedisStringDataByCache(
            tableName,
            DataChangeListenConfigDO.class,
            AbstractRedisKeyConstants::getListenConfigStringKey,
            this::getListenConfigByTableFromDB
        );
        return optional.orElse(null);
    }

    //根据表名获取监听配置信息
    public Optional<DataChangeListenConfigDO> getListenConfigByTableFromDB(String tableName) {
        LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
        DataChangeListenConfigDO listenConfigDO = dataChangeListenConfigMapper.selectOne(queryWrapper);
        return Objects.isNull(listenConfigDO) ? Optional.empty() : Optional.of(listenConfigDO);
    }

    //获取监听变更字段配置表信息
    public List<DataChangeColumnConfigDO> getColumnConfigByListenId(Long id) {
        Optional<List<DataChangeColumnConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
            id, 
            DataChangeColumnConfigDO.class,
            AbstractRedisKeyConstants::getColumnConfigStringKey,
            this::getColumnConfigByListenIdFromDB
        );
        return optional.orElse(null);
    }

    //获取监听变更字段配置表信息
    public Optional<List<DataChangeColumnConfigDO>> getColumnConfigByListenIdFromDB(Long id) {
        LambdaQueryWrapper<DataChangeColumnConfigDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(DataChangeColumnConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
        List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeColumnConfigMapper.selectList(queryWrapper);
        return CollectionUtils.isEmpty(columnConfigDOS) ? Optional.empty() : Optional.of(columnConfigDOS);
    }
    ...
}

//缓存读写管理
@Service
public class RedisReadWriteManager {
    ...
    //批量获取缓存数据
    //@param key                 关键字列表
    //@param clazz               需要将缓存JSON转换的对象
    //@param getRedisKeyFunction 获取redis key的方法
    //@param getDbFunction       获取数据源对象的方法
    public <T> Optional<T> getRedisStringDataByCache(String key, Class<T> clazz,
            Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
        try {
            String redisKey = getRedisKeyFunction.apply(key);
            String cache = redisCache.get(redisKey);
            //过滤无效缓存
            if (EMPTY_OBJECT_STRING.equals(cache)) {
                return Optional.empty();
            }
            if (StringUtils.isNotBlank(cache)) {
                T t = JSON.parseObject(cache, clazz);
                return Optional.of(t);
            }
            //缓存没有则读库
            return getRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);
        } catch (Exception e) {
            log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);
            throw e;
        }
    }

    //读取数据库表数据赋值到Redis
    public <T> Optional<T> getRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
        if (StringUtils.isBlank(key) || Objects.isNull(getDbFunction)) {
            return Optional.empty();
        }
        try {
            if (!redisLock.lock(key)) {
                return Optional.empty();
            }
            String redisKey = getRedisKeyFunction.apply(key);
            Optional<T> optional = getDbFunction.apply(key);

            putCacheString(redisKey, optional);
            return optional;
        } finally {
            redisLock.unlock(key);
        }
    }

    private void putCacheString(String redisKey, Optional optional) {
        if (!optional.isPresent()) {
            //把空对象暂存到Redis
            redisCache.setex(redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));
            log.warn("发生缓存穿透 redisKey={}", redisKey);
            return;
        }
        //把数据对象存到Redis
        redisCache.setex(redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
        log.info("数据对象存到redis redisKey={}, data={}", redisKey, JSON.toJSONString(optional.get()));
    }
    ...
}

5.根据binlog数据对象构建数据变更对象

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //获取数据变更对象列表
    public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {
        //获取监听变更字段配置表信息
        List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());
        //要监听的字段为空,不作处理
        if (CollectionUtils.isEmpty(columnConfigDOS)) {
            return null;
        }
        //构建数据变更对象DataChangeMessage
        return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);
    }

    //构建数据变更对象DataChangeMessage
    private List<DataChangeMessage> buildChangeColumn(BinlogData binlogData, List<DataChangeColumnConfigDO> columnConfigDOS, DataChangeListenConfigDO listenConfigDO) {
        //将当前binlog操作的每一条数据都封装成一个数据变更对象DataChangeMessage
        List<DataChangeMessage> dataChangeMessages = new ArrayList<>();
        //操作类型
        String operateType = binlogData.getOperateType();

        for (int i = 0; i < binlogData.getDataMap().size(); i++) {
            //每个map都是一条sql执行后被更新的一条数据
            Map<String, Object> data = binlogData.getDataMap().get(i);
            //判断是否对这条数据进行了新增或删除操作
            if (BinlogType.INSERT.getValue().equals(operateType) || BinlogType.DELETE.getValue().equals(operateType)) {
                //获取所有监听的字段列表
                List<String> updateColumns = columnConfigDOS.stream().map(DataChangeColumnConfigDO::getListenColumn).collect(Collectors.toList());
                //把新增或删除的binlog数据对象封装成一个数据变更对象DataChangeMessage
                DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
                dataChangeMessages.add(dataChangeMessage);
            } else {//对这条数据进行了更新操作
                Map<String, Object> old = binlogData.getOldMap().get(i);
                List<String> updateColumns = new ArrayList<>();
                //遍历监听的字段,获取变更的字段列表
                for (DataChangeColumnConfigDO columnConfigDO : columnConfigDOS) {
                    String column = columnConfigDO.getListenColumn();
                    Object columnOldValue = old.get(column);
                    //旧的字段值有数据,就表示该字段变更了,添加至修改的字段集合
                    if (!Objects.isNull(columnOldValue)) {
                        updateColumns.add(column);
                    }
                }
                //监听的字段有数据变更
                if (!CollectionUtils.isEmpty(updateColumns)) {
                    //把更新的binlog数据对象封装成一个数据变更对象DataChangeMessage
                    DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
                    dataChangeMessages.add(dataChangeMessage);
                }
            }
        }
        return dataChangeMessages;
    }

    //构建数据变更对象
    private DataChangeMessage buildDataChangeMessage(BinlogData binlogData, List<String> updateColumns, Object keyId) {
        DataChangeMessage dataChangeMessage = new DataChangeMessage(binlogData.getOperateType(), binlogData.getTableName(), updateColumns);
        dataChangeMessage.setMessageNo(SnowflakeIdWorker.getCode());
        dataChangeMessage.setKeyId(keyId);
        return dataChangeMessage;
    }
    ...
}

//数据变更对象
@Data
public class DataChangeMessage implements Serializable {
    //内部消息编号
    private String messageNo;

    //操作行为,INSERT、UPDATE、DELETE
    private String action;

    //表名
    private String tableName;

    //主键或业务id
    private Object keyId;

    //变更的列
    private List<String> updateColumns;

    //唯一确定当前数据的字段以及字段值
    private List<ColumnValue> columnValues;

    //消息处理成功之后的回调topic
    private String callbackTopic = RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC;

    public DataChangeMessage(String operateType, String tableName, List<String> updateColumns) {
        this.action = operateType;
        this.tableName = tableName;
        this.updateColumns = updateColumns;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class ColumnValue {
        private String column;//列
        private Object value;//值
    }
}

6.完成binlog数据到数据变更消息的转换

(1)监听表 + 监听字段 + 消息模型的关系

(2)获取配置的消息对象

(3)数据变更消息的多系统多路转发模型

(1)监听表 + 监听字段 + 消息模型的关系

create table data_change_listen_config (
    id int unsigned auto_increment comment '主键' primary key,
    table_name varchar(40) null comment '数据表名称',
    key_column varchar(40) default 'id' null comment '数据表对应的主键或业务id列名',
    filter_flag tinyint(1) null comment '是否过滤',
    del_flag tinyint(1) null comment '是否删除',
    create_user int null comment '创建⼈',
    create_time datetime null comment '创建时间',
    update_user int null comment '更新⼈',
    update_time datetime null comment '更新时间'
) comment '数据变更监听表';//一个表只有一条记录

create table data_change_column_config (
    id int unsigned auto_increment primary key,
    listen_id int null comment '监听表id',
    listen_column varchar(40) null comment '监听字段',
    del_flag tinyint(1) null comment '删除标记',
    create_user int null comment '创建⼈',
    create_time datetime null comment '创建时间',
    update_user int null comment '更新⼈',
    update_time datetime null comment '更新时间'
) comment '监听表变化字段配置表';

create table data_change_message_config (
    id int unsigned null comment '主键',
    listen_id int null comment '监听表id',
    notify_column varchar(2000) null comment '变更通知字段,逗号分隔',
    message_topic varchar(256) null comment '变更通知消息主题',
    delay_level int null comment '延迟等级',
    message_type tinyint(3) null comment '消息类型',
    del_flag tinyint(1) null comment '删除标记',
    create_user int null comment '创建⼈',
    create_time datetime null comment '创建时间',
    update_user int null comment '更新⼈',
    update_time datetime null comment '更新时间'
) comment '监听表消息模型表';

(2)获取配置的消息对象

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //处理binlog消息
    @Override
    public void processBinlogMessage(BinlogData binlogData) {
        //获取当前表的监听信息
        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
        //未配置监听信息的表,不作处理
        if (Objects.isNull(listenConfigDO)) {
            return;
        }

        //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
        //不需要监听,或者要监听的字段值未变动
        if (CollectionUtils.isEmpty(dataChangeMessages)) {
            return;
        }

        //获取配置的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
        //不需要发送消息
        if (CollectionUtils.isEmpty(messageConfigBOS)) {
            return;
        }

        //封装成需要发送的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
        //待发送的消息为空,无需处理
        if (CollectionUtils.isEmpty(sendDataMessageList)) {
            return;
        }

        //发送消息
        sendDataMessage(sendDataMessageList);
        //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
            //保存消息详细信息
            saveDataMessageDetail(dataChangeMessages, binlogData);
        }
    }
    ...
}

@Repository
public class DataChangeRepository {
    @Resource
    private RedisReadWriteManager redisReadWriteManager;
    ...

    //获取监听表消息模型配置
    public List<DataChangeMessageConfigBO> getMessageConfigBOByListenId(Long id) {
        //获取监听表消息模型配置
        return dataMessageConverter.converterBOList(getMessageConfigByListenId(id, null));
    }

    //获取监听表消息模型配置
    public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {
        //获取监听表消息模型配置
        Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
            id,
            DataChangeMessageConfigDO.class,
            AbstractRedisKeyConstants::getMessageConfigStringKey,
            this::getMessageConfigByListenIdFromDB
        );
        //如果未指定是内部消息还是外部消息,则不需要过滤
        if (Objects.isNull(messageTypeEnum)) {
            return optional.orElse(null);
        }
        return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()
            .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))
            .collect(Collectors.toList())).orElse(null);
    }

    //查询数据变更消息列表
    public Optional<List<DataChangeMessageConfigDO>> getMessageConfigByListenIdFromDB(Long id) {
        LambdaQueryWrapper<DataChangeMessageConfigDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(DataChangeMessageConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
        List<DataChangeMessageConfigDO> messageConfigDOS = dataChangeMessageConfigMapper.selectList(queryWrapper);
        return CollectionUtils.isEmpty(messageConfigDOS) ? Optional.empty() : Optional.of(messageConfigDOS);
    }
    ...
}

//数据变更配置表
@Data
public class DataChangeMessageConfigBO implements Serializable {
    private static final long serialVersionUID = 1L;
    //是否过滤
    private Integer listenId;
    //消息通知列,逗号分隔
    private String notifyColumn;
    //消息topic
    private String messageTopic;
    //延迟等级
    private Integer delayLevel;
    //消息类型 1-内部消息,2-外部消息
    private Integer messageType;
}

(3)数据变更消息的多系统多路转发模型

注意:一条binlog可能会对应多条变更数据。一个配置的消息对象包含了要发送的topic以及感兴趣的字段。一个监听表可能会配置了多个消息对象DataChangeMessageConfigBO,不同系统或者不同业务对某个表及其字段的监听就会保存在该消息对象之中。

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //获取需要发送的消息对象
    //@param dataChangeMessages  一条binlog可能会对应多条变更数据
    //@param dataMap  传入binlog的多条变更数据的具体变更字段map
    public List<DataSendMessageBO> getInternalSendDataMessage(List<DataChangeMessage> dataChangeMessages,
            List<Map<String, Object>> dataMap, List<DataChangeMessageConfigBO> dataChangeMessageConfigBOS) {
        List<DataSendMessageBO> dataSendMessageBOS = new ArrayList<>();
        //一个配置的消息对象messageConfigBO,包含了要发送的topic,以及感兴趣的字段
        for (DataChangeMessageConfigBO messageConfigBO : dataChangeMessageConfigBOS) {
            //不是内部消息的不处理
            if (!MessageTypeEnum.INTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType())) {
                continue;
            }
            String notifyColumn = messageConfigBO.getNotifyColumn();
            String[] columns = notifyColumn.split(CoreConstant.COMMA);

            //遍历处理一条binlog对应的多条变更数据
            for (int i = 0; i < dataChangeMessages.size(); i++) {
                DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
                List<DataChangeMessage.ColumnValue> columnValues = new ArrayList<>();
                dataChangeMessage.setColumnValues(columnValues);
                //获取一条变更数据具体的变更详情map
                Map<String, Object> data = dataMap.get(i);
                //提取配置的消息对象所感兴趣的列
                for (String column : columns) {
                    columnValues.add(new DataChangeMessage.ColumnValue(column, data.get(column)));
                }
                dataSendMessageBOS.add(new DataSendMessageBO(messageConfigBO, dataChangeMessage));
            }
        }
        return dataSendMessageBOS;
    }
    ...
}

//消息处理对象
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataSendMessageBO implements Serializable {
    //消息对应的配置,
    private DataChangeMessageConfigBO dataChangeMessageConfigBO;

    //准备发送的消息对象
    private DataChangeMessage dataChangeMessage;
}

7.数据变更消息推送到多系统多路topic中

其中就会发送到商品溯源系统订阅的topic中。

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //处理binlog消息
    @Override
    public void processBinlogMessage(BinlogData binlogData) {
        //获取当前表的监听信息
        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
        //未配置监听信息的表,不作处理
        if (Objects.isNull(listenConfigDO)) {
            return;
        }

        //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
        //不需要监听,或者要监听的字段值未变动
        if (CollectionUtils.isEmpty(dataChangeMessages)) {
            return;
        }

        //获取配置的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
        //不需要发送消息
        if (CollectionUtils.isEmpty(messageConfigBOS)) {
            return;
        }

        //封装成需要发送的消息对象
        //这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
        //待发送的消息为空,无需处理
        if (CollectionUtils.isEmpty(sendDataMessageList)) {
            return;
        }

        //发送消息
        sendDataMessage(sendDataMessageList);
        //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
            //保存消息详细信息
            saveDataMessageDetail(dataChangeMessages, binlogData);
        }
    }

    //发送内部消息
    private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {
      for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {
          DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();
          DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();
          //发送一个延迟队列的消息出去
          dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());
      }
    }
    ...
}

8.外部消息数据格式转化以及写入DB中

//消息业务实现类
@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //处理binlog消息
    @Override
    public void processBinlogMessage(BinlogData binlogData) {
        //获取当前表的监听信息
        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
        //未配置监听信息的表,不作处理
        if (Objects.isNull(listenConfigDO)) {
            return;
        }

        //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
        //不需要监听,或者要监听的字段值未变动
        if (CollectionUtils.isEmpty(dataChangeMessages)) {
            return;
        }

        //获取配置的消息对象
        //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
        //不需要发送消息
        if (CollectionUtils.isEmpty(messageConfigBOS)) {
            return;
        }

        //封装成需要发送的消息对象
        //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
        //待发送的消息为空,无需处理
        if (CollectionUtils.isEmpty(sendDataMessageList)) {
            return;
        }

        //发送消息
        sendDataMessage(sendDataMessageList);

        //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
            //保存外部消息详细信息
            saveDataMessageDetail(dataChangeMessages, binlogData);
        }
    }

    //保存消息详细信息
    public void saveDataMessageDetail(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
        List<DataMessageBO> dataMessageBOS = converterDataMessageBOList(dataChangeMessages, binlogData);
        dataChangeRepository.saveDataMessageDetail(dataMessageBOS);
    }

    //转换消息详细信息
    private List<DataMessageBO> converterDataMessageBOList(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
        List<DataMessageBO> dataMessageBOS = new ArrayList<>(dataChangeMessages.size());
        for (int i = 0; i < dataChangeMessages.size(); i++) {
            DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
            DataMessageBO dataMessageBO = dataMessageConverter.converterBO(dataChangeMessage);
            dataMessageBO.setDiffDataArr(String.join(CoreConstant.COMMA, dataChangeMessage.getUpdateColumns()));
            dataMessageBO.setTableDataJson(JSON.toJSONString(binlogData.getDataMap().get(i)));
            dataMessageBOS.add(dataMessageBO);
        }
        return dataMessageBOS;
    }
    ...
}

@Repository
public class DataChangeRepository {
    ...
    //存储外部消息的数据信息
    public void saveDataMessageDetail(List<DataMessageBO> dataMessageBOS) {
        List<DataMessageDetailDO> dataMessageDetailDOS = dataMessageConverter.converterDOList(dataMessageBOS);
        int count = dataMessageDetailMapper.insertBatch(dataMessageDetailDOS);
        if (count <= 0) {
            throw new BaseBizException(CommonErrorCodeEnum.SQL_ERROR);
        }
    }
}

//外部消息处理对象
@Data
public class DataMessageBO implements Serializable {
    //内部消息编号
    private String messageNo;
    //变化的表信息内容
    private String tableDataJson;
    //消息变化字段数组
    private String diffDataArr;
    //表名
    private String tableName;
    //操作类型
    private String action;
}

9.外部消息发送触发以及多路系统Topic推送

(1)商品C端消费内部消息

(2)商品消息处理系统消费消息编号并发送外部消息

(1)商品C端消费内部消息

//商品变更时的缓存处理
@Component
public class ProductUpdateListener implements MessageListenerConcurrently {
    @DubboReference(version = "1.0.0")
    private TableDataUpdateApi tableDataUpdateApi;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                //消息处理这里,涉及到sku的缓存更新以及对应的整个商品明细的缓存更新
                String msg = new String(messageExt.getBody());
                log.info("执行商品缓存数据更新逻辑,消息内容:{}", msg);

                //这里的TableDataChangeDTO 其实应该改成 DataChangeMessage
                TableDataChangeDTO tableDataChangeMessage = JsonUtil.json2Object(msg, TableDataChangeDTO.class);
                if (BinlogType.INSERT.getValue().equals(JSON.parseObject(msg).get("action"))) {
                    //新增,需要将数据添加至布隆过滤器
                    tableDataUpdateApi.addBloomFilter(tableDataChangeMessage);
                }

                //更新sku对应的商品缓存信息
                tableDataUpdateApi.tableDataChange(tableDataChangeMessage);

                //发送回调消息通知,即发送消息编号到MQ
                tableDataUpdateApi.sendCallbackMessage(tableDataChangeMessage);
            }
        } catch (Exception e) {
            log.error("consume error, 商品缓存更新失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

@DubboService(version = "1.0.0", interfaceClass = TableDataUpdateApi.class, retries = 0)
public class TableDataUpdateApiImpl implements TableDataUpdateApi {
    @Autowired
    private DefaultProducer defaultProducer;
    ...

    @Override
    public JsonResult sendCallbackMessage(TableDataChangeDTO tableDataChangeDTO) {
        defaultProducer.sendMessage(
            tableDataChangeDTO.getCallbackTopic(),//这里的topic应该是RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC
            tableDataChangeDTO.getMessageNo(),
            RocketDelayedLevel.DELAYED_1m,
            "表变更消息处理完成返回延迟消息"
        );
        return JsonResult.buildSuccess();
    }
}

(2)商品消息处理系统消费消息编号并发送外部消息

@Configuration
public class ConsumerBeanConfig {
    //配置内容对象
    @Autowired
    private RocketMQProperties rocketMQProperties;
    ...

    //数据变更消费者,负责数据变更监控后发送MQ消息通知订阅方
    @Bean("dataExternalChangeConsumer")
    public DefaultMQPushConsumer dataExternalChangeTopic(DataExternalChangeListener dataExternalChangeListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_EXTERNAL_CHANGE_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC, "*");
        consumer.registerMessageListener(dataExternalChangeListener);
        consumer.start();
        return consumer;
    }
    ...
}

@Component
public class DataExternalChangeListener implements MessageListenerConcurrently {
    @Autowired
    private MessageService messageService;

    @Autowired
    private DataMessageProducer dataMessageProducer;

    @Autowired
    private RedisLock redisLock;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : list) {
            String messageNo = new String(messageExt.getBody());
            if (!redisLock.lock(messageNo)) {
                continue;
            }
            try {
                DataMessageBO dataMessageDetail = messageService.getDataMessageDetail(messageNo);
                //未命中到外部消息的数据,默认不处理
                if (Objects.isNull(dataMessageDetail)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                //获取外部消息的报文对象
                List<DataSendMessageBO> sendDataMessageList = messageService.getSendDataMessage(dataMessageDetail);
                if (CollectionUtils.isEmpty(sendDataMessageList)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //发送外部消息
                sendDataMessage(sendDataMessageList);

                //删除这条记录
                messageService.deleteMessage(dataMessageDetail);
            } catch (Exception e) {
                log.error("consume error, 消费外部数据变更消息失败", e);
                //本次消费失败,下次重新消费
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                redisLock.unlock(messageNo);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    //发送外部的消息
    private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {
        for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {
            DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();
            DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();
            //发送一个延迟队列的消息出去
            dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());
        }
    }
}

@Service
public class MessageServiceImpl implements MessageService {
    @Resource
    private DataChangeRepository dataChangeRepository;
    ...

    //获取对应的外部消息对象信息
    @Override
    public DataMessageBO getDataMessageDetail(String messageNo) {
        DataMessageDetailDO dataMessageDetail = dataChangeRepository.getDataMessageDetail(messageNo);
        return dataMessageConverter.converterBO(dataMessageDetail);
    }

    //获取需要发送的消息对象
    @Override
    public List<DataSendMessageBO> getSendDataMessage(DataMessageBO messageBO) {
        List<DataSendMessageBO> dataChangeMessageConfig = dataChangeRepository.getDataChangeMessageConfig(messageBO);
        return dataChangeMessageConfig;
    }
    ...
}

@Repository
public class DataChangeRepository {
    ...
    //获取某个消息对应的外部消息对象
    public DataMessageDetailDO getDataMessageDetail(String messageNo) {
        LambdaQueryWrapper<DataMessageDetailDO> queryWrapper = Wrappers.lambdaQuery();
        queryWrapper.eq(DataMessageDetailDO::getMessageNo, messageNo);
        return dataMessageDetailMapper.selectOne(queryWrapper);
    }

    //获取需要发送的外部消息对象
    public List<DataSendMessageBO> getDataChangeMessageConfig(DataMessageBO dataMessageBO) {
        List<DataSendMessageBO> dataChangeMessageList = new ArrayList<>();
        DataChangeListenConfigDO dataChangeListenConfig = getDataChangeListenConfig(dataMessageBO.getTableName());
        if (!Objects.isNull(dataChangeListenConfig)) {
            //获取配置的需要发送的外部消息信息
            List<DataChangeMessageConfigDO> dataChangeMessageConfigList = getMessageConfigByListenId(dataChangeListenConfig.getId(), MessageTypeEnum.EXTERNAL_MESSAGE);

            if (!CollectionUtils.isEmpty(dataChangeMessageConfigList)) {
                DataSendMessageBO dataSendMessageBO = new DataSendMessageBO();
                JSONObject tableDataJson = JSONObject.parseObject(dataMessageBO.getTableDataJson());
                List<String> updateColumns = converterList(dataMessageBO.getDiffDataArr());

                for (DataChangeMessageConfigDO messageConfigDO : dataChangeMessageConfigList) {
                    DataChangeMessage dataChangeMessage = dataMessageConverter.converter(dataMessageBO);
                    dataChangeMessage.setUpdateColumns(updateColumns);
                    //获取得到需要发送的字段信息
                    String[] notifyColumnArr = messageConfigDO.getNotifyColumn().split(CoreConstant.COMMA);
                    List<DataChangeMessage.ColumnValue> columnValueList = new ArrayList<>();
                    for (String notifyColumn : notifyColumnArr) {
                        columnValueList.add(new DataChangeMessage.ColumnValue(notifyColumn, tableDataJson.getString(notifyColumn)));
                    }
                    dataChangeMessage.setColumnValues(columnValueList);
                    dataSendMessageBO.setDataChangeMessage(dataChangeMessage);
                    dataSendMessageBO.setDataChangeMessageConfigBO(dataMessageConverter.converterBO(messageConfigDO));
                    dataChangeMessageList.add(dataSendMessageBO);
                }
            }
        }
        return dataChangeMessageList;
    }
    ...
}

10.商品数据变更溯源系统消息处理

(1)消费binlog消息的消费者

(2)消费商品溯源数据的消费者

(3)通用的数据溯源接口

(1)消费binlog消息的消费者

为了展示清晰逻辑,商品消息处理系统使用简单的流程来实现溯源功能。也就是在商品消息处理系统另新建一个Listener来消费binlog消息,然后发送binlog消息到商品溯源系统监听的topic中,所以这里没有接入使用通用的binlog消息处理流程。

@Configuration
public class ConsumerBeanConfig {
    //配置内容对象
    @Autowired
    private RocketMQProperties rocketMQProperties;
    ...

    //数据变更消费者,负责记录数据变更
    @Bean("dataTraceConsumer")
    public DefaultMQPushConsumer dataTraceTopic(DataTraceListener dataTraceListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);
        consumer.setInstanceName("dataTraceListener");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");
        consumer.registerMessageListener(dataTraceListener);
        consumer.start();
        return consumer;
    }
    ...
}

@Component
public class DataTraceListener implements MessageListenerConcurrently {
    @Autowired
    private DataMessageProducer dataMessageProducer;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                log.info("DataTraceListener数据变更消息通知,消息内容:{}", msg);

                JSONObject jsonObject = JSONUtil.parseObj(msg);
                //操作类型
                String operateType = jsonObject.getStr("type");
                if (!BinlogType.INSERT.getValue().equals(operateType)
                        && !BinlogType.DELETE.getValue().equals(operateType)
                        && !BinlogType.UPDATE.getValue().equals(operateType)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //ddl
                if (jsonObject.getBool("isDdl")) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                //把消息解析成TraceData对象
                List<TraceData> traceDataList = BinlogUtils.getTraceData(msg);
                if (CollectionUtils.isEmpty(traceDataList)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                //发送消息
                dataMessageProducer.send(traceDataList, RocketMqConstant.PRODUCT_TRACE_TOPIC, -1);
            }
        } catch (Exception e) {
            log.error("DataTraceListener consume error, 消费数据变更消息失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

//MySQL binlog解析工具类
public abstract class BinlogUtils {
    ...
    //把binlog json字符串解析成TraceData
    public static List<TraceData> getTraceData(String binlogStr) {
        if (!JSONUtil.isJson(binlogStr)) {
            return null;
        }

        List<TraceData> traceDataList = new ArrayList<>();
        JSONObject jsonObject = JSONUtil.parseObj(binlogStr);

        //表名
        String tableName = jsonObject.getStr("table");

        //所有的新数据
        JSONArray dataArray = jsonObject.getJSONArray("data");
        List<Map<String, Object>> newDataList = BinlogUtils.jsonArrayToMapList(dataArray);

        //变化了的旧数据
        List<Map<String, Object>> changedOldDataList = null;
        if (!jsonObject.isNull("old")) {
            dataArray = jsonObject.getJSONArray("old");
            changedOldDataList = BinlogUtils.jsonArrayToMapList(dataArray);
        }

        int size = newDataList.size();
        for (int i = 0; i < size; i++) {
            TraceData traceData = new TraceData();
            traceData.setTableName(tableName);

            //newData里面有所有的字段
            Map<String, Object> newData = newDataList.get(i);
            traceData.setNewData(newData);

            if (changedOldDataList != null) {
                //oldData里面只有变化了的字段
                Map<String, Object> changedOldData = changedOldDataList.get(i);
                Map<String, Object> oldData = new HashMap<>();
                newData.forEach(oldData::put);
                changedOldData.forEach(oldData::put);
                traceData.setOldData(oldData);
            } else {
                traceData.setOldData(Collections.emptyMap());
            }
            traceDataList.add(traceData);
        }

        return traceDataList;
    }
    ...
}

@Data
public class TraceData implements Serializable {
    private static final long serialVersionUID = 1L;
    //binlog对应的表名
    private String tableName;
    //新数据,所有的字段和旧的值
    private Map<String, Object> newData;
    //旧数据,所有的字段和旧的值
    private Map<String, Object> oldData;
}

(2)商品溯源数据的消费者

@Configuration
public class ConsumerBeanConfig {
    //配置内容对象
    @Autowired
    private RocketMQProperties rocketMQProperties;

    @Bean("productTraceConsumer")
    public DefaultMQPushConsumer productUpdateTopic(ProductTraceListener productTraceListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_TRACE_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.PRODUCT_TRACE_TOPIC, "*");
        consumer.registerMessageListener(productTraceListener);
        consumer.start();
        return consumer;
    }
}

//把商品溯源信息保存到es
@Component
public class ProductTraceListener implements MessageListenerConcurrently {
    @Autowired
    private TraceDataRepository traceDataRepository;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                log.info("把商品溯源信息保存到es,消息内容:{}", msg);

                List<TraceData> traceDataList = JsonUtil.json2Object(msg, new TypeReference<List<TraceData>>() { });
                if (!CollectionUtils.isEmpty(traceDataList)) {
                    traceDataRepository.saveTraceDataList(traceDataList);
                }
            }
        } catch (Exception e) {
            log.error("consume error, 把商品溯源信息保存到es失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

@Repository
public class TraceDataRepository {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //批量保存商品溯源数据
    public void saveTraceDataList(List<TraceData> traceDataList) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (TraceData traceData : traceDataList) {
            //1.因为我们是日志类的数据可以直接使用es的动态mapping
            //2.按照月份归档存储
            String indexName = getIndexName(traceData.getTableName());
            //3.避免文档中的嵌套字段
            Map<String, Object> jsonMap = getJsonMap(traceData);
            IndexRequest indexRequest = new IndexRequest(indexName).source(jsonMap);
            bulkRequest.add(indexRequest);
        }
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    private Map<String, Object> getJsonMap(TraceData traceData) {
        Map<String, Object> jsonMap = new LinkedHashMap<>();
        traceData.getOldData().forEach((k, v) -> jsonMap.put("old_" + k, v));
        traceData.getNewData().forEach((k, v) -> jsonMap.put("new_" + k, v));
        return jsonMap;
    }

    private String getIndexName(String tableName) {
        String month = DateFormatUtils.format(new Date(), "_yyyy_MM");
        return tableName + "_trace_index" + month;
    }
}

写入的溯源数据如下:

[{
    "_index": "sku_info_trace_index_2022_03",
    "_type": "_doc",
    "_id": "TUs8oX8BYBoqB9x4nTOy",
    "_score": 1,
    "_source": {
        "old_del_flag": "1",
        "old_sku_grade": "1",
        "old_create_time": "2022-03-19 08:07:41",
        "old_item_id": "100001273622",
        "old_sku_type": "1",
        "old_channel": "1",
        "old_sku_id": "8000177339",
        "old_version_id": "0",
        "old_update_time": "2022-03-19 08:07:41",
        "old_update_user": "1",
        "old_base_price": "6199",
        "old_seller_type": "1",
        "old_sku_name": "Apple iPhone 13 (A2634) 256GB 粉⾊ ⽀持移动联通 电信5G 双卡双待⼿机",
        "old_id": "73",
        "old_vip_price": "4900",
        "old_create_user": "1",
        "new_del_flag": "1",
        "new_sku_grade": "1",
        "new_create_time": "2022-03-19 08:12:35",
        "new_item_id": "100001273622",
        "new_sku_type": "1",
        "new_channel": "1",
        "new_sku_id": "8000177339",
        "new_version_id": "0",
        "new_update_time": "2022-03-19 08:12:35",
        "new_update_user": "1",
        "new_base_price": "8100",
        "new_seller_type": "1",
        "new_sku_name": "Apple iPhone 13 (A2634) 512GB 午夜⾊ ⽀持移动联 通电信5G 双卡双待⼿机",
        "new_id": "73",
        "new_vip_price": "4900",
        "new_create_user": "1"
    }
},{
    "_index": "sku_info_trace_index_2022_03",
    "_type": "_doc",
    "_id": "Tks8oX8BYBoqB9x4nTOy",
    "_score": 1,
    "_source": {
        "old_del_flag": "1",
        "old_sku_grade": "1",
        "old_create_time": "2022-03-19 08:07:41",
        "old_item_id": "100001273622",
        "old_sku_type": "1",
        "old_channel": "1",
        "old_sku_id": "8000177340",
        "old_version_id": "0",
        "old_update_time": "2022-03-19 08:07:41",
        "old_update_user": "1",
        "old_base_price": "6199",
        "old_seller_type": "1",
        "old_sku_name": "Apple iPhone 13 (A2634) 256GB 粉⾊ ⽀持移动联通 电信5G 双卡双待⼿机",
        "old_id": "74",
        "old_vip_price": "6700",
        "old_create_user": "1",
        "new_del_flag": "1",
        "new_sku_grade": "1",
        "new_create_time": "2022-03-19 08:12:35",
        "new_item_id": "100001273622",
        "new_sku_type": "1",
        "new_channel": "1",
        "new_sku_id": "8000177340",
        "new_version_id": "0",
        "new_update_time": "2022-03-19 08:12:35",
        "new_update_user": "1",
        "new_base_price": "8100",
        "new_seller_type": "1",
        "new_sku_name": "Apple iPhone 13 (A2634) 512GB 午夜⾊ ⽀持移动联 通电信5G 双卡双待⼿机",
        "new_id": "74",
        "new_vip_price": "6700",
        "new_create_user": "1"
    }
}]

(3)通用的数据溯源接口

@DubboService(version = "1.0.0", interfaceClass = ProductTraceApi.class, retries = 0)
public class ProductTraceApiImpl implements ProductTraceApi {
    @Autowired
    private TraceDataService traceDataService;

    //通用的数据溯源接口
    @Override
    public JsonResult<PageResult<TraceDataDTO>> listTraceDataPage(QueryTraceDataPageRequest request) {
        try {
            PageResult<TraceDataDTO> resultDTO = traceDataService.listTraceDataPage(request);
            return JsonResult.buildSuccess(resultDTO);
        } catch (ProductBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
}

//查询溯源数据请求
@Data
public class QueryTraceDataPageRequest extends PageRequest {
    //索引名称,格式为对应的mysql表的名称加上"_trace_index",比如sku_info表对应的溯源数据索引为sku_info_index_yyyy_MM
    private String indexName;

    //查询参数,key为mysql表里的字段名称,value为mysql表里的值,比如要查询skuId=8000177340的溯源数据,可以传{"sku_id":"8000177340"}
    private Map<String, Object> queryParams;
}

@Service
public class TraceDataServiceImpl implements TraceDataService {
    @Autowired
    private TraceDataRepository traceDataRepository;

    //查询数据溯源
    @Override
    public PageResult<TraceDataDTO> listTraceDataPage(QueryTraceDataPageRequest request) throws IOException {
        return traceDataRepository.listTraceDataPage(request);
    }
}

@Repository
public class TraceDataRepository {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    //查询商品溯源数据
    public PageResult<TraceDataDTO> listTraceDataPage(QueryTraceDataPageRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        Map<String, Object> queryParams = request.getQueryParams();
        if (MapUtils.isNotEmpty(queryParams)) {
            queryParams.forEach((k, v) -> searchSourceBuilder.query(QueryBuilders.termQuery("new_" + k, v)));
        }

        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());
        SearchRequest searchRequest = new SearchRequest(request.getIndexName() + "*");
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        PageResult<TraceDataDTO> pageResult = new PageResult<>();
        List<TraceDataDTO> pageContent = new ArrayList<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        for (SearchHit hit : hits) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            //取出所有的字段
            Set<String> fieldNames = sourceAsMap.keySet().stream()
                .map(e -> e.replace("new_", "").replace("old_", ""))
                .collect(Collectors.toSet());
            List<TraceDataDTO.FieldInfo> list = new ArrayList<>();
            for (String fieldName : fieldNames) {
                Object fieldValue = sourceAsMap.get("new_" + fieldName);
                Object oldValue = sourceAsMap.get("old_" + fieldName);
                boolean changed = !Objects.equals(fieldValue, oldValue);
                list.add(new TraceDataDTO.FieldInfo(fieldName, fieldValue, oldValue, changed));
            }
            pageContent.add(new TraceDataDTO(list));
        }

        pageResult.setContent(pageContent);
        pageResult.setTotalElements(searchResponse.getHits().getTotalHits().value);
        pageResult.setSize(request.getPageSize());
        pageResult.setNumber(request.getPageNum());
        return pageResult;
    }
}

查出的溯源数据如下:字段值是否变更通过"changed"来区分,true表示更改过,false表示未更改过。

{
    "data": {
      "number": 1,
      "size": 5,
      "content": [{
          "fieldInfos": [
              ...
              {
                  "fieldName": "create_time",
                  "oldValue": "2022-03-19 08:07:41",
                  "fieldValue": "2022-03-19 08:12:35",
                  "changed": true
              }, {
                  "fieldName": "item_id",
                  "oldValue": "100001273622",
                  "fieldValue": "100001273622",
                  "changed": false
              }, {
                  "fieldName": "sku_specs_value",
                  "oldValue": "[{\"key\":\"颜⾊\",\"sort\":1,\"value\":\"蓝⾊2\"},{\"key\":\"内存\",\"sort\":2,\"value\":\"128g3\"}]",
                  "fieldValue": "[{\"key\":\"颜⾊\",\"sort\":1,\"value\":\"蓝⾊2\"},{\"key\":\"内存\",\"sort\":2,\"value\":\"128g3\"}]",
                  "changed": false
              }, {
                  "fieldName": "update_time",
                  "oldValue": "2022-03-19 08:07:41",
                  "fieldValue": "2022-03-19 08:12:35",
                  "changed": true
              }, {
                  "fieldName": "update_user",
                  "oldValue": "1",
                  "fieldValue": "1",
                  "changed": false
              }, {
                  "fieldName": "base_price",
                  "oldValue": "6199",
                  "fieldValue": "8100",
                  "changed": true
              }, {
                  "fieldName": "seller_type",
                  "oldValue": "1",
                  "fieldValue": "1",
                  "changed": false
              }, {
                  "fieldName": "sku_name",
                  "oldValue": "Apple iPhone 13 (A2634) 256GB 粉⾊ ⽀ 持移动联通电信5G 双卡双待⼿机",
                  "fieldValue": "Apple iPhone 13 (A2634) 512GB 午夜⾊ ⽀持移动联通电信5G 双卡双待⼿机",
                  "changed": true
              }, {
                  "fieldName": "vip_price",
                  "oldValue": "6700",
                  "fieldValue": "6700",
                  "changed": false
              }, {
                  "fieldName": "create_user",
                  "oldValue": "1",
                  "fieldValue": "1",
                  "changed": false
              }
              ...
          ]
      }],
      "totalElements": 1
    },
    "success": true
}

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

06-23 11:43
门头沟学院 Java
allin校招的烤冷面很爱看电影:我靠,今天中午我也是这个hr隔一个星期发消息给我。问的问题还是一模一样的😅
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务