10.4 数据库架构演进
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式:如何进行分库分表?读写分离如何实现?数据库架构如何演进?
预计阅读时间:45分钟
🎯 数据库架构演进路径
单体数据库到分布式数据库
/**
 * Describes the stages a database architecture moves through as load grows and
 * recommends a stage for a given set of runtime metrics.
 */
@Component
public class DatabaseArchitectureEvolution {

    /**
     * Evolution stages, declared from the simplest deployment to the most scalable one.
     * Each constant carries a display name, a short description and a capacity label.
     */
    public enum EvolutionStage {
        SINGLE_DB("单体数据库", "单机MySQL", "< 1万QPS"),
        MASTER_SLAVE("主从复制", "一主多从", "< 10万QPS"),
        READ_WRITE_SPLIT("读写分离", "读写分离中间件", "< 50万QPS"),
        HORIZONTAL_SHARDING("水平分片", "分库分表", "< 100万QPS"),
        DISTRIBUTED_DB("分布式数据库", "分布式事务", "> 100万QPS");

        private final String name;
        private final String description;
        private final String capacity;

        // The constants above pass three arguments, so an explicit constructor is required.
        EvolutionStage(String name, String description, String capacity) {
            this.name = name;
            this.description = description;
            this.capacity = capacity;
        }

        /** @return the display name of this stage */
        public String getName() {
            return name;
        }

        /** @return the short description of this stage */
        public String getDescription() {
            return description;
        }

        /** @return the capacity label of this stage */
        public String getCapacity() {
            return capacity;
        }
    }

    /**
     * Chooses an evolution stage from observed database metrics.
     */
    @Component
    public static class ArchitectureDecisionMaker {

        private static final long SINGLE_DB_QPS_LIMIT = 10_000L;
        private static final long MASTER_SLAVE_QPS_LIMIT = 100_000L;
        private static final long READ_WRITE_SPLIT_QPS_LIMIT = 500_000L;
        private static final long SHARDING_QPS_LIMIT = 1_000_000L;

        /**
         * Decides which stage fits the given metrics.
         *
         * <p>Rules are evaluated in order:
         * <ol>
         *   <li>QPS above 1,000,000 selects {@code DISTRIBUTED_DB}.</li>
         *   <li>Low QPS, small data and CPU below 70 selects {@code SINGLE_DB}.</li>
         *   <li>QPS below 100,000 with CPU above 80 selects {@code MASTER_SLAVE}.</li>
         *   <li>QPS below 500,000 with a read-heavy workload selects {@code READ_WRITE_SPLIT}.</li>
         *   <li>Data above 1000 GB or QPS of at least 500,000 selects {@code HORIZONTAL_SHARDING}.</li>
         *   <li>Otherwise the next stage up by QPS tier is returned.</li>
         * </ol>
         * Previously every unmatched case (for example a lightly loaded database with CPU
         * between 70 and 80) fell through to {@code DISTRIBUTED_DB}, and loads above
         * 1,000,000 QPS could never reach it.
         *
         * @param metrics observed metrics; CPU usage is compared against 70 and 80
         *                (presumably a percentage; confirm against {@code DatabaseMetrics})
         * @return the recommended stage, never {@code null}
         */
        public EvolutionStage decideNextStage(DatabaseMetrics metrics) {
            long qps = metrics.getQps();
            long dataSize = metrics.getDataSizeGB();
            double cpuUsage = metrics.getCpuUsage();

            if (qps > SHARDING_QPS_LIMIT) {
                return EvolutionStage.DISTRIBUTED_DB;
            }
            if (qps < SINGLE_DB_QPS_LIMIT && dataSize < 100 && cpuUsage < 70) {
                return EvolutionStage.SINGLE_DB;
            }
            if (qps < MASTER_SLAVE_QPS_LIMIT && cpuUsage > 80) {
                return EvolutionStage.MASTER_SLAVE;
            }
            if (qps < READ_WRITE_SPLIT_QPS_LIMIT && hasReadWriteImbalance(metrics)) {
                return EvolutionStage.READ_WRITE_SPLIT;
            }
            // ">=" so that exactly 500,000 QPS is not left unmatched.
            if (dataSize > 1000 || qps >= READ_WRITE_SPLIT_QPS_LIMIT) {
                return EvolutionStage.HORIZONTAL_SHARDING;
            }
            // The single-database check already failed, so recommend the next stage up
            // by QPS tier instead of jumping straight to a distributed database.
            return qps < MASTER_SLAVE_QPS_LIMIT
                    ? EvolutionStage.MASTER_SLAVE
                    : EvolutionStage.READ_WRITE_SPLIT;
        }

        /**
         * Reports whether reads outnumber writes by more than 3 to 1.
         * The division is done in floating point, so a write QPS of zero does not throw:
         * the ratio becomes infinite (true) when reads are positive and NaN (false)
         * when reads are zero too.
         */
        private boolean hasReadWriteImbalance(DatabaseMetrics metrics) {
            return metrics.getReadQps() / (double) metrics.getWriteQps() > 3;
        }

        /**
         * Builds a recommendation that pairs the current stage with the stage chosen by
         * {@link #decideNextStage(DatabaseMetrics)}, plus reasons, migration steps,
         * estimated effort and risks.
         *
         * <p>NOTE(review): generateReasons, generateMigrationSteps, estimateEffort and
         * identifyRisks are not defined in this snippet; confirm where they live.
         *
         * @param metrics observed metrics
         * @return the assembled recommendation
         */
        public EvolutionRecommendation generateRecommendation(DatabaseMetrics metrics) {
            EvolutionStage nextStage = decideNextStage(metrics);
            return EvolutionRecommendation.builder()
                    .currentStage(metrics.getCurrentStage())
                    .recommendedStage(nextStage)
                    .reasons(generateReasons(metrics, nextStage))
                    .migrationSteps(generateMigrationSteps(nextStage))
                    .estimatedEffort(estimateEffort(nextStage))
                    .risks(identifyRisks(nextStage))
                    .build();
        }
    }
}
📖 读写分离架构
主从复制与读写分离实现
/** * 读写分离架构实现 */ @Configuration public class ReadWriteSplitConfiguration { /** * 主从数据源配置 */ @Bean @Primary public DataSource masterDataSource() { HikariConfig config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://master.db.com:3306/app_db"); config.setUsername("app_user"); config.setPassword("app_password"); config.setMaximumPoolSize(50); config.setMinimumIdle(10); return new HikariDataSource(config); } @Bean public DataSource slave1DataSource() { HikariConfig config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://slave1.db.com:3306/app_db"); config.setUsername("readonly_user"); config.setPassword("readonly_password"); config.setMaximumPoolSize(30); config.setMinimumIdle(5); config.setReadOnly(true); return new HikariDataSource(config); } @Bean public DataSource slave2DataSource() { HikariConfig config = new HikariConfig(); config.setJdbcUrl("jdbc:mysql://slave2.db.com:3306/app_db"); config.setUsername("readonly_user"); config.setPassword("readonly_password"); config.setMaximumPoolSize(30); config.setMinimumIdle(5); config.setReadOnly(true); return new HikariDataSource(config); } /** * 动态数据源路由 */ @Bean public DataSource routingDataSource() { DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource(); Map<Object, Object> dataSourceMap = new HashMap<>(); dataSourceMap.put("master", masterDataSource()); dataSourceMap.put("slave1", slave1DataSource()); dataSourceMap.put("slave2", slave2DataSource()); routingDataSource.setTargetDataSources(dataSourceMap); routingDataSource.setDefaultTargetDataSource(masterDataSource()); return routingDataSource; } /** * 动态数据源实现 */ public static class DynamicRoutingDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } } /** * 数据源上下文持有者 */ public static class DataSourceContextHolder { private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); public static void setDataSourceType(String 
dataSourceType) { CONTEXT_HOLDER.set(dataSourceType); } public static String getDataSourceType() { return CONTEXT_HOLDER.get(); } public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } } }
读写分离中间件
/** * 读写分离中间件实现 */ @Component public class ReadWriteSplitMiddleware { /** * 读写分离注解 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface DataSource { String value() default "master"; } /** * 读写分离切面 */ @Aspect @Component public static class DataSourceAspect { private final List<String> slaveDataSources = Arrays.asList("slave1", "slave2"); private final AtomicInteger counter = new AtomicInteger(0); @Around("@annotation(dataSource)") public Object around(ProceedingJoinPoint joinPoint, DataSource dataSource) throws Throwable { try { String targetDataSource = determineDataSource(dataSource, joinPoint); DataSourceContextHolder.setDataSourceType(targetDataSource); return joinPoint.proceed(); } finally { DataSourceContextHolder.clearDataSourceType(); } } /** * 确定目标数据源 */ private String determineDataSource(DataSource dataSource, ProceedingJoinPoint joinPoint) { String specifiedDataSource = dataSource.value(); // 如果明确指定了数据源,直接使用 if (!"auto".equals(specifiedDataSource)) { return specifiedDataSource; } // 自动判断读写操作 String methodName = joinPoint.getSignature().getName().toLowerCase(); if (isReadOperation(methodName)) { return selectSlaveDataSource(); } else { return "master"; } } /** * 判断是否为读操作 */ private boolean isReadOperation(String methodName) { return methodName.startsWith("select") || methodName.startsWith("get") || methodName.startsWith("find") || methodName.startsWith("query") || methodName.startsWith("count") || methodName.startsWith("exists"); } /** * 选择从库数据源(负载均衡) */ private String selectSlaveDataSource() { int index = counter.getAndIncrement() % slaveDataSources.size(); return slaveDataSources.get(index); } } /** * 主从延迟监控 */ @Component public static class ReplicationLagMonitor { @Autowired private JdbcTemplate masterJdbcTemplate; @Autowired private JdbcTemplate slaveJdbcTemplate; /** * 检查主从延迟 */ @Scheduled(fixedRate = 30000) // 30秒检查一次 public void checkReplicationLag() { try { // 获取主库位置 String masterStatus = 
masterJdbcTemplate.queryForObject( "SHOW MASTER STATUS", String.class); // 获取从库状态 Map<String, Object> slaveStatus = slaveJdbcTemplate.queryForMap( "SHOW SLAVE STATUS"); long lagSeconds = calculateLag(masterStatus, slaveStatus); if (lagSeconds > 10) { // 延迟超过10秒告警 log.warn("Replication lag detected: {} seconds", lagSeconds); handleReplicationLag(lagSeconds); } } catch (Exception e) { log.error("Failed to check replication lag", e); } } private long calculateLag(String masterStatus, Map<String, Object> slaveStatus) { // 计算主从延迟的具体逻辑 return 0; } private void handleReplicationLag(long lagSeconds) { // 处理主从延迟的逻辑,如切换到主库读取 if (lagSeconds > 60) { // 延迟过大,暂时禁用从库 disableSlaveReading(); } } private void disableSlaveReading() { // 实现禁用从库读取的逻辑 } } }
🔀 分库分表策略
水平分片实现
/** * 分库分表实现 */ @Component public class ShardingImplementation { /** * 分片策略接口 */ public interface ShardingStrategy { String determineDatabase(Object shardingValue); String determineTable(Object shardingValue); } /** * 哈希分片策略 */ @Component public static class HashShardingStrategy implements ShardingStrategy { private final int databaseCount = 4; // 4个数据库 private final int tableCount = 16; // 每个库16张表 @Override public String determineDatabase(Object shardingValue) { int hash = Math.abs(shardingValue.hashCode()); int dbIndex = hash % databaseCount; return "db_" + dbIndex; } @Override public String determineTable(Object shardingValue) { int hash = Math.abs(shardingValue.hashCode()); int tableIndex = hash % tableCount; return "user_" + tableIndex; } } /** * 范围分片策略 */ @Component public static class RangeShardingStrategy implements ShardingStrategy { @Override public String determineDatabase(Object shardingValue) { if (shardingValue instanceof Long) { long id = (Long) shardingValue; if (id <= 1000000) { return "db_0"; } else if (id <= 2000000) { return "db_1"; } else if (id <= 3000000) { return "db_2"; } else { return "db_3"; } } throw new IllegalArgumentException("Unsupported sharding value type"); } @Override public String determineTable(Object shardingValue) { if (shardingValue instanceof Long) { long id = (Long) shardingValue; int tableIndex = (int) ((id - 1) / 100000) % 16; return "user_" + tableIndex; } throw new IllegalArgumentException("Unsupported sharding value type"); } } /** * 分片路由器 */ @Component public static class ShardingRouter { @Autowired private ShardingStrategy shardingStrategy; /** * 路由单个查询 */ public ShardingTarget routeSingle(Object shardingValue) { String database = shardingStrategy.determineDatabase(shardingValue); String table = shardingStrategy.determineTable(shardingValue); return ShardingTarget.builder() .database(database) .table(table) .build(); } /** * 路由范围查询 */ public List<ShardingTarget> routeRange(Object startValue, Object endValue) { 
Set<ShardingTarget> targets = new HashSet<>(); // 根据范围计算涉及的分片 if (startValue instanceof Long && endValue instanceof Long) { long start = (Long) startValue; long end = (Long) endValue; for (long id = start; id <= end; id += 100000) { ShardingTarget target = routeSingle(id); targets.add(target); } } return new ArrayList<>(targets); } /** * 路由批量查询 */ public Map<ShardingTarget, List<Object>> routeBatch(List<Object> shardingValues) { Map<ShardingTarget, List<Object>> routingResult = new HashMap<>(); for (Object value : shardingValues) { ShardingTarget target = routeSingle(value); routingResult.computeIfAbsent(target, k -> new ArrayList<>()).add(value); } return routingResult; } } /** * 分片执行器 */ @Component public static class ShardingExecutor { @Autowired private ShardingRouter shardingRouter; @Autowired private Map<String, JdbcTemplate> jdbcTemplateMap; /** * 执行分片查询 */ public <T> List<T> executeQuery(String sql, Object[] params, RowMapper<T> rowMapper, Object shardingValue) { ShardingTarget target = shardingRouter.routeSingle(shardingValue); String actualSql = replacePlaceholders(sql, target); JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(target.getDatabase()); return jdbcTemplate.query(actualSql, params, rowMapper); } /** * 执行分片更新 */ public int executeUpdate(String sql, Object[] params, Object shardingValue) { ShardingTarget target = shardingRouter.routeSingle(shardingValue); String actualSql = replacePlaceholders(sql, target); JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(target.getDatabase()); return jdbcTemplate.update(actualSql, params); } /** * 执行跨分片聚合查询 */ public <T> List<T> executeAggregateQuery(String sql, Object[] params, RowMapper<T> rowMapper, List<Object> shardingValues) { Map<ShardingTarget, List<Object>> routingResult = shardingRouter.routeBatch(shardingValues); List<CompletableFuture<List<T>>> futures = new ArrayList<>(); for (Map.Entry<ShardingTarget, List<Object>> entry : routingResult.entrySet()) { ShardingTarget target = entry.getKey(); 
CompletableFuture<List<T>> future = CompletableFuture.supplyAsync(() -> { String actualSql = replacePlaceholders(sql, target); JdbcTemplate jdbcTemplate = jdbcTemplateMap.get(target.getDatabase()); return jdbcTemplate.query(actualSql, params, rowMapper); }); futures.add(future); } // 合并结果 List<T> result = new ArrayList<>(); for (CompletableFuture<List<T>> future : futures) { try { result.addAll(future.get()); } catch (Exception e) { log.error("Failed to execute sharding query", e); } } return result; } private String replacePlaceholders(String sql, ShardingTarget target) { return sql.replace("${table}", target.getTable()) .replace("${database}", target.getDatabase()); } } }
分布式事务处理
/** * 分布式事务处理 */ @Component public class DistributedTransactionHandler { /** * 分布式事务管理器 */ @Component public static class DistributedTransactionManager { @Autowired private Map<String, DataSource> dataSourceMap; /** * 两阶段提交实现 */ public void executeTwoPhaseCommit(List<TransactionTask> tasks) { Map<String, Connection> connections = new HashMap<>(); try { // Phase 1: Prepare for (TransactionTask task : tasks) { String dataSource = task.getDataSource(); Connection conn = getConnection(dataSource, connections); // 设置为手动提交 conn.setAutoCommit(false); // 执行SQL try (PreparedStatement stmt = conn.prepareStatement(task.getSql())) { setParameters(stmt, task.getParams()); stmt.executeUpdate(); } } // Phase 2: Commit for (Connection conn : connections.values()) { conn.commit(); } log.info("Distributed transaction committed successfully"); } catch (Exception e) { // Rollback all connections for (Connection conn : connections.values()) { try { conn.rollback(); } catch (SQLException rollbackEx) { log.error("Failed to rollback connection", rollbackEx); } } log.error("Distributed transaction failed, rolled back", e); throw new RuntimeException("Distributed transaction failed", e); } finally { // Close all connections for (Connection conn : connections.values()) { try { conn.close(); } catch (SQLException closeEx) { log.error("Failed to close connection", closeEx); } } } } private Connection getConnection(String dataSourceName, Map<String, Connection> connections) throws SQLException { return connections.computeIfAbsent(dataSourceName, name -> { try { return dataSourceMap.get(name).getConnection(); } catch (SQLException e) { throw new RuntimeException("Failed to get connection", e); } }); } private void setParameters(PreparedStatement stmt, Object[] params) throws SQLException { if (params != null) { for (int i = 0; i < params.length; i++) { stmt.setObject(i + 1, params[i]); } } } } /** * 基于消息的最终一致性 */ @Component public static class EventualConsistencyHandler { @Autowired private 
RabbitTemplate rabbitTemplate; @Autowired private TransactionOutboxService outboxService; /** * 本地事务 + 消息发送 */ @Transactional public void executeWithEventualConsistency(String sql, Object[] params, String dataSource, Object event) { try { // 1. 执行本地事务 JdbcTemplate jdbcTemplate = getJdbcTemplate(dataSource); jdbcTemplate.update(sql, params); // 2. 保存事件到本地事务表 outboxService.saveEvent(event); // 3. 事务提交后异步发送消息 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronization() { @Override public void afterCommit() { try { rabbitTemplate.convertAndSend("transaction.exchange", "transaction.event", event); outboxService.markEventSent(event); } catch (Exception e) { log.error("Failed to send transaction event", e); } } } ); } catch (Exception e) { log.error("Failed to execute transaction with eventual consistency", e); throw e; } } /** * 补偿任务处理器 */ @RabbitListener(queues = "compensation.queue") public void handleCompensation(CompensationEvent event) { try { String compensationSql = event.getCompensationSql(); Object[] params = event.getParams(); String dataSource = event.getDataSource(); JdbcTemplate jdbcTemplate = getJdbcTemplate(dataSource); jdbcTemplate.update(compensationSql, params); log.info("Compensation executed successfully for event: {}", event.getId()); } catch (Exception e) { log.error("Failed to execute compensation", e); // 重试或人工处理 } } private JdbcTemplate getJdbcTemplate(String dataSource) { // 根据数据源名称获取JdbcTemplate return null; } } }
💡 面试回答要点
标准回答模板
第一部分:数据库架构演进
"数据库架构演进路径: 1. 单体数据库:适合初期,简单直接 2. 主从复制:解决读性能问题,数据冗余 3. 读写分离:读写请求分离,提升并发能力 4. 分库分表:解决数据量和并发瓶颈 5. 分布式数据库:终极方案,复杂度高 每个阶段都有明确的触发条件和适用场景。"
第二部分:分库分表策略
"分库分表策略选择: 1. 垂直分库:按业务模块分离,降低耦合 2. 水平分库:按数据特征分片,提升并发 3. 垂直分表:按字段使用频率分离 4. 水平分表:按数据量分片,解决单表瓶颈 分片键选择:均匀分布、业务相关、扩展友好。"
第三部分:分布式事务处理
"分布式事务解决方案: 1. 两阶段提交:强一致性,性能差 2. TCC模式:补偿机制,业务侵入性强 3. 消息最终一致性:性能好,复杂度适中 4. Saga模式:长事务处理,适合复杂业务 推荐消息最终一致性,平衡了性能和复杂度。"
核心要点总结:
- ✅ 掌握数据库架构演进的各个阶段和触发条件
- ✅ 理解读写分离的实现原理和注意事项
- ✅ 具备分库分表的设计和实现能力
- ✅ 了解分布式事务的处理方案和选择策略
Java面试圣经 文章被收录于专栏
Java面试圣经