
原文:Outbox Pattern in Postgres, End to End: Producer, Relayer, Consumer|译者前言:本文来自 dev.to,观点有价值,转写发布供读者参考。
你扣了用户卡片,发了 payment.captured webhook。然后数据库事务因为订单表的唯一键冲突而回滚了,用户却收到了一个根本不存在的订单的确认邮件。到早上,客服收到了 14 张工单,财务负责人正在追问为什么 Stripe 和你的数据库在 4 月 18 日 02:14 UTC 的事件记录上不一致。
这就是双写问题。你在两个独立步骤中分别写入了 Postgres 和 Kafka,其中一个成功了,另一个却失败了。没有任何重试策略能解决这个问题,因为 broker 调用和数据库提交不在同一个原子单元中。Outbox 模式就是那个沉闷但有效的解决方案,十年来它一直在默默支撑着生产环境中的事件驱动系统。下面是一个完整的、可运行的 Postgres 实现:Schema、Producer、使用 FOR UPDATE SKIP LOCKED 的 Relayer、幂等 Consumer,以及那些仍然会坑你的失败模式。
一张表。与业务表在同一个数据库中,这样你就可以在同一个事务中向两者写入。
CREATE TABLE events_outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
type TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
dispatched_at TIMESTAMPTZ
);
CREATE INDEX events_outbox_pending_idx
ON events_outbox (created_at)
WHERE dispatched_at IS NULL;
aggregate_id 是事件所关联的实体:订单 ID、用户 ID、支付 ID。下游消费者会把它作为分区键,这样同一 aggregate 的事件能保持有序。created_at WHERE dispatched_at IS NULL 上的部分索引是保证 relayer 查询在表增长时保持高效的关键。没有它,对 5000 万行 outbox 表的轮询就是一次顺序扫描,会把其他所有操作都卡死。
注意这里没有什么。没有 topic 列,没有 retry_count,没有 status 枚举。保持这张表精简。Relayer 从 type 推导 topic。重试是 relayer 的问题,不是表的问题。状态枚举会引诱人添加各种状态("processing"、"failed"、"skipped"),把 outbox 变成一个工作流引擎——这就是团队悄悄迷失方向的地方。
这个模式的全部意义在于:业务写和 outbox 写一起提交或一起回滚。用 Python 加 psycopg:
import json
from uuid import UUID
import psycopg
def place_order(
conn: psycopg.Connection,
order_id: UUID,
customer_id: UUID,
total_cents: int,
) -> None:
with conn.transaction():
conn.execute(
"""
INSERT INTO orders (id, customer_id, total_cents, status)
VALUES (%s, %s, %s, 'placed')
""",
(order_id, customer_id, total_cents),
)
_emit_order_placed(
conn, order_id, customer_id, total_cents,
)
Outbox 插入是一个轻量辅助函数,复用同一个连接,所以两次写入走同一个事务:
def _emit_order_placed(
conn: psycopg.Connection,
order_id: UUID,
customer_id: UUID,
total_cents: int,
) -> None:
payload = json.dumps({
"order_id": str(order_id),
"customer_id": str(customer_id),
"total_cents": total_cents,
})
conn.execute(
"""
INSERT INTO events_outbox (aggregate_id, type, payload)
VALUES (%s, %s, %s::jsonb)
""",
(order_id, "order.placed", payload),
)
如果订单插入失败,outbox 插入就不会发生。如果 outbox 插入失败,订单插入就会回滚。没有任何路径能让用户看到系统从未发布过的事件的副作用。这就是全部的保证,只要你确保两次写入共享同一个连接和事务,就能免费获得这个保证。
一些看起来可选但实际不是的东西。始终在写入时序列化 payload,不要在 relay 时做。行应该携带你打算发布的确切字节。始终在 payload 中包含足够的数据,让消费者不需要回调你的服务来补充它;否则事件就变成了一块指向可变状态的墓碑,契约就泄漏了。
一个独立进程(sidecar、定时任务、长运行消费者)轮询 outbox、声明一批消息、发布到 broker、标记行已分发。声明使用 FOR UPDATE SKIP LOCKED,这样多个 relayer 副本可以并行运行而不会互相踩踏。
SELECT id, aggregate_id, type, payload
FROM events_outbox
WHERE dispatched_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;
FOR UPDATE 在事务期间获取行锁。SKIP LOCKED 告诉 Postgres 如果行已被锁定就不要等待。它直接跳过。用三个 relayer pod 并发运行这个查询,每个都会拿到互不重叠的一批消息,broker 扇出会水平扩展。无需协调服务、无需 lease 表、无需 ZooKeeper。(SKIP LOCKED 行为)
用 Python 包装 Kafka producer:
from confluent_kafka import Producer
producer = Producer({"bootstrap.servers": "kafka:9092"})
CLAIM_SQL = """
SELECT id, aggregate_id, type, payload
FROM events_outbox
WHERE dispatched_at IS NULL
ORDER BY id
LIMIT %s
FOR UPDATE SKIP LOCKED
"""
MARK_SQL = """
UPDATE events_outbox
SET dispatched_at = now()
WHERE id = ANY(%s)
"""
Relay 循环打开一个事务,用 FOR UPDATE SKIP LOCKED 声明一批消息,按 aggregate_id 作为 key 发布到 Kafka,然后在提交前标记行已分发:
def relay_once(
conn: psycopg.Connection, batch: int = 100,
) -> int:
with conn.transaction():
rows = conn.execute(CLAIM_SQL, (batch,)).fetchall()
if not rows:
return 0
ids = []
for event_id, aggregate_id, etype, payload in rows:
producer.produce(
topic=etype.split(".", 1)[0],
key=str(aggregate_id),
value=payload,
headers=[
("event_id", str(event_id).encode()),
],
)
ids.append(event_id)
producer.flush(timeout=5)
conn.execute(MARK_SQL, (ids,))
return len(rows)
仔细看操作顺序。事务在 broker 发布期间保持打开。如果发布失败,UPDATE 不会运行,事务回滚,行锁释放,下一次轮询会再次拾取这些行。如果发布成功但 UPDATE 失败,你已经发布了,行会在下一次轮询中被重新声明并重新发布。At-least-once 投递就是契约。 消费者这一端让它看起来像 exactly-once。
event_id header 是消费者幂等性的关键。每条消息都带上它。这是唯一能在重新发布后保留的身份信息。
一个处理同一事件两次而没有可观察副作用的消费者。模式是在消费者端有一个 processed_events 表,作用域是消费者名称,在 (consumer, event_id) 上有唯一约束。
CREATE TABLE processed_events (
consumer TEXT NOT NULL,
event_id BIGINT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (consumer, event_id)
);
消费者把副作用和去重写入包装在同一个事务中。如果事件已经被处理过,插入会违反主键,工作被跳过:
def handle_order_placed(
conn: psycopg.Connection,
consumer: str,
event_id: int,
payload: dict,
) -> None:
with conn.transaction():
try:
conn.execute(
"""
INSERT INTO processed_events (consumer, event_id)
VALUES (%s, %s)
""",
(consumer, event_id),
)
except psycopg.errors.UniqueViolation:
return # already processed; skip the side-effect
# business side-effect goes inside the same tx
conn.execute(
"""
INSERT INTO loyalty_points (customer_id, points)
VALUES (%s, %s)
ON CONFLICT (customer_id)
DO UPDATE SET
points = loyalty_points.points + EXCLUDED.points
""",
(payload["customer_id"], payload["total_cents"] // 100),
)
两条规则保持契约有效。去重写入和副作用必须共享一个事务。否则它们之间的一次崩溃会让事件看起来已被处理,但工作却缺失了。副作用本身在数据层也应该是幂等的(ON CONFLICT、upserts、条件更新),这样在 Kafka rebalance 期间的部分重放不会破坏状态。
这个模式修复了双写问题。它不修复物理定律。
跨 aggregate 的重排序。 如果两个 relayer 副本同时获取了第 100 和 101 行,而 broker 先接受了 101,下游消费者会先看到 101 再看到 100。只要两行属于不同的 aggregate_id 值,这没问题。Kafka 按 aggregate_id 分区保持每个实体的顺序。跨 aggregate,你没有顺序保证,任何假设全局事件顺序的消费者都是错的。单副本 relayer 加单 Kafka 分区是获得完全顺序的唯一方式,但它会把吞吐量限制在一个流上。
分发延迟。 Relayer 每 N 秒轮询一次。从提交到发布之间有一个时间窗口,行存在于数据库中但没有任何事件到达 Kafka。任何同时读取 DB 和下游投影的系统都会看到事件尚未宣告的状态。记录延迟,设置 SLO,当 now() - max(created_at) WHERE dispatched_at IS NULL 超过阈值时告警。(SeatGeek 的 outbox 总结把可行数字定在 1-5 秒左右。)
卡住的行。 有时候一行永远不会被分发。Producer 写了一个超过 broker 最大消息大小的 payload。Producer 写了一个没有 topic 匹配的 type。Relayer 在 flush 中途崩溃,导致某行的 ID 超过了锁定区域。解决方案是一个每分钟运行的 watchdog 查询:
SELECT id, type, age(now(), created_at) AS waiting
FROM events_outbox
WHERE dispatched_at IS NULL
AND created_at < now() - interval '5 minutes'
ORDER BY id
LIMIT 50;
对等待超过五分钟的行发告警。大多数时候,告警会在面向客户的系统感受到之前就捕获到配置错误的 topic 或 payload 过大情况。
Outbox 表膨胀。 成功分发的行会堆积。一个夜间任务删除 dispatched_at < now() - interval '7 days' 的行,保持表足够小,让部分索引保持在缓存中热着。不要在 relayer 事务内同步删除行。你会和自己 的 vacuum 进程打架,尾延迟会失控。
Outbox 不花哨。它就是一张表、一个轮询循环、消费者端的一个唯一索引,但它一直在默默超越大多数为取代它而构建的分布式事务框架。把 schema 做对,让 relayer 保持简单,让消费者在数据层幂等,把告警放在上述三个失败模式上。其他的都是装饰。
Event-Driven Architecture Pocket Guide 的 Outbox 章节深入探讨了轮询 relayer 和 Debezium 风格日志尾部之间的权衡、何时从单个 outbox 切换到按 aggregate 分流,以及保持五年旧的消费者在重命名字段时不崩溃的模式演进规则。如果你正在生产环境中运行 outbox,而上面的失败模式部分听起来太熟悉了,这本书的其他内容就是为你准备的。
Event-Driven Architecture Pocket Guide