
最近折腾了一个分布式键值存储项目,踩了不少坑,这篇把 Raft 共识协议的核心实现说清楚。
像 etcd、CockroachDB、Consul 这些分布式系统,跨多节点保持数据一致性的答案通常是共识算法。目前最流行的就是 Raft —— 设计目标就是"易于理解"。
GitHub 仓库 | 在线演示页面
每个分布式系统的面试都会问到共识:"Kafka 怎么保证顺序?" "网络分区时会发生什么?" "选主是怎么工作的?"
从零实现 Raft 能让你真正搞懂这些问题。而且这个过程几乎覆盖了所有系统编程技能:
Raft 把集群节点分成三种角色:
Follower ──[election timeout]──▶ Candidate ──[wins majority]──▶ Leader ▲ │ │ │ loses/timeout │ discovers higher │ └────────────────────────────────┘ term │ └──────────────────────────────────────────────────────────────┘
term(任期),给自己投票,然后发送 RequestVote RPC随机超时避免了"选举风暴"——所有节点同时发起选举导致票数分散。
选主完成后,所有客户端写入都经过 leader:
async def propose(self, command: dict) -> tuple[bool, str]: if self.state != NodeState.LEADER: return False, "Not leader" # 1. Append to local log
entry = self.log.create_entry(self.current_term, command=command) # 2. Persist to WAL
if self.wal: self.wal.append_entry(entry) # 3. Wait for majority replication
future = asyncio.get_event_loop().create_future() self._pending_commands[entry.index] = future await asyncio.wait_for(future, timeout=5.0) return True, f"Committed at index {entry.index}"
leader 通过 AppendEntries RPC 把日志条目复制给 followers。只有当多数节点都收到并确认后,条目才会被提交。
Raft 确保:
┌─────────────────────────────────────────────┐
│ Client Request │
│ PUT /kv/name {"value":"Raft"} │
└──────────────────┬──────────────────────────┘ ▼
┌──────────────────────────────────────────────┐
│ LEADER (Node 1) │
│ HTTP Server → Raft Engine → Log (WAL) → KV │
└─────────┬──────────────────────┬─────────────┘ │ AppendEntries │ AppendEntries ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ FOLLOWER │ │ FOLLOWER │ │ (Node 2) │ │ (Node 3) │ └─────────────┘ └─────────────┘
核心模块:
| 模块 | 功能 |
|---|---|
consensus.py |
Raft 核心引擎 — 选举、复制、提交 |
log.py |
只追加日志,包含条目和冲突解决 |
wal.py |
预写日志持久化和崩溃恢复 |
store.py |
KV 状态机 — 应用已提交的条目 |
server.py |
HTTP 服务,暴露客户端 API 和 Raft RPC |
client.py |
异步 Python 客户端,支持自动发现 leader |
Raft 的核心是复制日志。每条日志条目都有任期(election epoch)和索引:
@dataclass
class LogEntry: term: int index: int entry_type: str = EntryType.COMMAND command: dict | None = None
日志需要处理冲突解决——当 follower 收到新 leader 发来的条目,而本地日志已经有冲突时,需要截断:
def append_entries(self, prev_index, prev_term, entries): # Consistency check
if prev_index > 0: prev_entry = self.get(prev_index) if prev_entry is None or prev_entry.term != prev_term: return False # Reject — log inconsistency
for entry in entries: existing = self.get(entry.index) if existing and existing.term != entry.term: self._truncate_from(entry.index) # Conflict!
self.entries.append(entry) elif existing is None: self.entries.append(entry) return True
选举逻辑是整个系统里时间最敏感的环节:
async def _start_election(self): self.current_term += 1 self.state = NodeState.CANDIDATE self.voted_for = self.node_id votes_received = 1 # Self-vote
votes_needed = self.config.majority vote_tasks = [ self._request_vote(peer, self.current_term) for peer in self.peers ] results = await asyncio.gather(*vote_tasks, return_exceptions=True) for result in results: if result is True: votes_received += 1 if votes_received >= votes_needed: self._become_leader()
节点只有在 candidate 的 term 更新、还没投过票、且 candidate 日志至少和自己一样新时才会投票。
持久化是保证数据不丢的关键——在确认之前必须先把条目落盘:
class WriteAheadLog: def append_entry(self, entry: LogEntry): with open(self.wal_path, "a") as f: f.write(json.dumps(entry.to_dict()) + "\n") def save_state(self, current_term, voted_for, commit_index): tmp = self.state_path.with_suffix(".tmp") with open(tmp, "w") as f: json.dump(state, f) tmp.rename(self.state_path) # Atomic write
节点启动时会从最后一个快照开始重放 WAL 来恢复状态。
docker-compose up --build
# 3 nodes on ports 8001, 8002, 8003
# Store values
raftkv put db.host "postgres.internal"
raftkv get db.host
# db.host = postgres.internal # Check cluster
raftkv status --node http://localhost:8001
curl -X PUT http://localhost:8001/kv/name \ -d '{"value": "RaftKV"}' curl http://localhost:8001/kv/name
# {"key": "name", "value": "RaftKV"} curl http://localhost:8001/cluster/status
7 个模块共 89 个测试,覆盖单元测试、RPC、共识和多节点集成:
@pytest.mark.asyncio
async def test_data_replicated_to_followers(three_node_cluster): nodes = three_node_cluster leader = find_leader(nodes) await leader.propose({"op": "put", "key": "test", "value": "hello"}) await asyncio.sleep(1.0) for node in nodes: assert node.store.get("test") == "hello"
prevLogIndex + prevLogTerm 防止了脑裂导致的数据损坏完整源码在 GitHub 上,觉得有用的话给个 star!
这是"从零构建"系列的第十个项目。之前做过 SQL 查询引擎、向量搜索引擎、工作流编排引擎。
原文链接:https://dev.to/hajirufai/building-a-distributed-key-value-store-with-raft-consensus-in-python-5c8d