Tencent Interview Question: Redis Cluster Scale-Out Plan

Interview importance: ⭐⭐⭐⭐⭐

Source: Tencent 2024 experienced-hire technical interview

Topics tested: Redis Cluster architecture, data migration, high-availability design

Estimated reading time: 45 minutes

Question Background

Interviewer: "Our Redis cluster currently has 6 nodes (3 masters, 3 replicas), holds about 500 GB of data, and serves 100k QPS. Because the business is growing quickly, we need to scale out to 12 nodes (6 masters, 6 replicas). Design the expansion in detail, covering the data migration strategy, how you keep the service available, and a rollback plan. Also, if a node fails during the expansion, how do you handle it?"

What the question tests:

  • Deep understanding of Redis Cluster architecture
  • Engineering ability to migrate data at scale
  • High-availability design thinking
  • Failure handling and incident response
  • Production operations experience

🎯 Current State Analysis and Expansion Planning

Analyzing the current cluster

Cluster topology:

# Current cluster state
redis-cli --cluster info 127.0.0.1:7000

# Node layout
Master1 (7000): slots 0-5460      (5461 slots)
Master2 (7001): slots 5461-10922  (5462 slots)
Master3 (7002): slots 10923-16383 (5461 slots)

Slave1  (7003): replicates Master1
Slave2  (7004): replicates Master2
Slave3  (7005): replicates Master3

Performance metrics:

/**
 * Cluster performance monitoring
 */
@Component
public class ClusterMonitor {
    
    @Autowired
    private RedisClusterConnection clusterConnection;
    
    /**
     * Collect performance metrics for every master in the cluster
     */
    public ClusterMetrics getClusterMetrics() {
        ClusterMetrics metrics = new ClusterMetrics();
        
        // clusterGetNodes() returns an Iterable, so stream it via StreamSupport
        List<RedisClusterNode> masters = StreamSupport
            .stream(clusterConnection.clusterGetNodes().spliterator(), false)
            .filter(RedisClusterNode::isMaster)
            .collect(Collectors.toList());
        
        for (RedisClusterNode master : masters) {
            NodeMetrics nodeMetrics = getNodeMetrics(master);
            metrics.addNodeMetrics(nodeMetrics);
        }
        
        return metrics;
    }
    
    private NodeMetrics getNodeMetrics(RedisClusterNode node) {
        Properties info = clusterConnection.info(node);
        
        NodeMetrics metrics = new NodeMetrics();
        metrics.setNodeId(node.getId());
        metrics.setHost(node.getHost());
        metrics.setPort(node.getPort());
        
        // Memory usage
        metrics.setUsedMemory(Long.parseLong(info.getProperty("used_memory", "0")));
        metrics.setMaxMemory(Long.parseLong(info.getProperty("maxmemory", "0")));
        
        // Command statistics (total_commands_processed is cumulative;
        // QPS is derived by sampling it at intervals)
        metrics.setCommandsProcessed(Long.parseLong(info.getProperty("total_commands_processed", "0")));
        metrics.setConnectedClients(Integer.parseInt(info.getProperty("connected_clients", "0")));
        
        // Number of slots served by this node
        metrics.setSlotCount(node.getSlotRange().getSlots().size());
        
        return metrics;
    }
}

@Data
public class ClusterMetrics {
    private List<NodeMetrics> nodeMetrics = new ArrayList<>();
    private long totalMemoryUsed;
    private long totalQPS;
    private int totalSlots = 16384;
    
    public void addNodeMetrics(NodeMetrics nodeMetrics) {
        this.nodeMetrics.add(nodeMetrics);
        this.totalMemoryUsed += nodeMetrics.getUsedMemory();
        this.totalQPS += nodeMetrics.getQps();
    }
    
    /**
     * Decide whether the cluster needs to be expanded
     */
    public boolean needsExpansion() {
        // Any node above 70% memory usage
        boolean memoryPressure = nodeMetrics.stream()
            .anyMatch(node -> node.getMemoryUsageRatio() > 0.7);
        
        // Any single node above 30k QPS
        boolean qpsPressure = nodeMetrics.stream()
            .anyMatch(node -> node.getQps() > 30000);
        
        // Any node above 5000 client connections
        boolean connectionPressure = nodeMetrics.stream()
            .anyMatch(node -> node.getConnectedClients() > 5000);
        
        return memoryPressure || qpsPressure || connectionPressure;
    }
}
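Plugging the figures from the question (100k QPS, 3 masters) into the per-node QPS threshold above shows why this cluster qualifies for expansion, and what the load per master looks like afterwards. This is simple arithmetic on the stated numbers, not a measurement:

```java
public class CapacityCheck {
    public static void main(String[] args) {
        long totalQps = 100_000;              // from the question
        long qpsThresholdPerMaster = 30_000;  // same threshold as needsExpansion()

        long perMasterBefore = totalQps / 3;  // 3 masters today
        long perMasterAfter  = totalQps / 6;  // 6 masters after the expansion

        // Before: ~33k QPS per master, over the 30k threshold.
        // After: ~16k QPS per master, comfortably under it.
        System.out.println("before: " + perMasterBefore + " QPS/master (over threshold: "
            + (perMasterBefore > qpsThresholdPerMaster) + ")");
        System.out.println("after:  " + perMasterAfter + " QPS/master (over threshold: "
            + (perMasterAfter > qpsThresholdPerMaster) + ")");
    }
}
```

The same back-of-envelope check applies to memory: 500 GB over 3 masters is ~167 GB each, and over 6 masters ~83 GB each.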

Expansion target

Target cluster architecture:

/**
 * Expansion planning
 */
@Component
public class ExpansionPlanner {
    
    /**
     * Build the expansion plan
     */
    public ExpansionPlan createExpansionPlan(ClusterMetrics currentMetrics) {
        ExpansionPlan plan = new ExpansionPlan();
        
        // Target: 6 masters, 6 replicas
        plan.setTargetMasterCount(6);
        plan.setTargetSlaveCount(6);
        
        // New nodes to add
        List<NodeConfig> newNodes = Arrays.asList(
            new NodeConfig("192.168.1.10", 7006, NodeType.MASTER),
            new NodeConfig("192.168.1.11", 7007, NodeType.MASTER),
            new NodeConfig("192.168.1.12", 7008, NodeType.MASTER),
            new NodeConfig("192.168.1.13", 7009, NodeType.SLAVE),
            new NodeConfig("192.168.1.14", 7010, NodeType.SLAVE),
            new NodeConfig("192.168.1.15", 7011, NodeType.SLAVE)
        );
        plan.setNewNodes(newNodes);
        
        // Slot reallocation plan
        plan.setSlotReallocation(calculateSlotReallocation());
        
        // Estimated migration time
        plan.setEstimatedMigrationTime(estimateMigrationTime(currentMetrics));
        
        return plan;
    }
    
    private Map<String, SlotRange> calculateSlotReallocation() {
        Map<String, SlotRange> allocation = new HashMap<>();
        
        // 16384 / 6 = 2730 remainder 4: the first 4 masters get 2731 slots,
        // the last 2 get 2730
        int slotsPerMaster = 16384 / 6;
        int remainder = 16384 % 6;
        int cursor = 0;
        
        for (int i = 0; i < 6; i++) {
            int size = slotsPerMaster + (i < remainder ? 1 : 0);
            // Track a running cursor so consecutive ranges never overlap
            allocation.put("master" + i, new SlotRange(cursor, cursor + size - 1));
            cursor += size;
        }
        
        return allocation;
    }
    
    private Duration estimateMigrationTime(ClusterMetrics metrics) {
        // Rough estimate from data volume and available network bandwidth
        long totalDataSize = metrics.getTotalMemoryUsed();
        long networkBandwidth = 1000L * 1024 * 1024; // ~1 GB/s link
        long migrationBandwidth = networkBandwidth / 4; // reserve 75% of bandwidth for live traffic
        
        long estimatedSeconds = Math.max(1, totalDataSize / migrationBandwidth);
        return Duration.ofSeconds(estimatedSeconds);
    }
}

🚀 Expansion Execution Plan

Phase 1: Environment preparation

Deploying the new nodes:

#!/bin/bash
# Deployment script for the new nodes

# 1. Create a config file for each new node
create_node_config() {
    local port=$1
    local node_dir="/opt/redis/node-${port}"
    
    mkdir -p ${node_dir}
    
    cat > ${node_dir}/redis.conf << EOF
port ${port}
cluster-enabled yes
cluster-config-file nodes-${port}.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-${port}.aof"
dir ${node_dir}
logfile ${node_dir}/redis-${port}.log
pidfile /var/run/redis_${port}.pid

# Memory
maxmemory 8gb
maxmemory-policy allkeys-lru

# Network
tcp-keepalive 300
timeout 0

# Persistence
save 900 1
save 300 10
save 60 10000
EOF
}

# 2. Start the new nodes
start_new_nodes() {
    for port in 7006 7007 7008 7009 7010 7011; do
        echo "Starting Redis node on port ${port}..."
        create_node_config ${port}
        redis-server /opt/redis/node-${port}/redis.conf &
        sleep 2
    done
}

# 3. Verify node status
verify_nodes() {
    for port in 7006 7007 7008 7009 7010 7011; do
        if redis-cli -p ${port} ping | grep -q PONG; then
            echo "Node ${port}: OK"
        else
            echo "Node ${port}: FAILED"
            exit 1
        fi
    done
}

start_new_nodes
verify_nodes

Pre-expansion checklist:

/**
 * Pre-expansion environment checks
 */
@Slf4j
@Component
public class PreExpansionChecker {
    
    @Autowired
    private RedisClusterConnection clusterConnection;
    
    /**
     * Run all pre-expansion checks
     */
    public CheckResult performPreExpansionCheck() {
        CheckResult result = new CheckResult();
        
        // 1. Cluster health
        result.addCheck("cluster_health", checkClusterHealth());
        
        // 2. Node resources
        result.addCheck("node_resources", checkNodeResources());
        
        // 3. Network connectivity
        result.addCheck("network_connectivity", checkNetworkConnectivity());
        
        // 4. Backup verification
        result.addCheck("backup_verification", checkBackupStatus());
        
        // 5. Monitoring system
        result.addCheck("monitoring_system", checkMonitoringSystem());
        
        return result;
    }
    
    private boolean checkClusterHealth() {
        try {
            // Check the state of every node
            Iterable<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
            
            for (RedisClusterNode node : nodes) {
                if (node.getFlags().contains(RedisClusterNode.Flag.FAIL)) {
                    log.error("Node {} is in FAIL state", node.getId());
                    return false;
                }
            }
            
            // Check overall cluster state and slot coverage
            ClusterInfo clusterInfo = clusterConnection.clusterGetClusterInfo();
            
            return "ok".equals(clusterInfo.getState());
            
        } catch (Exception e) {
            log.error("Cluster health check failed", e);
            return false;
        }
    }
    
    private boolean checkNodeResources() {
        // Check CPU, memory, and disk space
        return true; // simplified
    }
    
    private boolean checkNetworkConnectivity() {
        // Check connectivity between old and new nodes
        return true; // simplified
    }
    
    private boolean checkBackupStatus() {
        // Verify the most recent backup is usable
        return true; // simplified
    }
    
    private boolean checkMonitoringSystem() {
        // Make sure the monitoring system is working
        return true; // simplified
    }
}

Phase 2: Joining nodes to the cluster

Adding the new masters:

/**
 * Cluster expansion executor
 */
@Slf4j
@Component
public class ClusterExpansionExecutor {
    
    @Autowired
    private RedisClusterConnection clusterConnection;
    
    /**
     * Add the new master nodes to the cluster
     */
    public void addMasterNodes(List<NodeConfig> masterNodes) {
        for (NodeConfig nodeConfig : masterNodes) {
            try {
                log.info("Adding master node: {}:{}", nodeConfig.getHost(), nodeConfig.getPort());
                
                // 1. CLUSTER MEET the new node
                clusterConnection.clusterMeet(nodeConfig.getHost(), nodeConfig.getPort());
                
                // 2. Wait for the handshake to finish
                waitForNodeHandshake(nodeConfig);
                
                // 3. Verify the node's state
                verifyNodeStatus(nodeConfig);
                
                log.info("Master node {}:{} added successfully", 
                    nodeConfig.getHost(), nodeConfig.getPort());
                
            } catch (Exception e) {
                log.error("Failed to add master node {}:{}", 
                    nodeConfig.getHost(), nodeConfig.getPort(), e);
                throw new ExpansionException("Failed to add master node", e);
            }
        }
    }
    
    /**
     * Add the new replica nodes
     */
    public void addSlaveNodes(List<NodeConfig> slaveNodes, Map<String, String> masterSlaveMapping) {
        for (NodeConfig slaveConfig : slaveNodes) {
            try {
                log.info("Adding slave node: {}:{}", slaveConfig.getHost(), slaveConfig.getPort());
                
                // 1. CLUSTER MEET the replica
                clusterConnection.clusterMeet(slaveConfig.getHost(), slaveConfig.getPort());
                
                // 2. Wait for the handshake to finish
                waitForNodeHandshake(slaveConfig);
                
                // 3. Attach the replica to its master (CLUSTER REPLICATE)
                String masterId = masterSlaveMapping.get(slaveConfig.getNodeId());
                clusterConnection.clusterReplicate(slaveConfig.getNodeId(), masterId);
                
                // 4. Verify replication is established
                verifyReplicationStatus(slaveConfig, masterId);
                
                log.info("Slave node {}:{} added successfully", 
                    slaveConfig.getHost(), slaveConfig.getPort());
                
            } catch (Exception e) {
                log.error("Failed to add slave node {}:{}", 
                    slaveConfig.getHost(), slaveConfig.getPort(), e);
                throw new ExpansionException("Failed to add slave node", e);
            }
        }
    }
    
    private void waitForNodeHandshake(NodeConfig nodeConfig) throws InterruptedException {
        int maxRetries = 30;
        int retryCount = 0;
        
        while (retryCount < maxRetries) {
            try {
                RedisClusterNode node = findNodeById(nodeConfig.getNodeId());
                if (node != null && !node.getFlags().contains(RedisClusterNode.Flag.HANDSHAKE)) {
                    return; // handshake complete
                }
            } catch (Exception e) {
                // Ignore and retry until the timeout
            }
            
            Thread.sleep(1000);
            retryCount++;
        }
        
        throw new ExpansionException("Node handshake timeout: " + nodeConfig.getNodeId());
    }
}

Phase 3: Slot migration

Slot migration strategy:

/**
 * Slot migration manager
 */
@Slf4j
@Component
public class SlotMigrationManager {
    
    @Autowired
    private RedisClusterConnection clusterConnection;
    
    @Autowired
    private AlertService alertService;
    
    /**
     * Execute the slot migration plan
     */
    public void migrateSlots(Map<String, SlotRange> reallocationPlan) {
        // Migrate in batches to limit the impact on live traffic
        int batchSize = 100; // 100 slots per batch
        
        for (Map.Entry<String, SlotRange> entry : reallocationPlan.entrySet()) {
            String targetNodeId = entry.getKey();
            SlotRange slotRange = entry.getValue();
            
            migrateSlotRange(targetNodeId, slotRange, batchSize);
        }
    }
    
    private void migrateSlotRange(String targetNodeId, SlotRange slotRange, int batchSize) {
        int startSlot = slotRange.getStart();
        int endSlot = slotRange.getEnd();
        
        for (int slot = startSlot; slot <= endSlot; slot += batchSize) {
            int batchEndSlot = Math.min(slot + batchSize - 1, endSlot);
            
            try {
                migrateSlotBatch(targetNodeId, slot, batchEndSlot);
                
                // Pause briefly between batches to protect live traffic
                Thread.sleep(100);
                
            } catch (Exception e) {
                log.error("Failed to migrate slots {}-{} to node {}", 
                    slot, batchEndSlot, targetNodeId, e);
                
                // Handle the failed batch
                handleMigrationFailure(targetNodeId, slot, batchEndSlot, e);
            }
        }
    }
    
    private void migrateSlotBatch(String targetNodeId, int startSlot, int endSlot) {
        for (int slot = startSlot; slot <= endSlot; slot++) {
            migrateSlot(slot, targetNodeId);
        }
    }
    
    /**
     * Migrate a single slot, following the CLUSTER SETSLOT MIGRATING/IMPORTING
     * + MIGRATE sequence from the Redis Cluster spec
     */
    private void migrateSlot(int slot, String targetNodeId) {
        // 1. Find the slot's current owner
        String sourceNodeId = findSlotOwner(slot);
        if (sourceNodeId.equals(targetNodeId)) {
            return; // slot already lives on the target node
        }
        
        // 2. Mark the slot MIGRATING on the source node
        clusterConnection.clusterSetSlotMigrating(slot, targetNodeId);
        
        // 3. Mark the slot IMPORTING on the target node
        clusterConnection.clusterSetSlotImporting(slot, sourceNodeId);
        
        // 4. Move keys in batches until CLUSTER GETKEYSINSLOT returns none;
        //    a single fetch of 1000 keys is not enough for large slots
        List<String> keys;
        while (!(keys = clusterConnection.clusterGetKeysInSlot(slot, 1000)).isEmpty()) {
            for (String key : keys) {
                // MIGRATE targets the node's host:port, not its node id
                clusterConnection.migrate(targetNodeId, 6379, key, 0, 5000);
            }
        }
        
        // 5. Assign the slot to the target node cluster-wide
        clusterConnection.clusterSetSlotNode(slot, targetNodeId);
        
        log.info("Slot {} migrated from {} to {}", slot, sourceNodeId, targetNodeId);
    }
    
    /**
     * Handle a failed migration batch
     */
    private void handleMigrationFailure(String targetNodeId, int startSlot, int endSlot, Exception e) {
        // Record the failure
        MigrationFailure failure = new MigrationFailure();
        failure.setTargetNodeId(targetNodeId);
        failure.setStartSlot(startSlot);
        failure.setEndSlot(endSlot);
        failure.setException(e);
        failure.setTimestamp(LocalDateTime.now());
        
        // Send an alert
        alertService.sendMigrationFailureAlert(failure);
        
        // Decide whether to retry based on the error type
        if (isRetryableError(e)) {
            // Retry after a delay
            scheduleRetry(targetNodeId, startSlot, endSlot);
        } else {
            // Non-retryable: requires manual intervention
            log.error("Non-retryable migration error, manual intervention required", e);
        }
    }
}

Phase 4: Data consistency validation

Consistency checks:

/**
 * Data consistency validator
 */
@Slf4j
@Component
public class DataConsistencyValidator {
    
    @Autowired
    private RedisClusterConnection clusterConnection;
    
    /**
     * Validate cluster-wide data consistency
     */
    public ConsistencyReport validateDataConsistency() {
        ConsistencyReport report = new ConsistencyReport();
        
        // 1. Slot allocation
        report.setSlotAllocationValid(validateSlotAllocation());
        
        // 2. Master/replica consistency
        report.setReplicationConsistencyValid(validateReplicationConsistency());
        
        // 3. Data integrity
        report.setDataIntegrityValid(validateDataIntegrity());
        
        // 4. Cluster state
        report.setClusterStateValid(validateClusterState());
        
        return report;
    }
    
    private boolean validateSlotAllocation() {
        try {
            // Every one of the 16384 slots must have exactly one owner
            Set<Integer> assignedSlots = new HashSet<>();
            
            List<RedisClusterNode> masters = StreamSupport
                .stream(clusterConnection.clusterGetNodes().spliterator(), false)
                .filter(RedisClusterNode::isMaster)
                .collect(Collectors.toList());
            
            for (RedisClusterNode master : masters) {
                for (int slot : master.getSlotRange().getSlots()) {
                    // add() returns false if the slot was already claimed
                    if (!assignedSlots.add(slot)) {
                        log.error("Slot {} is assigned to multiple masters", slot);
                        return false;
                    }
                }
            }
            
            // All slots must be covered
            return assignedSlots.size() == 16384;
            
        } catch (Exception e) {
            log.error("Slot allocation validation failed", e);
            return false;
        }
    }
    
    private boolean validateReplicationConsistency() {
        try {
            List<RedisClusterNode> masters = StreamSupport
                .stream(clusterConnection.clusterGetNodes().spliterator(), false)
                .filter(RedisClusterNode::isMaster)
                .collect(Collectors.toList());
            
            for (RedisClusterNode master : masters) {
                // Checksum of the master's data
                String masterChecksum = getDataChecksum(master);
                
                // All replicas of this master
                List<RedisClusterNode> slaves = getSlaveNodes(master.getId());
                
                for (RedisClusterNode slave : slaves) {
                    String slaveChecksum = getDataChecksum(slave);
                    
                    if (!masterChecksum.equals(slaveChecksum)) {
                        log.error("Data inconsistency between master {} and slave {}", 
                            master.getId(), slave.getId());
                        return false;
                    }
                }
            }
            
            return true;
            
        } catch (Exception e) {
            log.error("Replication consistency validation failed", e);
            return false;
        }
    }
    
    /**
     * Sample-based data integrity check
     */
    private boolean validateDataIntegrity() {
        try {
            // Randomly sample 1000 keys
            List<String> sampleKeys = getSampleKeys(1000);
            
            for (String key : sampleKeys) {
                // Compute the slot the key should hash to
                // (CRC16 masked to 14 bits, per the Redis Cluster spec)
                int slot = CRC16.crc16(key.getBytes()) & 16383;
                
                // Find the node that owns that slot
                String expectedNodeId = findSlotOwner(slot);
                
                // Verify the key actually lives on that node
                RedisClusterNode node = findNodeById(expectedNodeId);
                Object value = clusterConnection.get(node, key.getBytes());
                
                if (value == null) {
                    log.error("Key {} not found in expected node {}", key, expectedNodeId);
                    return false;
                }
            }
            
            return true;
            
        } catch (Exception e) {
            log.error("Data integrity validation failed", e);
            return false;
        }
    }
}
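The key-to-slot calculation used in validateDataIntegrity can be sketched self-contained. This is the CRC16-CCITT (XMODEM) variant named by the Redis Cluster specification, plus the hash-tag rule: if a key contains a non-empty {...} section, only that section is hashed, so keys sharing a tag land in the same slot. The class name RedisSlot is illustrative:

```java
import java.nio.charset.StandardCharsets;

public class RedisSlot {

    // CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, MSB-first.
    // Standard check value: crc16("123456789") == 0x31C3.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF; // keep it a 16-bit value
            }
        }
        return crc;
    }

    // HASH_SLOT(key): hash only the {tag} if one is present and non-empty,
    // then mask to 14 bits (equivalent to mod 16384)
    static int slot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) { // non-empty tag
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) & 16383;
    }

    public static void main(String[] args) {
        // Keys with the same hash tag always map to the same slot,
        // which is what makes multi-key operations possible in a cluster
        System.out.println(slot("{user1000}.following") == slot("{user1000}.followers"));
    }
}
```

The `& 16383` mask matches the spec's `CRC16(key) mod 16384` while avoiding any sign issues.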

🛡️ High-Availability Safeguards

Monitoring during the expansion

Real-time monitoring:

/**
 * Expansion progress monitoring
 */
@Slf4j
@Component
public class ExpansionMonitor {
    
    private final MeterRegistry meterRegistry;
    private final Timer migrationTimer;
    private final Counter migrationFailures;
    // Latest health score in permille, exposed through a gauge registered once
    private final AtomicLong healthScorePermille = new AtomicLong(1000);
    
    @Autowired
    private AlertService alertService;
    
    public ExpansionMonitor(MeterRegistry meterRegistry) {
        // Constructor injection: the registry must exist before the meters are
        // built (field initializers run before @Autowired fields are populated)
        this.meterRegistry = meterRegistry;
        this.migrationTimer = Timer.builder("cluster.migration.duration")
            .description("Slot migration duration")
            .register(meterRegistry);
        this.migrationFailures = Counter.builder("cluster.migration.failures")
            .description("Migration failure count")
            .register(meterRegistry);
        // Register the gauge once; re-registering on every scheduled tick
        // would accumulate duplicate meters
        Gauge.builder("cluster.health.score", healthScorePermille, v -> v.get() / 1000.0)
            .description("Cluster health score")
            .register(meterRegistry);
    }
    
    /**
     * Track migration progress
     */
    @EventListener
    public void onSlotMigrationStart(SlotMigrationStartEvent event) {
        log.info("Slot migration started: slot={}, source={}, target={}", 
            event.getSlot(), event.getSourceNode(), event.getTargetNode());
        
        // Record the start time
        event.setStartTime(System.currentTimeMillis());
    }
    
    @EventListener
    public void onSlotMigrationComplete(SlotMigrationCompleteEvent event) {
        long duration = System.currentTimeMillis() - event.getStartTime();
        
        log.info("Slot migration completed: slot={}, duration={}ms", 
            event.getSlot(), duration);
        
        // Record the migration duration
        migrationTimer.record(duration, TimeUnit.MILLISECONDS);
        
        // Update overall progress
        updateMigrationProgress(event);
    }
    
    @EventListener
    public void onSlotMigrationFailure(SlotMigrationFailureEvent event) {
        log.error("Slot migration failed: slot={}, error={}", 
            event.getSlot(), event.getException().getMessage());
        
        // Count the failure
        migrationFailures.increment();
        
        // Alert
        sendMigrationFailureAlert(event);
    }
    
    /**
     * Monitor cluster health
     */
    @Scheduled(fixedRate = 10000) // every 10 seconds
    public void monitorClusterHealth() {
        try {
            ClusterHealthStatus status = checkClusterHealth();
            
            // Publish the latest score through the pre-registered gauge
            healthScorePermille.set((long) (status.getHealthScore() * 1000));
            
            // Alert if health degrades
            if (status.getHealthScore() < 0.8) {
                sendClusterHealthAlert(status);
            }
            
        } catch (Exception e) {
            log.error("Cluster health monitoring failed", e);
        }
    }
    
    private void sendMigrationFailureAlert(SlotMigrationFailureEvent event) {
        AlertMessage alert = AlertMessage.builder()
            .title("Redis cluster slot migration failed")
            .content(String.format("Migration of slot %d from node %s to node %s failed: %s", 
                event.getSlot(), 
                event.getSourceNode(), 
                event.getTargetNode(),
                event.getException().getMessage()))
            .level(AlertLevel.HIGH)
            .timestamp(LocalDateTime.now())
            .build();
        
        alertService.sendAlert(alert);
    }
}

Rollback plan

Expansion rollback mechanism:

/**
 * Expansion rollback manager
 */
@Slf4j
@Component
public class ExpansionRollbackManager {
    
    /**
     * Roll back an expansion
     */
    public void rollbackExpansion(String expansionId) {
        log.info("Starting expansion rollback for expansion: {}", expansionId);
        
        try {
            // 1. Load the expansion record
            ExpansionRecord record = getExpansionRecord(expansionId);
            
            // 2. Stop in-flight migrations
            stopOngoingMigrations(record);
            
            // 3. Roll back the slot allocation
            rollbackSlotAllocation(record);
            
            // 4. Remove the newly added nodes
            removeNewNodes(record);
            
            // 5. Restore the original configuration
            restoreOriginalConfiguration(record);
            
            // 6. Validate the rollback
            validateRollbackResult(record);
            
            log.info("Expansion rollback completed successfully");
            
        } catch (Exception e) {
            log.error("Expansion rollback failed", e);
            
            // Rollback failed: requires manual intervention
            sendRollbackFailureAlert(expansionId, e);
            throw new RollbackException("Rollback failed", e);
        }
    }
    
    private void rollbackSlotAllocation(ExpansionRecord record) {
        // Migrate slots back to their original owners
        Map<Integer, String> originalSlotAllocation = record.getOriginalSlotAllocation();
        
        for (Map.Entry<Integer, String> entry : originalSlotAllocation.entrySet()) {
            int slot = entry.getKey();
            String originalOwner = entry.getValue();
            String currentOwner = findSlotOwner(slot);
            
            if (!originalOwner.equals(currentOwner)) {
                // The slot moved during the expansion; move it back
                migrateSlotBack(slot, currentOwner, originalOwner);
            }
        }
    }
    
    private void removeNewNodes(ExpansionRecord record) {
        List<String> newNodeIds = record.getNewNodeIds();
        
        for (String nodeId : newNodeIds) {
            try {
                // 1. Make sure the node no longer owns any slots
                RedisClusterNode node = findNodeById(nodeId);
                if (!node.getSlotRange().getSlots().isEmpty()) {
                    log.warn("Node {} still has slots assigned, skipping removal", nodeId);
                    continue;
                }
                
                // 2. If it is a replica, detach it from replication first
                //    (e.g. by issuing CLUSTER RESET SOFT against the node)
                if (!node.isMaster()) {
                    resetReplication(node);
                }
                
                // 3. Remove the node from the cluster
                clusterConnection.clusterForget(nodeId);
                
                // 4. Stop the Redis process
                stopRedisNode(node.getHost(), node.getPort());
                
                log.info("Node {} removed successfully", nodeId);
                
            } catch (Exception e) {
                log.error("Failed to remove node {}", nodeId, e);
            }
        }
    }
    
    /**
     * Snapshot the cluster state before the expansion
     */
    public ExpansionSnapshot createExpansionSnapshot() {
        ExpansionSnapshot snapshot = new ExpansionSnapshot();
        
        // Capture the current cluster state
        snapshot.setClusterNodes(getCurrentClusterNodes());
        snapshot.setSlotAllocation(getCurrentSlotAllocation());
        snapshot.setReplicationTopology(getReplicationTopology());
        snapshot.setClusterConfiguration(getClusterConfiguration());
        snapshot.setTimestamp(LocalDateTime.now());
        
        return snapshot;
    }
    
    /**
     * Restore the cluster from a snapshot
     */
    public void restoreFromSnapshot(ExpansionSnapshot snapshot) {
        log.info("Restoring cluster from snapshot: {}", snapshot.getTimestamp());
        
        try {
            // 1. Restore slot allocation
            restoreSlotAllocation(snapshot.getSlotAllocation());
            
            // 2. Restore replication topology
            restoreReplicationTopology(snapshot.getReplicationTopology());
            
            // 3. Restore cluster configuration
            restoreClusterConfiguration(snapshot.getClusterConfiguration());
            
            log.info("Cluster restored from snapshot successfully");
            
        } catch (Exception e) {
            log.error("Failed to restore cluster from snapshot", e);
            throw new RestoreException("Snapshot restore failed", e);
        }
    }
}

⚠️ Failure Handling

Failures during the expansion

Node failure handling:

/**
 * Expansion failure handler
 */
@Slf4j
@Component
public class ExpansionFailureHandler {
    
    /**
     * Handle a node failure
     */
    @EventListener
    public void handleNodeFailure(NodeFailureEvent event) {
        String failedNodeId = event.getNodeId();
        NodeType nodeType = event.getNodeType();
        
        log.error("Node failure detected during expansion: nodeId={}, type={}", 
            failedNodeId, nodeType);
        
        if (nodeType == NodeType.MASTER) {
            handleMasterNodeFailure(failedNodeId);
        } else {
            handleSlaveNodeFailure(failedNodeId);
        }
    }
    
    private void handleMasterNodeFailure(String failedMasterId) {
        try {
            // 1. Check whether a replica is available for promotion
            List<RedisClusterNode> slaves = getSlaveNodes(failedMasterId);
            
            if (!slaves.isEmpty()) {
                // 2. Promote the best-suited replica to master
                RedisClusterNode bestSlave = selectBestSlave(slaves);
                promoteSlaveToMaster(bestSlave.getId());
                
                log.info("Promoted slave {} to master to replace failed node {}", 
                    bestSlave.getId(), failedMasterId);
                
                // 3. Reassign the failed node's slots
                redistributeFailedNodeSlots(failedMasterId, bestSlave.getId());
                
            } else {
                // 4. No replica available: emergency handling
                handleMasterFailureWithoutSlave(failedMasterId);
            }
            
        } catch (Exception e) {
            log.error("Failed to handle master node failure", e);
            
            // Emergency alert
            sendEmergencyAlert("Master failure handling failed", failedMasterId, e);
        }
    }
    
    private void handleSlaveNodeFailure(String failedSlaveId) {
        try {
            // 1. Find the replica's master
            String masterId = findMasterOfSlave(failedSlaveId);
            
            // 2. Check whether the master still has other replicas
            List<RedisClusterNode> otherSlaves = getSlaveNodes(masterId)
                .stream()
                .filter(slave -> !slave.getId().equals(failedSlaveId))
                .collect(Collectors.toList());
            
            if (otherSlaves.isEmpty()) {
                // 3. The master has lost all replicas: create a new one
                createNewSlaveForMaster(masterId);
            }
            
            // 4. Remove the failed node from the cluster
            clusterConnection.clusterForget(failedSlaveId);
            
            log.info("Handled slave node failure: {}", failedSlaveId);
            
        } catch (Exception e) {
            log.error("Failed to handle slave node failure", e);
        }
    }
    
    /**
     * Handle data loss detected during a slot migration
     */
    public void handleDataLossDuringMigration(int slot, String sourceNode, String targetNode) {
        log.error("Data loss detected during slot {} migration from {} to {}", 
            slot, sourceNode, targetNode);
        
        try {
            // 1. Pause the migration of this slot
            pauseSlotMigration(slot);
            
            // 2. Restore the slot's data from backup
            restoreSlotDataFromBackup(slot, sourceNode);
            
            // 3. Restart the migration
            retrySlotMigration(slot, sourceNode, targetNode);
            
        } catch (Exception e) {
            log.error("Failed to handle data loss during migration", e);
            
            // Recovery failed: requires manual intervention
            sendDataLossAlert(slot, sourceNode, targetNode, e);
        }
    }
    
    /**
     * Handle a network partition
     */
    public void handleNetworkPartition(List<String> isolatedNodes) {
        log.error("Network partition detected, isolated nodes: {}", isolatedNodes);
        
        try {
            // 1. Assess the impact of the partition
            PartitionImpact impact = assessPartitionImpact(isolatedNodes);
            
            if (impact.isCritical()) {
                // 2. Critical partition: pause the expansion
                pauseExpansion("Network partition detected");
                
                // 3. Wait for the network to recover
                waitForNetworkRecovery(isolatedNodes);
                
                // 4. Resume the expansion once recovered
                resumeExpansion();
                
            } else {
                // 5. Non-critical: continue, skipping the isolated nodes
                continueExpansionWithoutIsolatedNodes(isolatedNodes);
            }
            
        } catch (Exception e) {
            log.error("Failed to handle network partition", e);
        }
    }
}

💡 How to Answer in the Interview

Answer framework

Part 1: Current state analysis

"Start from the current state: 3 masters and 3 replicas, 500 GB of data, 100k QPS.
Identify bottlenecks from monitoring: memory usage, per-node QPS, connection counts.
Set the target: a 6-master, 6-replica architecture for better throughput and availability."

Part 2: Expansion plan

"The expansion runs in four phases:
1. Environment preparation: deploy the new nodes, run pre-checks, take a snapshot backup
2. Node joining: add the new masters and replicas to the cluster and set up replication
3. Slot migration: migrate slots in batches while monitoring progress in real time
4. Data validation: verify slot allocation, master/replica consistency, and data integrity

The key is the slot migration strategy: small batches, so live traffic is barely affected."

Part 3: High-availability guarantees

"High-availability measures include:
1. Real-time monitoring: migration progress, cluster health, performance metrics
2. Rollback plan: snapshot backups, slot rollback, node removal
3. Failure handling: automatic failover for failed nodes, data-loss recovery, partition handling
4. Phased execution: the process can be paused and resumed at any point, reducing risk"

Part 4: Failure handling

"Failures that can occur mid-expansion:
1. Node failure: promote a replica when a master fails; create a new replica when a replica fails
2. Migration failure: retries, data verification, alerting
3. Network partition: assess the impact, pause the expansion, wait for recovery
4. Data loss: restore from backup, then re-run the migration

Each failure mode has a defined handling procedure and contingency plan."

Key takeaways:

  • ✅ Redis Cluster architecture and the slot allocation mechanism
  • ✅ Engineering practice for large-scale data migration
  • ✅ High-availability design and failure handling
  • ✅ Production operations experience and risk control

Summary: Scaling out a Redis cluster is a substantial distributed-systems engineering task. It demands a deep understanding of how the cluster works, plus complete plan design and failure handling, which is exactly what distinguishes a senior engineer's technical depth and practical experience.
