site logo

Marico's space

使用 Python 构建基于 Raft 共识协议的分布式键值存储

编程技术 2026-05-30 15:05:43 3

最近折腾了一个分布式键值存储项目,踩了不少坑,这篇把 Raft 共识协议的核心实现说清楚。

像 etcd、CockroachDB、Consul 这些分布式系统,跨多节点保持数据一致性的答案通常是共识算法。目前最流行的就是 Raft —— 设计目标就是"易于理解"。

GitHub 仓库 | 在线演示页面

为什么要做这个?

每个分布式系统的面试都会问到共识:"Kafka 怎么保证顺序?" "网络分区时会发生什么?" "选主是怎么工作的?"

从零实现 Raft 能让你真正搞懂这些问题。而且这个过程几乎覆盖了所有系统编程技能:

  • 并发编程 — 异步事件循环、定时器、状态机
  • 网络通信 — HTTP RPC(远程过程调用)、超时处理、故障恢复
  • 持久化 — 预写日志(WAL)、快照、崩溃恢复
  • 算法 — 选主、日志复制、提交规则

Raft 算法五分钟入门

Raft 把集群节点分成三种角色:

Follower ──[election timeout]──▶ Candidate ──[wins majority]──▶ Leader ▲ │ │ │ loses/timeout │ discovers higher │ └────────────────────────────────┘ term │ └──────────────────────────────────────────────────────────────┘

Leader 选举

  1. 每个节点启动时都是 Follower,带一个随机选举超时(1.5–3 秒)
  2. 如果 follower 在超时前没收到 leader 的消息 → 变成 Candidate
  3. candidate 增加自己的 term(任期),给自己投票,然后发送 RequestVote RPC
  4. 如果获得多数票 → 成为 Leader
  5. leader 定期发送心跳维持权威

随机超时避免了"选举风暴"——所有节点同时发起选举导致票数分散。

日志复制

选主完成后,所有客户端写入都经过 leader:

async def propose(self, command: dict) -> tuple[bool, str]: if self.state != NodeState.LEADER: return False, "Not leader" # 1. Append to local log
 entry = self.log.create_entry(self.current_term, command=command) # 2. Persist to WAL
 if self.wal: self.wal.append_entry(entry) # 3. Wait for majority replication
 future = asyncio.get_event_loop().create_future() self._pending_commands[entry.index] = future await asyncio.wait_for(future, timeout=5.0) return True, f"Committed at index {entry.index}"

leader 通过 AppendEntries RPC 把日志条目复制给 followers。只有当多数节点都收到并确认后,条目才会被提交

安全性保证

Raft 确保:

  • 选举安全性:每个任期最多一个 leader
  • 领导者只追加:leader 永远不会覆盖或删除日志条目
  • 日志匹配:如果两个日志包含相同索引和任期的条目,则该条目之前的所有条目都相同
  • 领导者完整性:已提交的条目必定存在于所有后续 leader 的日志中

架构概览

┌─────────────────────────────────────────────┐
│ Client Request │
│ PUT /kv/name {"value":"Raft"} │
└──────────────────┬──────────────────────────┘ ▼
┌──────────────────────────────────────────────┐
│ LEADER (Node 1) │
│ HTTP Server → Raft Engine → Log (WAL) → KV │
└─────────┬──────────────────────┬─────────────┘ │ AppendEntries │ AppendEntries ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ FOLLOWER │ │ FOLLOWER │ │ (Node 2) │ │ (Node 3) │ └─────────────┘ └─────────────┘

核心模块:

模块 功能
consensus.py Raft 核心引擎 — 选举、复制、提交
log.py 只追加日志,包含条目和冲突解决
wal.py 预写日志持久化和崩溃恢复
store.py KV 状态机 — 应用已提交的条目
server.py HTTP 服务,暴露客户端 API 和 Raft RPC
client.py 异步 Python 客户端,支持自动发现 leader

Raft 日志

Raft 的核心是复制日志。每条日志条目都有任期(election epoch)和索引:

@dataclass
class LogEntry: term: int index: int entry_type: str = EntryType.COMMAND command: dict | None = None

日志需要处理冲突解决——当 follower 收到新 leader 发来的条目,而本地日志已经有冲突时,需要截断:

def append_entries(self, prev_index, prev_term, entries): # Consistency check
 if prev_index > 0: prev_entry = self.get(prev_index) if prev_entry is None or prev_entry.term != prev_term: return False # Reject — log inconsistency
 for entry in entries: existing = self.get(entry.index) if existing and existing.term != entry.term: self._truncate_from(entry.index) # Conflict!
 self.entries.append(entry) elif existing is None: self.entries.append(entry) return True

Leader 选举详解

选举逻辑是整个系统里时间最敏感的环节:

async def _start_election(self): self.current_term += 1 self.state = NodeState.CANDIDATE self.voted_for = self.node_id votes_received = 1 # Self-vote
 votes_needed = self.config.majority vote_tasks = [ self._request_vote(peer, self.current_term) for peer in self.peers ] results = await asyncio.gather(*vote_tasks, return_exceptions=True) for result in results: if result is True: votes_received += 1 if votes_received >= votes_needed: self._become_leader()

节点只有在 candidate 的 term 更新、还没投过票、且 candidate 日志至少和自己一样新时才会投票。

预写日志与崩溃恢复

持久化是保证数据不丢的关键——在确认之前必须先把条目落盘:

class WriteAheadLog: def append_entry(self, entry: LogEntry): with open(self.wal_path, "a") as f: f.write(json.dumps(entry.to_dict()) + "\n") def save_state(self, current_term, voted_for, commit_index): tmp = self.state_path.with_suffix(".tmp") with open(tmp, "w") as f: json.dump(state, f) tmp.rename(self.state_path) # Atomic write

节点启动时会从最后一个快照开始重放 WAL 来恢复状态。

运行集群

Docker Compose

docker-compose up --build
# 3 nodes on ports 8001, 8002, 8003

CLI 使用

# Store values
raftkv put db.host "postgres.internal"
raftkv get db.host
# db.host = postgres.internal # Check cluster
raftkv status --node http://localhost:8001

HTTP API

curl -X PUT http://localhost:8001/kv/name \ -d '{"value": "RaftKV"}' curl http://localhost:8001/kv/name
# {"key": "name", "value": "RaftKV"} curl http://localhost:8001/cluster/status

测试

7 个模块共 89 个测试,覆盖单元测试、RPC、共识和多节点集成:

@pytest.mark.asyncio
async def test_data_replicated_to_followers(three_node_cluster): nodes = three_node_cluster leader = find_leader(nodes) await leader.propose({"op": "put", "key": "test", "value": "hello"}) await asyncio.sleep(1.0) for node in nodes: assert node.store.get("test") == "hello"

关键心得

  1. 时间是关键 — 随机选举超时设计巧妙但调参复杂
  2. 日志一致性检查才是真正的英雄prevLogIndex + prevLogTerm 防止了脑裂导致的数据损坏
  3. 任期号是通用的裁决器 — 更高的任期永远胜出
  4. 把共识层和状态机分离很巧妙 — Raft 日志和 KV 存储是完全独立的
  5. 持久化必须是原子操作 — 先写后重命名用于状态文件,JSONL 追加用于 WAL

完整源码在 GitHub 上,觉得有用的话给个 star!

这是"从零构建"系列的第十个项目。之前做过 SQL 查询引擎、向量搜索引擎、工作流编排引擎。

原文链接:https://dev.to/hajirufai/building-a-distributed-key-value-store-with-raft-consensus-in-python-5c8d