商品中心—7.自研缓存框架的技术文档
大纲
1.商品C端系统监听商品变更及刷新缓存
2.自研缓存框架的数据表缓存组件
3.自研缓存框架的通用缓存读写组件与DB操作组件
1.商品C端系统监听商品变更及刷新缓存
FlushRedisCache的flushRedisStringData()方法刷新缓存的逻辑是:首先从DB查询最新的数据 -> 然后删除旧缓存 -> 最后更新缓存。
@Configuration public class ConsumerBeanConfig { //配置内容对象 @Autowired private RocketMQProperties rocketMQProperties; //监听商品修改的MQ消息 @Bean("productUpdateTopic") public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*"); consumer.registerMessageListener(productUpdateListener); consumer.start(); return consumer; } } //商品变更时的缓存处理 @Component public class ProductUpdateListener implements MessageListenerConcurrently { @DubboReference(version = "1.0.0") private TableDataUpdateApi tableDataUpdateApi; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { //消息处理这里,涉及到sku的缓存更新以及对应的整个商品明细的缓存更新 String msg = new String(messageExt.getBody()); log.info("执行商品缓存数据更新逻辑,消息内容:{}", msg); TableDataChangeDTO tableDataChangeMessage = JsonUtil.json2Object(msg, TableDataChangeDTO.class); //更新sku对应的商品缓存信息 tableDataUpdateApi.tableDataChange(tableDataChangeMessage); //发送回调消息通知 tableDataUpdateApi.sendCallbackMessage(tableDataChangeMessage); } } catch (Exception e) { log.error("consume error, 商品缓存更新失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } //商品信息变更服务 @DubboService(version = "1.0.0", interfaceClass = TableDataUpdateApi.class, retries = 0) public class TableDataUpdateApiImpl implements TableDataUpdateApi { @Resource private FlushRedisCache flushRedisCache; private ExecutorService executorService = Executors.newFixedThreadPool(10); //商品表数据变更逆向更新缓存 @Override public JsonResult tableDataChange(TableDataChangeDTO tableDataChangeDTO) { executorService.execute(() -> { try { //刷新缓存数据 
flushRedisCache.flushRedisStringData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新string类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } try { flushRedisCache.flushRedisSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新set类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } try { flushRedisCache.flushRedisSortedSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId())); } catch (Exception e) { log.error("刷新sortedset类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e); } }); return JsonResult.buildSuccess(); } ... } //数据变更—刷新缓存 @Component public class FlushRedisCache { //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中 //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等 @Autowired private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap; //更新string类型缓存 //@param flushAll 是否全量刷新 //@param tableName 表名 //@param idSet 主键ID集合 public void flushRedisStringData(boolean flushAll, String tableName, Set<Long> idSet) { for (Map.Entry<String, AbstractRedisStringCache> entry : abstractRedisStringCacheMap.entrySet()) { AbstractRedisStringCache stringCache = entry.getValue(); if (flushAll) { stringCache.flushRedisStringDataByTableUpdateData(); continue; } //继承AbstractRedisStringCache的每个缓存实例都指定来表名,如下用于匹配出对应的缓存实例 if (stringCache.getTableName().contains(tableName)) { stringCache.flushRedisStringDataByTableAndIdSet(tableName, idSet); } } } ... } //Redis(string)缓存抽象类 public abstract class AbstractRedisStringCache<DO, BO> { @Resource private RedisReadWriteManager redisReadWriteManager; ... 
//刷新缓存—根据主表ID集合(关联表变更需要查询主表) public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) { Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet); if (!idSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(idSetOpt.get()); } //刷新缓存—根据主键ID集合 //@param idSet 数据表主键ID private void flushRedisStringDataByIdSet(Set<Long> idSet) { //根据id集合从数据库中查询出数据 Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType()); if (!stringSourceOpt.isPresent()) { return; } RedisStringCache<DO> redisStringCache = stringSourceOpt.get(); if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) { //通过缓存读写组件删除缓存 redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{})); } if (CollectionUtils.isEmpty(redisStringCache.getAddList())) { return; } List<BO> boList = convertDO2BO(redisStringCache.getAddList()); Map<String, BO> redisMap = convertBO2Map(boList); //通过缓存读写组件写入缓存 redisReadWriteManager.batchWriteRedisString(redisMap); } ... } //缓存读写管理 @Service public class RedisReadWriteManager { @Resource private RedisCache redisCache; ... //删除指定的key public void delete(String... keys) { Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key)); } //批量添加string缓存 public <T> void batchWriteRedisString(Map<String, T> redisMap) { List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet()); try { for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) { for (Map.Entry<String, T> entry : entries) { redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS)); } try { Thread.sleep(SLEEP_100); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { log.error("批量添加string缓存异常 redisMap={}", redisMap, e); } } ... }
2.自研缓存框架的数据表缓存组件
(1)自研缓存框架的目录结构
(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map
(1)自研缓存框架的目录结构
(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map
继承AbstractRedisStringCache的数据表缓存组件实例会被注入到Map中,之后通过Map便可以根据表名来获取对应的数据表缓存组件实例了。数据表缓存组件实例中会封装一些获取DB数据表数据、获取缓存key等方法,而其继承的抽象类会提供一些根据DB获取缓存、刷新缓存等模板方法。
@Component public class FlushRedisCache { //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中 //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等 @Autowired private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap; //继承了AbstractRedisSortedSetCache的缓存实例会被注入到abstractRedisSortedSetCacheMap这个map中 //例如SkuSellerCache @Autowired private Map<String, AbstractRedisSortedSetCache> abstractRedisSortedSetCacheMap; ... } @Service("productDetailCache") public class ProductDetailCache extends AbstractRedisStringCache<ProductDetailDO, ProductDetailBO> { //DB操作组件 @Resource private ProductDetailStringDatabase productDetailStringDatabase; @Resource private ProductDetailConverter productDetailConverter; @Override protected RedisStringDatabase<ProductDetailDO> getStringDatabase() { return productDetailStringDatabase; } @Override protected String getPendingRedisKey() { return AbstractRedisKeyConstants.PRODUCT_SKU_INFO_STRING; } @Override protected List<ProductDetailBO> convertDO2BO(Collection<ProductDetailDO> doList) { return productDetailConverter.converterDetailList(doList); } @Override protected Map<String, Object> getTableFieldsMap(String key) { Map<String, Object> dbInputMap = Maps.newHashMap(); dbInputMap.put(SkuCollectStringDatabase.SKU_ID, key); return dbInputMap; } @Override protected Class<ProductDetailBO> getBOClass() { return ProductDetailBO.class; } ... 
} //Redis(string)缓存抽象类 //@param <DO> 数据对象 //@param <BO> 缓存对象 public abstract class AbstractRedisStringCache<DO, BO> { @Resource private RedisReadWriteManager redisReadWriteManager; //获取redis key //@param key 需要替换的关键字 protected String getRedisKey(String key) { return String.format(getPendingRedisKey(), key); } //单个获取数据库表名 public String getTableName() { return getStringDatabase().getTableName(); } //获取DB读取对象 protected abstract RedisStringDatabase<DO> getStringDatabase(); //获取待处理的Redis key protected abstract String getPendingRedisKey(); //DO转BO protected abstract List<BO> convertDO2BO(Collection<DO> doList); //关联表字段值 protected abstract Map<String, Object> getTableFieldsMap(String key); //获取BO对象的class protected abstract Class<BO> getBOClass(); ... //模版方法:根据关键字批量获取数据 //@param useLocalCache 是否使用本地缓存 //@param keyList 入参关键字 public Optional<List<BO>> listRedisStringData(Boolean useLocalCache, List<String> keyList) { if (CollectionUtils.isEmpty(keyList)) { return Optional.empty(); } Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByCache(useLocalCache, keyList, getBloomKey(), getStringDatabase()::getTableSingleFiled, getStringDatabase().getBloomField(), getBOClass(), this::getRedisKey, (key) -> { Map<String, Object> tableFieldsMap = getTableFieldsMap(key); Optional<DO> doOpt; try { doOpt = getStringDatabase().getTableData(tableFieldsMap, queryType()); } catch (Exception e) { log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e); return Optional.empty(); } if (!doOpt.isPresent()) { return Optional.empty(); } List<BO> boList = convertDO2BO(Arrays.asList(doOpt.get())); if (CollectionUtils.isEmpty(boList)) { return Optional.empty(); } return Optional.of(boList.get(0)); } ); return boListOpt; } //模版方法:根据多关键字批量获取集合数据 //@param keyList 入参关键字 //@param requestMap key 数据库表字段,value 字段值,该map中的字段不要与getTableFieldsMap(key)获取的字段重复 public Optional<List<BO>> listRedisStringData(List<String> keyList, Map<String, Object> requestMap) { if 
(CollectionUtils.isEmpty(keyList)) { return Optional.empty(); } Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByBatchCache(keyList, getBOClass(), this::getRedisKey, (key) -> { Map<String, Object> tableFieldsMap = getTableFieldsMap(key); if (MapUtils.isNotEmpty(requestMap)) { tableFieldsMap.putAll(requestMap); } Optional<List<DO>> doOpt; try { doOpt = getStringDatabase().listTableData(tableFieldsMap, queryType()); } catch (Exception e) { log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e); return Optional.empty(); } if (!doOpt.isPresent()) { return Optional.empty(); } List<BO> boList = convertDO2BO(doOpt.get()); if (CollectionUtils.isEmpty(boList)) { return Optional.empty(); } return Optional.of(boList); }); return boListOpt; } //模版方法:刷新缓存—根据主表ID集合(关联表变更需要查询主表) //@param tableName 关联表名 //@param idSet 关联表主键集合 public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) { Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet); if (!idSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(idSetOpt.get()); } //模版方法:刷新缓存—根据表变更数据ID集合 public void flushRedisStringDataByTableUpdateData() { Optional<Set<Long>> updateIdSetOpt = getStringDatabase().getTableUpdateIdSet(); if (!updateIdSetOpt.isPresent()) { return; } flushRedisStringDataByIdSet(updateIdSetOpt.get()); } //模版方法:刷新缓存—根据主键ID集合 //@param idSet 数据表主键ID private void flushRedisStringDataByIdSet(Set<Long> idSet) { //根据id集合从数据库中查询出数据 Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType()); if (!stringSourceOpt.isPresent()) { return; } RedisStringCache<DO> redisStringCache = stringSourceOpt.get(); if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) { //通过缓存读写组件删除缓存 redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{})); } if 
(CollectionUtils.isEmpty(redisStringCache.getAddList())) { return; } List<BO> boList = convertDO2BO(redisStringCache.getAddList()); Map<String, BO> redisMap = convertBO2Map(boList); //通过缓存读写组件写入缓存 redisReadWriteManager.batchWriteRedisString(redisMap); } ... }
3.自研缓存框架的通用缓存读写组件与DB操作组件
(1)通用缓存读写组件
(2)数据表的DB操作组件
每个数据表缓存组件都会有至少两个必备组件:一个通用缓存读写组件 + 一个数据表的DB操作组件。
(1)通用缓存读写组件
通用缓存读写组件也包含两个必备组件:一个操作缓存的RedisCache组件 + 一个操作分布式锁的RedisLock组件。通用缓存读写组件封装了大量基础的缓存读写操作,这些操作在实现时会把DB读库与缓存问题的解决方案结合起来,例如缓存未命中时先加分布式锁再读库、读库无数据时缓存空对象以应对缓存穿透。
//缓存读写管理 @Service public class RedisReadWriteManager { @Resource private RedisCache redisCache; @Resource private RedisLock redisLock; ... //删除指定的key public void delete(String... keys) { Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key)); } //批量添加string缓存 public <T> void batchWriteRedisString(Map<String, T> redisMap) { List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet()); try { for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) { for (Map.Entry<String, T> entry : entries) { redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS)); } try { Thread.sleep(SLEEP_100); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { log.error("批量添加string缓存异常 redisMap={}", redisMap, e); } } //批量获取多缓存数据 public <T> Optional<List<T>> listRedisStringDataByBatchCache(List<String> keyList, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) { try { List<T> list = Lists.newArrayList(); List<String> pendingKeyList = keyList.stream().distinct().collect(toList()); List<String> redisKeyList = pendingKeyList.stream().map(getRedisKeyFunction).distinct().collect(toList()); List<String> cacheList = redisCache.mget(redisKeyList); for (int i = 0; i < cacheList.size(); i++) { String cache = cacheList.get(i); //过滤无效缓存 if (EMPTY_OBJECT_STRING.equals(cache)) { continue; } if (StringUtils.isNotBlank(cache)) { List<T> tList = JSON.parseArray(cache, clazz); list.addAll(tList); continue; } //缓存没有则读库 Optional<List<T>> optional = listRedisStringDataByDb(pendingKeyList.get(i), getRedisKeyFunction, getDbFuction); if (optional.isPresent()) { list.addAll(optional.get()); } } return CollectionUtils.isEmpty(list) ? 
Optional.empty() : Optional.of(list); } catch (Exception e) { log.error("批量获取缓存数据异常 keyList={},clazz={}", keyList, clazz, e); throw e; } } //读取数据库表多数据赋值到Redis public <T> Optional<List<T>> listRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) { if (StringUtils.isEmpty(key) || Objects.isNull(getDbFuction)) { return Optional.empty(); } try { if (!redisLock.lock(key)) { return Optional.empty(); } String redisKey = getRedisKeyFunction.apply(key); Optional<List<T>> optional = getDbFuction.apply(key); if (!optional.isPresent()) { //把空对象暂存到redis redisCache.setex(true, redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24)); log.warn("发生缓存穿透 redisKey={}", redisKey); return optional; } //把表数据对象存到redis redisCache.setex(true, redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS)); log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, optional.get()); return optional; } finally { redisLock.unlock(key); } } ... }
(2)数据表的DB操作组件
该组件主要提供对数据表数据的查询方法。
@Service("productDetailStringDatabase") public class ProductDetailStringDatabase extends AbstractRedisStringDatabase<ProductDetailDO> { public static final String SKU_ID = "skuId"; private static final String TABLE_NAME = "sku_info"; @Resource private ProductRepository productRepository; @Override public String getTableName() { return TABLE_NAME; } @Override public Set<String> getTableNameSet() { return Sets.newHashSet(getTableName()); } @Override public Optional<ProductDetailDO> getTableData(Map<String, Object> tableFieldsMap, String queryType) { if (tableFieldsMap.containsKey(SKU_ID)) { String skuId = (String) tableFieldsMap.get(SKU_ID); ProductDetailDO productDetailDO = productRepository.queryProductDetail(skuId); if (!Objects.isNull(productDetailDO) && DelFlagEnum.EFFECTIVE.getCode().equals(productDetailDO.getDelFlag())) { return Optional.of(productDetailDO); } return Optional.empty(); } return Optional.empty(); } @Override public Optional<List<ProductDetailDO>> listTableData(Map<String, Object> tableFieldsMap, String queryType) { return Optional.empty(); } @Override public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(Set<Long> idSet, String queryType) { RedisStringCache redisStringCache = new RedisStringCache(); List<ProductDetailDO> addList = new ArrayList<>(); for (Long skuId : idSet) { ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId)); if (!Objects.isNull(productDetailDO)) { addList.add(productDetailDO); } } redisStringCache.setAddList(addList); return Optional.of(redisStringCache); } @Override public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(List<Long> idSet, String queryType) { Long skuId = idSet.get(0); ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId)); RedisStringCache redisStringCache = new RedisStringCache(); redisStringCache.setAddList(Arrays.asList(productDetailDO)); return Optional.of(redisStringCache); } }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等