site logo

Marico's space

PostgreSQL Outbox 模式端到端实践:生产者、中继器、消费者

服务器技术 2026-04-28 11:55:08 5

原文:Outbox Pattern in Postgres, End to End: Producer, Relayer, Consumer|译者前言:本文来自 dev.to,观点有价值,转写发布供读者参考。

  • 书籍: Event-Driven Architecture Pocket Guide: Saga, CQRS, Outbox, and the Traps Nobody Warns You About
  • 我的项目: Hermes IDE | GitHub — 为使用 Claude Code 等 AI 编程工具的开发者打造的 IDE
  • 作者: xgabriel.com | GitHub

你扣了用户卡片,发了 payment.captured webhook。然后数据库事务因为订单表的唯一键冲突而回滚了,用户却收到了一个根本不存在的订单的确认邮件。到早上,客服收到了 14 张工单,财务负责人正在追问为什么 Stripe 和你的数据库在 4 月 18 日 02:14 UTC 的事件记录上不一致。

这就是双写问题。你在两个独立步骤中分别写入了 Postgres 和 Kafka,其中一个成功了,另一个却失败了。没有任何重试策略能解决这个问题,因为 broker 调用和数据库提交不在同一个原子单元中。Outbox 模式就是那个沉闷但有效的解决方案,十年来它一直在默默支撑着生产环境中的事件驱动系统。下面是一个完整的、可运行的 Postgres 实现:Schema、Producer、使用 FOR UPDATE SKIP LOCKED 的 Relayer、幂等 Consumer,以及那些仍然会坑你的失败模式。

The schema

一张表。与业务表在同一个数据库中,这样你就可以在同一个事务中向两者写入。

CREATE TABLE events_outbox (
    id            BIGSERIAL PRIMARY KEY,
    aggregate_id  UUID        NOT NULL,
    type          TEXT        NOT NULL,
    payload       JSONB       NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    dispatched_at TIMESTAMPTZ
);

CREATE INDEX events_outbox_pending_idx ON events_outbox (created_at) WHERE dispatched_at IS NULL;

aggregate_id 是事件所关联的实体:订单 ID、用户 ID、支付 ID。下游消费者会把它作为分区键,这样同一 aggregate 的事件能保持有序。created_at WHERE dispatched_at IS NULL 上的部分索引是保证 relayer 查询在表增长时保持高效的关键。没有它,对 5000 万行 outbox 表的轮询就是一次顺序扫描,会把其他所有操作都卡死。

注意这里没有什么。没有 topic 列,没有 retry_count,没有 status 枚举。保持这张表精简。Relayer 从 type 推导 topic。重试是 relayer 的问题,不是表的问题。状态枚举会引诱人添加各种状态("processing"、"failed"、"skipped"),把 outbox 变成一个工作流引擎——这就是团队悄悄迷失方向的地方。

The producer: one transaction, two writes

这个模式的全部意义在于:业务写和 outbox 写一起提交或一起回滚。用 Python 加 psycopg

import json
from uuid import UUID
import psycopg

def place_order( conn: psycopg.Connection, order_id: UUID, customer_id: UUID, total_cents: int, ) -> None: with conn.transaction(): conn.execute( """ INSERT INTO orders (id, customer_id, total_cents, status) VALUES (%s, %s, %s, 'placed') """, (order_id, customer_id, total_cents), ) _emit_order_placed( conn, order_id, customer_id, total_cents, )

Outbox 插入是一个轻量辅助函数,复用同一个连接,所以两次写入走同一个事务:

def _emit_order_placed(
    conn: psycopg.Connection,
    order_id: UUID,
    customer_id: UUID,
    total_cents: int,
) -> None:
    payload = json.dumps({
        "order_id": str(order_id),
        "customer_id": str(customer_id),
        "total_cents": total_cents,
    })
    conn.execute(
        """
        INSERT INTO events_outbox (aggregate_id, type, payload)
        VALUES (%s, %s, %s::jsonb)
        """,
        (order_id, "order.placed", payload),
    )

如果订单插入失败,outbox 插入就不会发生。如果 outbox 插入失败,订单插入就会回滚。没有任何路径能让用户看到系统从未发布过的事件的副作用。这就是全部的保证,只要你确保两次写入共享同一个连接和事务,就能免费获得这个保证。

一些看起来可选但实际不是的东西。始终在写入时序列化 payload,不要在 relay 时做。行应该携带你打算发布的确切字节。始终在 payload 中包含足够的数据,让消费者不需要回调你的服务来补充它;否则事件就变成了一块指向可变状态的墓碑,契约就泄漏了。

The relayer: poll, claim, publish, mark

一个独立进程(sidecar、定时任务、长运行消费者)轮询 outbox、声明一批消息、发布到 broker、标记行已分发。声明使用 FOR UPDATE SKIP LOCKED,这样多个 relayer 副本可以并行运行而不会互相踩踏。

SELECT id, aggregate_id, type, payload
FROM events_outbox
WHERE dispatched_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;

FOR UPDATE 在事务期间获取行锁。SKIP LOCKED 告诉 Postgres 如果行已被锁定就不要等待。它直接跳过。用三个 relayer pod 并发运行这个查询,每个都会拿到互不重叠的一批消息,broker 扇出会水平扩展。无需协调服务、无需 lease 表、无需 ZooKeeper。(SKIP LOCKED 行为)

用 Python 包装 Kafka producer:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

CLAIM_SQL = """ SELECT id, aggregate_id, type, payload FROM events_outbox WHERE dispatched_at IS NULL ORDER BY id LIMIT %s FOR UPDATE SKIP LOCKED """

MARK_SQL = """ UPDATE events_outbox SET dispatched_at = now() WHERE id = ANY(%s) """

Relay 循环打开一个事务,用 FOR UPDATE SKIP LOCKED 声明一批消息,按 aggregate_id 作为 key 发布到 Kafka,然后在提交前标记行已分发:

def relay_once(
    conn: psycopg.Connection, batch: int = 100,
) -> int:
    with conn.transaction():
        rows = conn.execute(CLAIM_SQL, (batch,)).fetchall()
        if not rows:
            return 0

ids = [] for event_id, aggregate_id, etype, payload in rows: producer.produce( topic=etype.split(".", 1)[0], key=str(aggregate_id), value=payload, headers=[ ("event_id", str(event_id).encode()), ], ) ids.append(event_id)

producer.flush(timeout=5) conn.execute(MARK_SQL, (ids,)) return len(rows)

仔细看操作顺序。事务在 broker 发布期间保持打开。如果发布失败,UPDATE 不会运行,事务回滚,行锁释放,下一次轮询会再次拾取这些行。如果发布成功但 UPDATE 失败,你已经发布了,行会在下一次轮询中被重新声明并重新发布。At-least-once 投递就是契约。 消费者这一端让它看起来像 exactly-once。

event_id header 是消费者幂等性的关键。每条消息都带上它。这是唯一能在重新发布后保留的身份信息。

The idempotent consumer

一个处理同一事件两次而没有可观察副作用的消费者。模式是在消费者端有一个 processed_events 表,作用域是消费者名称,在 (consumer, event_id) 上有唯一约束。

CREATE TABLE processed_events (
    consumer     TEXT        NOT NULL,
    event_id     BIGINT      NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (consumer, event_id)
);

消费者把副作用和去重写入包装在同一个事务中。如果事件已经被处理过,插入会违反主键,工作被跳过:

def handle_order_placed(
    conn: psycopg.Connection,
    consumer: str,
    event_id: int,
    payload: dict,
) -> None:
    with conn.transaction():
        try:
            conn.execute(
                """
                INSERT INTO processed_events (consumer, event_id)
                VALUES (%s, %s)
                """,
                (consumer, event_id),
            )
        except psycopg.errors.UniqueViolation:
            return  # already processed; skip the side-effect
        # business side-effect goes inside the same tx
        conn.execute(
            """
            INSERT INTO loyalty_points (customer_id, points)
            VALUES (%s, %s)
            ON CONFLICT (customer_id)
            DO UPDATE SET
              points = loyalty_points.points + EXCLUDED.points
            """,
            (payload["customer_id"], payload["total_cents"] // 100),
        )

两条规则保持契约有效。去重写入和副作用必须共享一个事务。否则它们之间的一次崩溃会让事件看起来已被处理,但工作却缺失了。副作用本身在数据层也应该是幂等的(ON CONFLICT、upserts、条件更新),这样在 Kafka rebalance 期间的部分重放不会破坏状态。

Failure modes that still bite you

这个模式修复了双写问题。它不修复物理定律。

跨 aggregate 的重排序。 如果两个 relayer 副本同时获取了第 100 和 101 行,而 broker 先接受了 101,下游消费者会先看到 101 再看到 100。只要两行属于不同的 aggregate_id 值,这没问题。Kafka 按 aggregate_id 分区保持每个实体的顺序。跨 aggregate,你没有顺序保证,任何假设全局事件顺序的消费者都是错的。单副本 relayer 加单 Kafka 分区是获得完全顺序的唯一方式,但它会把吞吐量限制在一个流上。

分发延迟。 Relayer 每 N 秒轮询一次。从提交到发布之间有一个时间窗口,行存在于数据库中但没有任何事件到达 Kafka。任何同时读取 DB 和下游投影的系统都会看到事件尚未宣告的状态。记录延迟,设置 SLO,当 now() - max(created_at) WHERE dispatched_at IS NULL 超过阈值时告警。(SeatGeek 的 outbox 总结把可行数字定在 1-5 秒左右。)

卡住的行。 有时候一行永远不会被分发。Producer 写了一个超过 broker 最大消息大小的 payload。Producer 写了一个没有 topic 匹配的 type。Relayer 在 flush 中途崩溃,导致某行的 ID 超过了锁定区域。解决方案是一个每分钟运行的 watchdog 查询:

SELECT id, type, age(now(), created_at) AS waiting
FROM events_outbox
WHERE dispatched_at IS NULL
  AND created_at < now() - interval '5 minutes'
ORDER BY id
LIMIT 50;

对等待超过五分钟的行发告警。大多数时候,告警会在面向客户的系统感受到之前就捕获到配置错误的 topic 或 payload 过大情况。

Outbox 表膨胀。 成功分发的行会堆积。一个夜间任务删除 dispatched_at < now() - interval '7 days' 的行,保持表足够小,让部分索引保持在缓存中热着。不要在 relayer 事务内同步删除行。你会和自己 的 vacuum 进程打架,尾延迟会失控。

Outbox 不花哨。它就是一张表、一个轮询循环、消费者端的一个唯一索引,但它一直在默默超越大多数为取代它而构建的分布式事务框架。把 schema 做对,让 relayer 保持简单,让消费者在数据层幂等,把告警放在上述三个失败模式上。其他的都是装饰。


If this was useful

Event-Driven Architecture Pocket Guide 的 Outbox 章节深入探讨了轮询 relayer 和 Debezium 风格日志尾部之间的权衡、何时从单个 outbox 切换到按 aggregate 分流,以及保持五年旧的消费者在重命名字段时不崩溃的模式演进规则。如果你正在生产环境中运行 outbox,而上面的失败模式部分听起来太熟悉了,这本书的其他内容就是为你准备的。

Event-Driven Architecture Pocket Guide