Tencent Interview Question: Redis Cluster Scaling Plan
Interview weight: ⭐⭐⭐⭐⭐
Source: Tencent 2024 experienced-hire technical interview
Focus areas: Redis Cluster architecture, data migration, high-availability design
Estimated reading time: 45 minutes
Question Background
Interviewer: "Our Redis cluster currently has 6 nodes (3 masters, 3 replicas), holds about 500 GB of data, and serves 100k QPS. Due to rapid business growth we need to scale out to 12 nodes (6 masters, 6 replicas). Please design the scaling plan in detail, covering the data migration strategy, service availability guarantees, and a rollback plan. Also: if a node fails during the scale-out, how would you handle it?"
What the question tests:
- Deep understanding of Redis Cluster architecture
- Engineering practice for large-scale data migration
- High-availability system design thinking
- Failure handling and incident response
- Production operations experience
🎯 Current State Analysis and Scaling Plan
Analyzing the current cluster
Cluster topology:
```shell
# Current cluster state
redis-cli --cluster info 127.0.0.1:7000

# Node layout
Master1 (7000): slots 0-5460      (5461 slots)
Master2 (7001): slots 5461-10922  (5462 slots)
Master3 (7002): slots 10923-16383 (5461 slots)
Slave1  (7003): replicates Master1
Slave2  (7004): replicates Master2
Slave3  (7005): replicates Master3
```
Performance metrics analysis:
```java
/**
 * Cluster performance monitoring.
 */
@Slf4j
@Component
public class ClusterMonitor {

    @Autowired
    private RedisClusterConnection clusterConnection;

    /**
     * Collect performance metrics for the whole cluster.
     */
    public ClusterMetrics getClusterMetrics() {
        ClusterMetrics metrics = new ClusterMetrics();

        // Collect all master nodes (clusterGetNodes() returns an Iterable,
        // so wrap it in a stream before filtering)
        List<RedisClusterNode> masters = StreamSupport
                .stream(clusterConnection.clusterGetNodes().spliterator(), false)
                .filter(RedisClusterNode::isMaster)
                .collect(Collectors.toList());

        for (RedisClusterNode master : masters) {
            metrics.addNodeMetrics(getNodeMetrics(master));
        }
        return metrics;
    }

    private NodeMetrics getNodeMetrics(RedisClusterNode node) {
        Properties info = clusterConnection.info(node);

        NodeMetrics metrics = new NodeMetrics();
        metrics.setNodeId(node.getId());
        metrics.setHost(node.getHost());
        metrics.setPort(node.getPort());

        // Memory usage
        metrics.setUsedMemory(Long.parseLong(info.getProperty("used_memory", "0")));
        metrics.setMaxMemory(Long.parseLong(info.getProperty("maxmemory", "0")));

        // QPS statistics
        metrics.setCommandsProcessed(Long.parseLong(info.getProperty("total_commands_processed", "0")));
        metrics.setConnectedClients(Integer.parseInt(info.getProperty("connected_clients", "0")));

        // Slot information
        Set<SlotRange> slotRanges = node.getSlotRange();
        metrics.setSlotCount(slotRanges.stream()
                .mapToInt(range -> range.getEnd() - range.getStart() + 1)
                .sum());
        return metrics;
    }
}

@Data
public class ClusterMetrics {

    private List<NodeMetrics> nodeMetrics = new ArrayList<>();
    private long totalMemoryUsed;
    private long totalQPS;
    private int totalSlots = 16384;

    public void addNodeMetrics(NodeMetrics nodeMetrics) {
        this.nodeMetrics.add(nodeMetrics);
        this.totalMemoryUsed += nodeMetrics.getUsedMemory();
        this.totalQPS += nodeMetrics.getQps();
    }

    /**
     * Decide whether the cluster needs to scale out.
     */
    public boolean needsExpansion() {
        // Memory usage above 70% on any node
        boolean memoryPressure = nodeMetrics.stream()
                .anyMatch(node -> node.getMemoryUsageRatio() > 0.7);

        // Any single node above 30k QPS
        boolean qpsPressure = nodeMetrics.stream()
                .anyMatch(node -> node.getQps() > 30000);

        // Any node with more than 5000 client connections
        boolean connectionPressure = nodeMetrics.stream()
                .anyMatch(node -> node.getConnectedClients() > 5000);

        return memoryPressure || qpsPressure || connectionPressure;
    }
}
```
Scaling target planning
Cluster architecture after scaling:
```java
/**
 * Expansion planning.
 */
@Component
public class ExpansionPlanner {

    /**
     * Build the expansion plan.
     */
    public ExpansionPlan createExpansionPlan(ClusterMetrics currentMetrics) {
        ExpansionPlan plan = new ExpansionPlan();

        // Target: 6 masters, 6 replicas
        plan.setTargetMasterCount(6);
        plan.setTargetSlaveCount(6);

        // New nodes to provision
        List<NodeConfig> newNodes = Arrays.asList(
                new NodeConfig("192.168.1.10", 7006, NodeType.MASTER),
                new NodeConfig("192.168.1.11", 7007, NodeType.MASTER),
                new NodeConfig("192.168.1.12", 7008, NodeType.MASTER),
                new NodeConfig("192.168.1.13", 7009, NodeType.SLAVE),
                new NodeConfig("192.168.1.14", 7010, NodeType.SLAVE),
                new NodeConfig("192.168.1.15", 7011, NodeType.SLAVE));
        plan.setNewNodes(newNodes);

        // Slot reallocation plan
        plan.setSlotReallocation(calculateSlotReallocation());

        // Estimated migration time
        plan.setEstimatedMigrationTime(estimateMigrationTime(currentMetrics));
        return plan;
    }

    private Map<String, SlotRange> calculateSlotReallocation() {
        Map<String, SlotRange> allocation = new HashMap<>();

        // 6 masters, roughly 2731 slots each (16384 / 6 = 2730 remainder 4)
        int slotsPerMaster = 16384 / 6;
        int remainder = 16384 % 6;

        int start = 0;
        for (int i = 0; i < 6; i++) {
            // The first `remainder` masters get one extra slot. Track `start`
            // cumulatively so the ranges never overlap or leave gaps.
            int size = slotsPerMaster + (i < remainder ? 1 : 0);
            allocation.put("master" + i, new SlotRange(start, start + size - 1));
            start += size;
        }
        return allocation;
    }

    private Duration estimateMigrationTime(ClusterMetrics metrics) {
        // Estimate from data volume and network bandwidth
        long totalDataSize = metrics.getTotalMemoryUsed();
        long networkBandwidth = 1000L * 1024 * 1024;    // 1 GB/s link
        long migrationBandwidth = networkBandwidth / 4; // cap migration at 25%, leaving 75% for live traffic
        long estimatedSeconds = totalDataSize / migrationBandwidth;
        return Duration.ofSeconds(estimatedSeconds);
    }
}
```
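To sanity-check the allocation arithmetic: 16384 is not divisible by 6, so a balanced plan gives the first `16384 % 6 = 4` masters one extra slot (2731 vs. 2730). A minimal, self-contained sketch of that computation (class and method names are illustrative, not part of the production planner):

```java
import java.util.ArrayList;
import java.util.List;

public class SlotAllocationDemo {

    /** Returns {start, end} slot ranges for `masters` nodes covering 0..16383 exactly once. */
    static List<int[]> balancedRanges(int masters) {
        int base = 16384 / masters;
        int remainder = 16384 % masters;
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < masters; i++) {
            // the first `remainder` masters absorb the leftover slots
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[]{start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        int total = 0;
        for (int[] r : balancedRanges(6)) {
            total += r[1] - r[0] + 1;
            System.out.printf("slots %5d-%5d (%d slots)%n", r[0], r[1], r[1] - r[0] + 1);
        }
        System.out.println("total = " + total); // must be 16384: no gaps, no overlaps
    }
}
```

Running this prints four ranges of 2731 slots followed by two of 2730, summing to exactly 16384, which is the invariant any reallocation plan must preserve.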
🚀 Scale-Out Execution Plan
Phase 1: Environment preparation
Deploying the new nodes:
```shell
#!/bin/bash
# Deployment script for the new nodes

# 1. Create the config file for a new node
create_node_config() {
    local port=$1
    local node_dir="/opt/redis/node-${port}"
    mkdir -p ${node_dir}

    cat > ${node_dir}/redis.conf << EOF
port ${port}
cluster-enabled yes
cluster-config-file nodes-${port}.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-${port}.aof"
dir ${node_dir}
logfile ${node_dir}/redis-${port}.log
pidfile /var/run/redis_${port}.pid

# Memory settings
maxmemory 8gb
maxmemory-policy allkeys-lru

# Network settings
tcp-keepalive 300
timeout 0

# Persistence settings
save 900 1
save 300 10
save 60 10000
EOF
}

# 2. Start the new nodes
start_new_nodes() {
    for port in 7006 7007 7008 7009 7010 7011; do
        echo "Starting Redis node on port ${port}..."
        create_node_config ${port}
        redis-server /opt/redis/node-${port}/redis.conf &
        sleep 2
    done
}

# 3. Verify node status
verify_nodes() {
    for port in 7006 7007 7008 7009 7010 7011; do
        if redis-cli -p ${port} ping | grep -q PONG; then
            echo "Node ${port}: OK"
        else
            echo "Node ${port}: FAILED"
            exit 1
        fi
    done
}

start_new_nodes
verify_nodes
```
Pre-flight checklist:
```java
/**
 * Pre-expansion environment checks.
 */
@Slf4j
@Component
public class PreExpansionChecker {

    @Autowired
    private RedisClusterConnection clusterConnection;

    /**
     * Run all pre-expansion checks.
     */
    public CheckResult performPreExpansionCheck() {
        CheckResult result = new CheckResult();
        result.addCheck("cluster_health", checkClusterHealth());                // 1. cluster health
        result.addCheck("node_resources", checkNodeResources());               // 2. node resources
        result.addCheck("network_connectivity", checkNetworkConnectivity());   // 3. network reachability
        result.addCheck("backup_verification", checkBackupStatus());           // 4. backup validity
        result.addCheck("monitoring_system", checkMonitoringSystem());         // 5. monitoring readiness
        return result;
    }

    private boolean checkClusterHealth() {
        try {
            // Every node must be healthy
            Iterable<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.getFlags().contains(RedisClusterNode.Flag.FAIL)) {
                    log.error("Node {} is in FAIL state", node.getId());
                    return false;
                }
            }
            // cluster_state is "ok" only when all 16384 slots are covered and agreed on
            Properties clusterInfo = clusterConnection.clusterGetClusterInfo();
            String clusterState = clusterInfo.getProperty("cluster_state");
            return "ok".equals(clusterState);
        } catch (Exception e) {
            log.error("Cluster health check failed", e);
            return false;
        }
    }

    private boolean checkNodeResources() {
        // Check CPU, memory, and disk headroom
        return true; // simplified
    }

    private boolean checkNetworkConnectivity() {
        // Check connectivity between old and new nodes
        return true; // simplified
    }

    private boolean checkBackupStatus() {
        // Verify the latest backup is restorable
        return true; // simplified
    }

    private boolean checkMonitoringSystem() {
        // Make sure monitoring is operational
        return true; // simplified
    }
}
```
Phase 2: Joining nodes to the cluster
Adding the new masters:
```java
/**
 * Cluster expansion executor.
 */
@Slf4j
@Component
public class ClusterExpansionExecutor {

    @Autowired
    private RedisClusterConnection clusterConnection;

    /**
     * Add new master nodes to the cluster.
     */
    public void addMasterNodes(List<NodeConfig> masterNodes) {
        for (NodeConfig nodeConfig : masterNodes) {
            try {
                log.info("Adding master node: {}:{}", nodeConfig.getHost(), nodeConfig.getPort());

                // 1. Introduce the node to the cluster (CLUSTER MEET)
                clusterConnection.clusterMeet(nodeConfig.getHost(), nodeConfig.getPort());

                // 2. Wait for the handshake to complete
                waitForNodeHandshake(nodeConfig);

                // 3. Verify node status
                verifyNodeStatus(nodeConfig);

                log.info("Master node {}:{} added successfully", nodeConfig.getHost(), nodeConfig.getPort());
            } catch (Exception e) {
                log.error("Failed to add master node {}:{}", nodeConfig.getHost(), nodeConfig.getPort(), e);
                throw new ExpansionException("Failed to add master node", e);
            }
        }
    }

    /**
     * Add new replica nodes.
     */
    public void addSlaveNodes(List<NodeConfig> slaveNodes, Map<String, String> masterSlaveMapping) {
        for (NodeConfig slaveConfig : slaveNodes) {
            try {
                log.info("Adding slave node: {}:{}", slaveConfig.getHost(), slaveConfig.getPort());

                // 1. Introduce the replica to the cluster
                clusterConnection.clusterMeet(slaveConfig.getHost(), slaveConfig.getPort());

                // 2. Wait for the handshake to complete
                waitForNodeHandshake(slaveConfig);

                // 3. Attach the replica to its master (CLUSTER REPLICATE)
                String masterId = masterSlaveMapping.get(slaveConfig.getNodeId());
                clusterConnection.clusterReplicate(slaveConfig.getNodeId(), masterId);

                // 4. Verify the replication link
                verifyReplicationStatus(slaveConfig, masterId);

                log.info("Slave node {}:{} added successfully", slaveConfig.getHost(), slaveConfig.getPort());
            } catch (Exception e) {
                log.error("Failed to add slave node {}:{}", slaveConfig.getHost(), slaveConfig.getPort(), e);
                throw new ExpansionException("Failed to add slave node", e);
            }
        }
    }

    private void waitForNodeHandshake(NodeConfig nodeConfig) throws InterruptedException {
        int maxRetries = 30;
        int retryCount = 0;
        while (retryCount < maxRetries) {
            try {
                RedisClusterNode node = findNodeById(nodeConfig.getNodeId());
                if (node != null && !node.getFlags().contains(RedisClusterNode.Flag.HANDSHAKE)) {
                    return; // handshake complete
                }
            } catch (Exception e) {
                // ignore and retry
            }
            Thread.sleep(1000);
            retryCount++;
        }
        throw new ExpansionException("Node handshake timeout: " + nodeConfig.getNodeId());
    }
}
```
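In practice, the MEET/REPLICATE steps above are usually driven by the stock `redis-cli --cluster` tooling rather than hand-written code. A command fragment, assuming `127.0.0.1:7000` is an existing cluster member (`<master-node-id>` is a placeholder for the real 40-character node id):

```shell
# Add a new, empty master (it owns no slots until resharding)
redis-cli --cluster add-node 192.168.1.10:7006 127.0.0.1:7000

# Add a new node directly as a replica of a specific master
redis-cli --cluster add-node 192.168.1.13:7009 127.0.0.1:7000 \
  --cluster-slave --cluster-master-id <master-node-id>

# Verify slot coverage and node state afterwards
redis-cli --cluster check 127.0.0.1:7000
```

The tooling performs the same handshake wait and sanity checks internally, which makes it the lower-risk option for a production change window.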
Phase 3: Slot migration
Slot migration strategy:
```java
/**
 * Slot migration manager.
 */
@Slf4j
@Component
public class SlotMigrationManager {

    @Autowired
    private RedisClusterConnection clusterConnection;

    /**
     * Execute slot migration.
     */
    public void migrateSlots(Map<String, SlotRange> reallocationPlan) {
        // Migrate in batches to limit the impact on live traffic
        int batchSize = 100; // 100 slots per batch

        for (Map.Entry<String, SlotRange> entry : reallocationPlan.entrySet()) {
            String targetNodeId = entry.getKey();
            SlotRange slotRange = entry.getValue();
            migrateSlotRange(targetNodeId, slotRange, batchSize);
        }
    }

    private void migrateSlotRange(String targetNodeId, SlotRange slotRange, int batchSize) {
        int startSlot = slotRange.getStart();
        int endSlot = slotRange.getEnd();

        for (int slot = startSlot; slot <= endSlot; slot += batchSize) {
            int batchEndSlot = Math.min(slot + batchSize - 1, endSlot);
            try {
                migrateSlotBatch(targetNodeId, slot, batchEndSlot);
                // Brief pause between batches so business traffic is not starved
                Thread.sleep(100);
            } catch (Exception e) {
                log.error("Failed to migrate slots {}-{} to node {}", slot, batchEndSlot, targetNodeId, e);
                handleMigrationFailure(targetNodeId, slot, batchEndSlot, e);
            }
        }
    }

    private void migrateSlotBatch(String targetNodeId, int startSlot, int endSlot) {
        for (int slot = startSlot; slot <= endSlot; slot++) {
            migrateSlot(slot, targetNodeId);
        }
    }

    /**
     * Migrate a single slot.
     */
    private void migrateSlot(int slot, String targetNodeId) {
        // 1. Find the current owner of the slot
        String sourceNodeId = findSlotOwner(slot);
        if (sourceNodeId.equals(targetNodeId)) {
            return; // already on the target node
        }

        // 2. Mark the slot MIGRATING on the source node
        clusterConnection.clusterSetSlotMigrating(slot, targetNodeId);

        // 3. Mark the slot IMPORTING on the target node
        clusterConnection.clusterSetSlotImporting(slot, sourceNodeId);

        // 4. List the keys in the slot. In production this must loop until the
        //    slot is empty — a single slot can hold far more than 1000 keys.
        List<String> keys = clusterConnection.clusterGetKeysInSlot(slot, 1000);

        // 5. Move each key with MIGRATE. Note that MIGRATE addresses the
        //    target by host and port, not by cluster node id.
        RedisClusterNode target = findNodeById(targetNodeId);
        for (String key : keys) {
            clusterConnection.migrate(target.getHost(), target.getPort(), key, 0, 5000);
        }

        // 6. Finalize ownership. CLUSTER SETSLOT <slot> NODE should be issued
        //    on the source and target (and ideally every master) so the new
        //    mapping propagates promptly.
        clusterConnection.clusterSetSlotNode(slot, targetNodeId);
        log.info("Slot {} migrated from {} to {}", slot, sourceNodeId, targetNodeId);
    }

    /**
     * Handle a migration failure.
     */
    private void handleMigrationFailure(String targetNodeId, int startSlot, int endSlot, Exception e) {
        // Record the failure
        MigrationFailure failure = new MigrationFailure();
        failure.setTargetNodeId(targetNodeId);
        failure.setStartSlot(startSlot);
        failure.setEndSlot(endSlot);
        failure.setException(e);
        failure.setTimestamp(LocalDateTime.now());

        // Alert
        alertService.sendMigrationFailureAlert(failure);

        // Retry only when the error is retryable
        if (isRetryableError(e)) {
            scheduleRetry(targetNodeId, startSlot, endSlot);
        } else {
            // Non-retryable: a human has to step in
            log.error("Non-retryable migration error, manual intervention required", e);
        }
    }
}
```
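The MIGRATING/IMPORTING/MIGRATE/SETSLOT sequence above is exactly what the stock tooling automates; for most production scale-outs the resharding is done with `redis-cli` instead of custom code. A command fragment (node ids are placeholders):

```shell
# Interactive resharding: redis-cli drives the per-slot migration protocol
redis-cli --cluster reshard 127.0.0.1:7000

# Non-interactive variant: move 2731 slots from one source to the new master
redis-cli --cluster reshard 127.0.0.1:7000 \
  --cluster-from <source-node-id> \
  --cluster-to <target-node-id> \
  --cluster-slots 2731 \
  --cluster-yes

# Or rebalance slots evenly across all masters, including newly added empty ones
redis-cli --cluster rebalance 127.0.0.1:7000 --cluster-use-empty-masters
```

Either way, clients briefly see `-ASK` redirections for keys in a slot that is mid-migration; cluster-aware client libraries handle these transparently.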
Phase 4: Data consistency verification
Consistency checks:
```java
/**
 * Data consistency validator.
 */
@Slf4j
@Component
public class DataConsistencyValidator {

    /**
     * Validate cluster-wide data consistency.
     */
    public ConsistencyReport validateDataConsistency() {
        ConsistencyReport report = new ConsistencyReport();
        report.setSlotAllocationValid(validateSlotAllocation());                 // 1. slot allocation
        report.setReplicationConsistencyValid(validateReplicationConsistency()); // 2. master/replica consistency
        report.setDataIntegrityValid(validateDataIntegrity());                   // 3. data integrity
        report.setClusterStateValid(validateClusterState());                     // 4. cluster state
        return report;
    }

    private boolean validateSlotAllocation() {
        try {
            // All 16384 slots must be owned by exactly one master
            Set<Integer> assignedSlots = new HashSet<>();
            List<RedisClusterNode> masters = StreamSupport
                    .stream(clusterConnection.clusterGetNodes().spliterator(), false)
                    .filter(RedisClusterNode::isMaster)
                    .collect(Collectors.toList());

            for (RedisClusterNode master : masters) {
                Set<SlotRange> slotRanges = master.getSlotRange();
                for (SlotRange range : slotRanges) {
                    for (int slot = range.getStart(); slot <= range.getEnd(); slot++) {
                        if (assignedSlots.contains(slot)) {
                            log.error("Slot {} is assigned to multiple masters", slot);
                            return false;
                        }
                        assignedSlots.add(slot);
                    }
                }
            }
            // Every slot must be assigned
            return assignedSlots.size() == 16384;
        } catch (Exception e) {
            log.error("Slot allocation validation failed", e);
            return false;
        }
    }

    private boolean validateReplicationConsistency() {
        try {
            List<RedisClusterNode> masters = StreamSupport
                    .stream(clusterConnection.clusterGetNodes().spliterator(), false)
                    .filter(RedisClusterNode::isMaster)
                    .collect(Collectors.toList());

            for (RedisClusterNode master : masters) {
                // Checksum of the master's data
                String masterChecksum = getDataChecksum(master);
                // Compare against every replica
                List<RedisClusterNode> slaves = getSlaveNodes(master.getId());
                for (RedisClusterNode slave : slaves) {
                    String slaveChecksum = getDataChecksum(slave);
                    if (!masterChecksum.equals(slaveChecksum)) {
                        log.error("Data inconsistency between master {} and slave {}",
                                master.getId(), slave.getId());
                        return false;
                    }
                }
            }
            return true;
        } catch (Exception e) {
            log.error("Replication consistency validation failed", e);
            return false;
        }
    }

    /**
     * Sample-based data integrity check.
     */
    private boolean validateDataIntegrity() {
        try {
            // Randomly sample 1000 keys
            List<String> sampleKeys = getSampleKeys(1000);
            for (String key : sampleKeys) {
                // Which slot should this key live in? (hash tags omitted for brevity)
                int slot = CRC16.crc16(key.getBytes()) % 16384;
                // Which node owns that slot?
                String expectedNodeId = findSlotOwner(slot);
                // Confirm the key actually lives on that node
                RedisClusterNode node = findNodeById(expectedNodeId);
                Object value = clusterConnection.get(node, key.getBytes());
                if (value == null) {
                    log.error("Key {} not found in expected node {}", key, expectedNodeId);
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            log.error("Data integrity validation failed", e);
            return false;
        }
    }
}
```
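The key-to-slot rule used in the sampling check can be reproduced exactly: Redis Cluster hashes with CRC16 (the XMODEM variant, polynomial 0x1021, initial value 0) modulo 16384, and when a key contains a non-empty `{...}` hash tag, only the tag's content is hashed. A self-contained sketch:

```java
import java.nio.charset.StandardCharsets;

public class RedisSlot {

    /** CRC16-CCITT (XMODEM), as specified by the Redis Cluster key distribution model. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep it a 16-bit value
            }
        }
        return crc;
    }

    /** HASH_SLOT = CRC16(effective key) mod 16384, honoring {hash tags}. */
    static int slotOf(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) { // non-empty tag: hash only its content
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        // 0x31C3 is the standard CRC-16/XMODEM check value for "123456789"
        System.out.printf("crc16(\"123456789\") = 0x%04X%n", crc16("123456789".getBytes(StandardCharsets.UTF_8)));
        // Keys sharing a hash tag land in the same slot — the basis for multi-key ops in a cluster
        System.out.println(slotOf("user:{1000}:following") == slotOf("user:{1000}:followers"));
    }
}
```

This also explains why the sampling check above is a simplification: for tagged keys the validator must hash the tag content, not the whole key, or it will look for the key on the wrong node.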
🛡️ High-Availability Safeguards
Monitoring the scale-out
Real-time monitoring:
```java
/**
 * Scale-out process monitoring.
 */
@Slf4j
@Component
public class ExpansionMonitor {

    private final MeterRegistry meterRegistry;
    private final Timer migrationTimer;
    private final Counter migrationFailures;

    // Build the meters in the constructor: field initializers would run before
    // an @Autowired registry is injected and throw a NullPointerException.
    public ExpansionMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.migrationTimer = Timer.builder("cluster.migration.duration")
                .description("Slot migration duration")
                .register(meterRegistry);
        this.migrationFailures = Counter.builder("cluster.migration.failures")
                .description("Migration failure count")
                .register(meterRegistry);
    }

    /**
     * Track migration progress.
     */
    @EventListener
    public void onSlotMigrationStart(SlotMigrationStartEvent event) {
        log.info("Slot migration started: slot={}, source={}, target={}",
                event.getSlot(), event.getSourceNode(), event.getTargetNode());
        // Record when the migration began
        event.setStartTime(System.currentTimeMillis());
    }

    @EventListener
    public void onSlotMigrationComplete(SlotMigrationCompleteEvent event) {
        long duration = System.currentTimeMillis() - event.getStartTime();
        log.info("Slot migration completed: slot={}, duration={}ms", event.getSlot(), duration);

        // Record the migration latency
        migrationTimer.record(duration, TimeUnit.MILLISECONDS);
        // Update overall progress
        updateMigrationProgress(event);
    }

    @EventListener
    public void onSlotMigrationFailure(SlotMigrationFailureEvent event) {
        log.error("Slot migration failed: slot={}, error={}",
                event.getSlot(), event.getException().getMessage());
        // Count the failure and alert
        migrationFailures.increment();
        sendMigrationFailureAlert(event);
    }

    /**
     * Check cluster health periodically.
     */
    @Scheduled(fixedRate = 10000) // every 10 seconds
    public void monitorClusterHealth() {
        try {
            ClusterHealthStatus status = checkClusterHealth();

            // Note: a Gauge should be registered once (e.g. at startup) against a
            // long-lived status object, not re-registered on every scheduled run;
            // shown inline here for brevity.
            Gauge.builder("cluster.health.score", status, ClusterHealthStatus::getHealthScore)
                    .description("Cluster health score")
                    .register(meterRegistry);

            // Alert when the health score degrades
            if (status.getHealthScore() < 0.8) {
                sendClusterHealthAlert(status);
            }
        } catch (Exception e) {
            log.error("Cluster health monitoring failed", e);
        }
    }

    private void sendMigrationFailureAlert(SlotMigrationFailureEvent event) {
        AlertMessage alert = AlertMessage.builder()
                .title("Redis cluster slot migration failed")
                .content(String.format("Migration of slot %d from node %s to node %s failed: %s",
                        event.getSlot(), event.getSourceNode(), event.getTargetNode(),
                        event.getException().getMessage()))
                .level(AlertLevel.HIGH)
                .timestamp(LocalDateTime.now())
                .build();
        alertService.sendAlert(alert);
    }
}
```
Rollback plan
Rollback mechanism:
```java
/**
 * Expansion rollback manager.
 */
@Slf4j
@Component
public class ExpansionRollbackManager {

    /**
     * Roll back a scale-out.
     */
    public void rollbackExpansion(String expansionId) {
        log.info("Starting expansion rollback for expansion: {}", expansionId);
        try {
            // 1. Load the expansion record
            ExpansionRecord record = getExpansionRecord(expansionId);
            // 2. Stop any in-flight migrations
            stopOngoingMigrations(record);
            // 3. Roll back the slot allocation
            rollbackSlotAllocation(record);
            // 4. Remove the newly added nodes
            removeNewNodes(record);
            // 5. Restore the original configuration
            restoreOriginalConfiguration(record);
            // 6. Verify the rollback
            validateRollbackResult(record);
            log.info("Expansion rollback completed successfully");
        } catch (Exception e) {
            log.error("Expansion rollback failed", e);
            // Rollback failed: escalate to a human
            sendRollbackFailureAlert(expansionId, e);
            throw new RollbackException("Rollback failed", e);
        }
    }

    private void rollbackSlotAllocation(ExpansionRecord record) {
        // Migrate slots back to their original owners
        Map<Integer, String> originalSlotAllocation = record.getOriginalSlotAllocation();
        for (Map.Entry<Integer, String> entry : originalSlotAllocation.entrySet()) {
            int slot = entry.getKey();
            String originalOwner = entry.getValue();
            String currentOwner = findSlotOwner(slot);
            if (!originalOwner.equals(currentOwner)) {
                // The slot moved during the scale-out: migrate it back
                migrateSlotBack(slot, currentOwner, originalOwner);
            }
        }
    }

    private void removeNewNodes(ExpansionRecord record) {
        List<String> newNodeIds = record.getNewNodeIds();
        for (String nodeId : newNodeIds) {
            try {
                // 1. The node must own no slots before removal
                RedisClusterNode node = findNodeById(nodeId);
                if (!node.getSlotRange().isEmpty()) {
                    log.warn("Node {} still has slots assigned, skipping removal", nodeId);
                    continue;
                }
                // 2. Detach replicas from their master first
                if (!node.isMaster()) {
                    clusterConnection.clusterReplicate(nodeId, "");
                }
                // 3. Remove the node from the cluster (CLUSTER FORGET)
                clusterConnection.clusterForget(nodeId);
                // 4. Stop the Redis process
                stopRedisNode(node.getHost(), node.getPort());
                log.info("Node {} removed successfully", nodeId);
            } catch (Exception e) {
                log.error("Failed to remove node {}", nodeId, e);
            }
        }
    }

    /**
     * Take a pre-expansion snapshot.
     */
    public ExpansionSnapshot createExpansionSnapshot() {
        ExpansionSnapshot snapshot = new ExpansionSnapshot();
        // Capture the current cluster state
        snapshot.setClusterNodes(getCurrentClusterNodes());
        snapshot.setSlotAllocation(getCurrentSlotAllocation());
        snapshot.setReplicationTopology(getReplicationTopology());
        snapshot.setClusterConfiguration(getClusterConfiguration());
        snapshot.setTimestamp(LocalDateTime.now());
        return snapshot;
    }

    /**
     * Restore the cluster from a snapshot.
     */
    public void restoreFromSnapshot(ExpansionSnapshot snapshot) {
        log.info("Restoring cluster from snapshot: {}", snapshot.getTimestamp());
        try {
            // 1. Restore slot allocation
            restoreSlotAllocation(snapshot.getSlotAllocation());
            // 2. Restore replication topology
            restoreReplicationTopology(snapshot.getReplicationTopology());
            // 3. Restore cluster configuration
            restoreClusterConfiguration(snapshot.getClusterConfiguration());
            log.info("Cluster restored from snapshot successfully");
        } catch (Exception e) {
            log.error("Failed to restore cluster from snapshot", e);
            throw new RestoreException("Snapshot restore failed", e);
        }
    }
}
```
⚠️ Failure Handling
Failures during the scale-out
Node failures:
```java
/**
 * Expansion failure handler.
 */
@Slf4j
@Component
public class ExpansionFailureHandler {

    /**
     * Handle a node failure detected during the scale-out.
     */
    @EventListener
    public void handleNodeFailure(NodeFailureEvent event) {
        String failedNodeId = event.getNodeId();
        NodeType nodeType = event.getNodeType();
        log.error("Node failure detected during expansion: nodeId={}, type={}", failedNodeId, nodeType);

        if (nodeType == NodeType.MASTER) {
            handleMasterNodeFailure(failedNodeId);
        } else {
            handleSlaveNodeFailure(failedNodeId);
        }
    }

    private void handleMasterNodeFailure(String failedMasterId) {
        try {
            // 1. Is there a replica that can be promoted?
            List<RedisClusterNode> slaves = getSlaveNodes(failedMasterId);
            if (!slaves.isEmpty()) {
                // 2. Promote the best replica to master
                RedisClusterNode bestSlave = selectBestSlave(slaves);
                promoteSlaveToMaster(bestSlave.getId());
                log.info("Promoted slave {} to master to replace failed node {}",
                        bestSlave.getId(), failedMasterId);
                // 3. Reassign the failed node's slots
                redistributeFailedNodeSlots(failedMasterId, bestSlave.getId());
            } else {
                // 4. No replica available: emergency handling
                handleMasterFailureWithoutSlave(failedMasterId);
            }
        } catch (Exception e) {
            log.error("Failed to handle master node failure", e);
            // Escalate immediately
            sendEmergencyAlert("Master node failure handling failed", failedMasterId, e);
        }
    }

    private void handleSlaveNodeFailure(String failedSlaveId) {
        try {
            // 1. Find the replica's master
            String masterId = findMasterOfSlave(failedSlaveId);
            // 2. Does the master still have other replicas?
            List<RedisClusterNode> otherSlaves = getSlaveNodes(masterId)
                    .stream()
                    .filter(slave -> !slave.getId().equals(failedSlaveId))
                    .collect(Collectors.toList());
            if (otherSlaves.isEmpty()) {
                // 3. Master lost all replicas: provision a new one
                createNewSlaveForMaster(masterId);
            }
            // 4. Remove the failed node from the cluster
            clusterConnection.clusterForget(failedSlaveId);
            log.info("Handled slave node failure: {}", failedSlaveId);
        } catch (Exception e) {
            log.error("Failed to handle slave node failure", e);
        }
    }

    /**
     * Handle data loss detected during a slot migration.
     */
    public void handleDataLossDuringMigration(int slot, String sourceNode, String targetNode) {
        log.error("Data loss detected during slot {} migration from {} to {}", slot, sourceNode, targetNode);
        try {
            // 1. Pause the migration of this slot
            pauseSlotMigration(slot);
            // 2. Restore the slot's data from backup
            restoreSlotDataFromBackup(slot, sourceNode);
            // 3. Retry the migration
            retrySlotMigration(slot, sourceNode, targetNode);
        } catch (Exception e) {
            log.error("Failed to handle data loss during migration", e);
            // Recovery failed: escalate to a human
            sendDataLossAlert(slot, sourceNode, targetNode, e);
        }
    }

    /**
     * Handle a network partition.
     */
    public void handleNetworkPartition(List<String> isolatedNodes) {
        log.error("Network partition detected, isolated nodes: {}", isolatedNodes);
        try {
            // 1. Assess the impact of the partition
            PartitionImpact impact = assessPartitionImpact(isolatedNodes);
            if (impact.isCritical()) {
                // 2. Critical partition: pause the scale-out
                pauseExpansion("Network partition detected");
                // 3. Wait for the network to recover
                waitForNetworkRecovery(isolatedNodes);
                // 4. Resume once recovered
                resumeExpansion();
            } else {
                // 5. Non-critical: continue, skipping the isolated nodes
                continueExpansionWithoutIsolatedNodes(isolatedNodes);
            }
        } catch (Exception e) {
            log.error("Failed to handle network partition", e);
        }
    }
}
```
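The `selectBestSlave` step above is left abstract. One common heuristic (and the property Redis Cluster's own automatic failover favors) is to pick the replica with the largest replication offset, i.e. the least data loss, while skipping replicas whose link to the master is down. An illustrative sketch — the `Replica` record and its fields are assumptions for the demo, not a real Redis API:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class BestSlaveSelector {

    /** Minimal stand-in for per-replica state (hypothetical, not an actual Redis type). */
    record Replica(String id, long replOffset, boolean linkUp) {}

    /** Prefer replicas with a healthy link, then the largest replication offset. */
    static Optional<Replica> selectBestSlave(List<Replica> replicas) {
        return replicas.stream()
                .filter(Replica::linkUp)                          // skip disconnected replicas
                .max(Comparator.comparingLong(Replica::replOffset)); // most caught-up wins
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("a", 1_000, true),
                new Replica("b", 5_000, true),
                new Replica("c", 9_000, false)); // highest offset, but its link is down
        System.out.println(selectBestSlave(replicas).map(Replica::id).orElse("none")); // prints "b"
    }
}
```

In a real deployment the offsets would come from `INFO replication` on the master (`slave_repl_offset` per replica); ties can be broken by node id for determinism, which is what Redis's failover ranking does.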
💡 Key Points for the Interview Answer
Suggested answer structure
Part 1: Current state analysis
"First analyze the current cluster: a 3-master/3-replica topology, 500 GB of data, 100k QPS. Use monitoring metrics to locate the bottleneck: memory usage, per-node QPS, connection count, and so on. Then set the scaling target: a 6-master/6-replica topology for higher throughput and availability."
Part 2: The scaling plan
"The scale-out runs in four phases:
1. Environment preparation: deploy the new nodes, run pre-flight checks, take a snapshot backup
2. Node joining: add the new masters and replicas to the cluster and set up replication
3. Slot migration: migrate slots in batches while monitoring progress in real time
4. Data verification: validate slot allocation, master/replica consistency, and data integrity
The heart of the plan is the slot migration strategy: small batches, so the impact on live traffic stays minimal."
Part 3: High-availability safeguards
"The safeguards include:
1. Real-time monitoring: migration progress, cluster health, performance metrics
2. Rollback plan: snapshot backups, slot rollback, node removal
3. Failure handling: automatic failover for failed nodes, data-loss recovery, network-partition handling
4. Phased execution: the process can be paused and resumed at any point, which limits risk"
Part 4: Failure handling
"Failures that can occur during the scale-out:
1. Node failure: promote a replica when a master dies; provision a new replica when a replica dies
2. Migration failure: retry mechanism, data verification, alerting
3. Network partition: assess the impact, pause the scale-out, wait for recovery
4. Data loss: restore from backup and re-migrate
Each failure mode has a defined handling flow and contingency plan."
Key takeaways:
- ✅ Redis Cluster architecture and the slot allocation mechanism
- ✅ Engineering practice for large-scale data migration
- ✅ High-availability design and failure handling
- ✅ Production operations experience and risk control
Summary: Scaling a Redis cluster is a complex distributed-systems project. It requires a deep understanding of how the cluster works, a complete plan design, and solid failure-handling ability — exactly the technical depth and engineering experience expected of a senior engineer.