
最近在项目里折腾 webhook 处理,踩了几个坑才把整个链路跑顺。事件驱动架构听起来高大上,但真正落地的时候,签名验证、重试机制、幂等性处理这些问题一个都躲不掉。这篇把我在 Next.js + Supabase 体系下实现生产级 webhook 处理的完整方案梳理一遍,给需要的朋友省点踩坑时间。
用 Next.js 和 Supabase 构建生产就绪的事件驱动系统,包括安全的 webhook 端点、可靠的事件处理、重试机制,以及面向可扩展应用的分布式系统模式。
现代应用需要响应来自多个来源的事件:支付服务商、第三方 API、用户操作、系统事件等等。简单的轮询效率低而且无法扩展。基于 webhook 的事件驱动架构能提供实时响应能力,但同时引入了安全、可靠性和错误处理方面的复杂性。
Webhook 在事件发生时主动推送数据到你的应用,而轮询则需要你的应用反复检查变更。Webhook 效率更高、实时性更强、减少服务器负载,但需要完善的安全机制和错误处理。
用 HMAC(密钥散列消息认证码)验证 webhook 签名、尽可能验证来源 IP、仅使用 HTTPS、实现限流、验证载荷结构。永远不要在未验证的情况下信任 webhook 数据——把它当作潜在恶意输入来处理。
实现带指数退避的重试机制、使用死信队列处理失败事件、监控 webhook 健康状态、考虑使用消息队列提升可靠性。大多数 webhook 提供商会自动重试失败的投递,所以你的端点需要优雅地处理重复事件。
构建健壮的 webhook 处理系统:
-- Webhook 事件追踪表
CREATE TABLE webhook_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), event_id TEXT UNIQUE NOT NULL, -- 外部事件 ID,用于幂等性 source TEXT NOT NULL, -- 'alipay', 'wechat', 'github', 'supabase' 等 event_type TEXT NOT NULL, payload JSONB NOT NULL, status TEXT NOT NULL DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed' attempts INTEGER DEFAULT 0, last_attempt_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, error_message TEXT, created_at TIMESTAMPTZ DEFAULT NOW()
); -- 事件处理任务队列
CREATE TABLE event_jobs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), webhook_event_id UUID REFERENCES webhook_events(id) ON DELETE CASCADE, job_type TEXT NOT NULL, payload JSONB NOT NULL, status TEXT NOT NULL DEFAULT 'queued', -- 'queued', 'processing', 'completed', 'failed' scheduled_for TIMESTAMPTZ DEFAULT NOW(), attempts INTEGER DEFAULT 0, max_attempts INTEGER DEFAULT 3, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW()
); -- 性能索引
CREATE INDEX idx_webhook_events_status ON webhook_events(status);
CREATE INDEX idx_webhook_events_source_type ON webhook_events(source, event_type);
CREATE INDEX idx_event_jobs_status_scheduled ON event_jobs(status, scheduled_for);
实现安全的通用 webhook 处理器:
// app/api/webhooks/[source]/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { createClient } from '@/lib/supabase/server'
import { verifyWebhookSignature } from '@/lib/webhook-security'
import { queueEventJob } from '@/lib/event-queue' interface WebhookConfig { secretKey: string signatureHeader: string verifySignature: (payload: string, signature: string, secret: string) => boolean
} const WEBHOOK_CONFIGS: Record<string, WebhookConfig> = { alipay: { secretKey: process.env.ALIPAY_WEBHOOK_SECRET!, signatureHeader: 'x-alipay-signature', verifySignature: verifyAlipaySignature }, wechat: { secretKey: process.env.WECHAT_WEBHOOK_SECRET!, signatureHeader: 'x-wechat-signature', verifySignature: verifyWechatSignature }, github: { secretKey: process.env.GITHUB_WEBHOOK_SECRET!, signatureHeader: 'x-hub-signature-256', verifySignature: verifyGitHubSignature }, supabase: { secretKey: process.env.SUPABASE_WEBHOOK_SECRET!, signatureHeader: 'x-supabase-signature', verifySignature: verifySupabaseSignature }
} export async function POST( request: NextRequest, { params }: { params: { source: string } }
) { const source = params.source const config = WEBHOOK_CONFIGS[source] if (!config) { return NextResponse.json( { error: 'Unknown webhook source' }, { status: 400 } ) } try { // 获取原始请求体用于签名验证 const body = await request.text() const signature = request.headers.get(config.signatureHeader) if (!signature) { return NextResponse.json( { error: 'Missing signature' }, { status: 400 } ) } // 验证 webhook 签名 if (!config.verifySignature(body, signature, config.secretKey)) { return NextResponse.json( { error: 'Invalid signature' }, { status: 401 } ) } const payload = JSON.parse(body) const eventId = extractEventId(source, payload) const eventType = extractEventType(source, payload) // 存储 webhook 事件并检查幂等性 const supabase = createClient() const { data: existingEvent } = await supabase .from('webhook_events') .select('id, status') .eq('event_id', eventId) .single() if (existingEvent) { // 事件已处理或正在处理 return NextResponse.json({ message: 'Event already processed', status: existingEvent.status }) } // 存储新的 webhook 事件 const { data: webhookEvent, error } = await supabase .from('webhook_events') .insert({ event_id: eventId, source, event_type: eventType, payload, status: 'pending' }) .select() .single() if (error) throw error // 加入处理队列 await queueEventJob(webhookEvent.id, eventType, payload) return NextResponse.json({ message: 'Webhook received', event_id: eventId }) } catch (error) { console.error('Webhook processing error:', error) return NextResponse.json( { error: 'Internal server error' }, { status: 500 } ) }
} // 不同 webhook 来源的辅助函数
function extractEventId(source: string, payload: any): string { switch (source) { case 'alipay': return payload.trade_no case 'wechat': return payload.transaction_id case 'github': return payload.delivery || `${payload.repository?.id}-${Date.now()}` case 'supabase': return payload.record?.id || `${payload.table}-${Date.now()}<