18.9.2 分布式一致性算法(Raft/Paxos)

1. Raft算法原理

1.1 Raft算法基础

public class RaftAlgorithmPrinciple {
    
    /*
     * Raft算法核心概念:
     * 
     * 1. 节点状态
     *    - Leader: 领导者,处理所有客户端请求
     *    - Follower: 跟随者,被动接收日志条目
     *    - Candidate: 候选者,参与领导者选举
     * 
     * 2. 核心机制
     *    - Leader Election: 领导者选举
     *    - Log Replication: 日志复制
     *    - Safety: 安全性保证
     * 
     * 3. 关键概念
     *    - Term: 任期,逻辑时钟
     *    - Log Entry: 日志条目
     *    - Commit Index: 已提交索引
     */
    
    public void demonstrateRaftAlgorithm() {
        System.out.println("=== Raft算法演示 ===");
        
        RaftCluster cluster = new RaftCluster(5);
        
        demonstrateLeaderElection(cluster);
        demonstrateLogReplication(cluster);
        demonstrateLeaderFailover(cluster);
    }
    
    private void demonstrateLeaderElection(RaftCluster cluster) {
        System.out.println("--- 领导者选举演示 ---");
        
        System.out.println("1. 集群启动,所有节点为Follower:");
        cluster.showClusterState();
        
        System.out.println("\n2. 选举超时,节点开始选举:");
        cluster.startElection();
        
        System.out.println("\n3. 选举完成:");
        cluster.showClusterState();
        
        System.out.println("领导者选举完成\n");
    }
    
    private void demonstrateLogReplication(RaftCluster cluster) {
        System.out.println("--- 日志复制演示 ---");
        
        System.out.println("1. 客户端发送请求:");
        cluster.clientRequest("SET key1 value1");
        cluster.clientRequest("SET key2 value2");
        
        System.out.println("\n2. 日志复制过程:");
        cluster.showLogStates();
        
        System.out.println("日志复制完成\n");
    }
    
    private void demonstrateLeaderFailover(RaftCluster cluster) {
        System.out.println("--- 领导者故障转移演示 ---");
        
        System.out.println("1. 当前领导者故障:");
        cluster.simulateLeaderFailure();
        
        System.out.println("\n2. 重新选举:");
        cluster.startElection();
        
        System.out.println("\n3. 新领导者产生:");
        cluster.showClusterState();
        
        System.out.println("故障转移完成\n");
    }
}

// Raft集群
class RaftCluster {
    private java.util.List<RaftNode> nodes;
    private int currentTerm = 0;
    private RaftNode leader = null;
    
    public RaftCluster(int nodeCount) {
        nodes = new java.util.ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new RaftNode("node-" + i, this));
        }
    }
    
    public void showClusterState() {
        System.out.println("  集群状态 (Term: " + currentTerm + "):");
        for (RaftNode node : nodes) {
            String status = node.getState().toString();
            if (node == leader) status += " (LEADER)";
            System.out.println("    " + node.getId() + ": " + status);
        }
    }
    
    public void startElection() {
        // 模拟选举超时,随机节点开始选举
        RaftNode candidate = nodes.get(0);
        if (candidate.getState() == NodeState.FOLLOWER) {
            candidate.startElection();
        }
    }
    
    public void clientRequest(String command) {
        if (leader != null) {
            System.out.println("  客户端请求: " + command);
            leader.handleClientRequest(command);
        } else {
            System.out.println("  无可用领导者,请求失败");
        }
    }
    
    public void showLogStates() {
        System.out.println("  各节点日志状态:");
        for (RaftNode node : nodes) {
            System.out.println("    " + node.getId() + ": " + node.getLogSize() + " entries");
        }
    }
    
    public void simulateLeaderFailure() {
        if (leader != null) {
            System.out.println("  模拟领导者 " + leader.getId() + " 故障");
            leader.setState(NodeState.FAILED);
            leader = null;
        }
    }
    
    // Raft选举逻辑
    public boolean requestVote(RaftNode candidate, int term) {
        if (term > currentTerm) {
            currentTerm = term;
            return true;
        }
        return false;
    }
    
    public void setLeader(RaftNode newLeader, int term) {
        this.leader = newLeader;
        this.currentTerm = term;
        
        // 通知所有节点新的领导者
        for (RaftNode node : nodes) {
            if (node != newLeader && node.getState() != NodeState.FAILED) {
                node.setState(NodeState.FOLLOWER);
                node.setCurrentTerm(term);
            }
        }
    }
    
    public java.util.List<RaftNode> getNodes() { return nodes; }
    public int getCurrentTerm() { return currentTerm; }
}

// 节点状态枚举
enum NodeState {
    FOLLOWER,
    CANDIDATE, 
    LEADER,
    FAILED
}

// Raft节点
class RaftNode {
    private String id;
    private NodeState state = NodeState.FOLLOWER;
    private int currentTerm = 0;
    private String votedFor = null;
    private java.util.List<LogEntry> log = new java.util.ArrayList<>();
    private int commitIndex = 0;
    private RaftCluster cluster;
    
    public RaftNode(String id, RaftCluster cluster) {
        this.id = id;
        this.cluster = cluster;
    }
    
    public void startElection() {
        System.out.println("    " + id + " 开始选举");
        
        state = NodeState.CANDIDATE;
        currentTerm++;
        votedFor = id;
        
        // 请求投票
        int votes = 1; // 自己的票
        for (RaftNode node : cluster.getNodes()) {
            if (node != this && node.getState() != NodeState.FAILED) {
                if (cluster.requestVote(this, currentTerm)) {
                    votes++;
                }
            }
        }
        
        // 检查是否获得多数票
        int majority = cluster.getNodes().size() / 2 + 1;
        if (votes >= majority) {
            becomeLeader();
        } else {
            state = NodeState.FOLLOWER;
        }
    }
    
    private void becomeLeader() {
        System.out.println("    " + id + " 成为领导者 (Term: " + currentTerm + ")");
        state = NodeState.LEADER;
        cluster.setLeader(this, currentTerm);
        
        // 发送心跳
        sendHeartbeat();
    }
    
    private void sendHeartbeat() {
        System.out.println("    " + id + " 发送心跳");
        for (RaftNode node : cluster.getNodes()) {
            if (node != this && node.getState() != NodeState.FAILED) {
                node.receiveHeartbeat(currentTerm);
            }
        }
    }
    
    public void receiveHeartbeat(int term) {
        if (term >= currentTerm) {
            currentTerm = term;
            state = NodeState.FOLLOWER;
            votedFor = null;
        }
    }
    
    public void handleClientRequest(String command) {
        if (state == NodeState.LEADER) {
            // 创建日志条目
            LogEntry entry = new LogEntry(currentTerm, log.size(), command);
            log.add(entry);
            
            System.out.println("    " + id + " 接收请求,创建日志条目: " + command);
            
            // 复制到其他节点
            replicateLog(entry);
        }
    }
    
    private void replicateLog(LogEntry entry) {
        System.out.println("    " + id + " 开始日志复制");
        
        int replicationCount = 1; // 领导者自己
        
        for (RaftNode node : cluster.getNodes()) {
            if (node != this && node.getState() != NodeState.FAILED) {
                if (node.appendEntry(entry)) {
                    replicationCount++;
                }
            }
        }
        
        // 检查是否达到多数
        int majority = cluster.getNodes().size() / 2 + 1;
        if (replicationCount >= majority) {
            commitIndex = entry.getIndex();
            System.out.println("    日志条目已提交: " + entry.getCommand());
        }
    }
    
    public boolean appendEntry(LogEntry entry) {
        log.add(entry);
        System.out.println("      " + id + " 接收日志条目: " + entry.getCommand());
        return true;
    }
    
    // Getters and Setters
    public String getId() { return id; }
    public NodeState getState() { return state; }
    public void setState(NodeState state) { this.state = state; }
    public int getCurrentTerm() { return currentTerm; }
    public void setCurrentTerm(int term) { this.currentTerm = term; }
    public int getLogSize() { return log.size(); }
}

// 日志条目
class LogEntry {
    private int term;
    private int index;
    private String command;
    
    public LogEntry(int term, int index, St

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

Java面试圣经 文章被收录于专栏

Java面试圣经,带你练透java圣经

全部评论

相关推荐

投递掌阅科技等公司10个岗位
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务