8 分钟阅读

用 PostgreSQL 实现可靠消息队列:SKIP LOCKED 与 at-least-once 投递语义

在 HookRelay 的设计中,我选择用 PostgreSQL 而不是专门的消息队列系统来实现任务队列。这个选择在技术社区里颇有争议,有人认为这是"错误的工具做错误的事"。这篇文章深入讲这个实现背后的原理,以及它真正的边界在哪里。

一个简单问题:如何让多个 Worker 并发消费,且不重复?

假设你有一张任务表,多个 Worker 进程同时从里面取任务处理。最朴素的方案:

-- Worker 1
SELECT * FROM jobs WHERE status = 'pending' LIMIT 1;
-- Worker 2 同时执行同样的 SQL
 
-- 结果:两个 Worker 拿到同一条记录,重复处理

这是经典的竞态条件

方案一:悲观锁 FOR UPDATE

BEGIN;
SELECT * FROM jobs WHERE status = 'pending' LIMIT 1 FOR UPDATE;
-- 处理...
UPDATE jobs SET status = 'done' WHERE id = $1;
COMMIT;

FOR UPDATE 会锁住查询到的行,其他事务对这行的 SELECT ... FOR UPDATE阻塞等待

问题:Worker 1 锁住了 job_1,Worker 2 来了,发现 job_1 被锁,等待。等 Worker 1 提交后,Worker 2 才能继续,发现 job_1 已经是 done 了,再去找下一条。串行化了,失去了并发的意义。

方案二:乐观锁(CAS 更新)

-- 用 UPDATE 的原子性代替 SELECT 的锁
UPDATE jobs 
SET status = 'processing', worker_id = $worker_id
WHERE id = (
    SELECT id FROM jobs 
    WHERE status = 'pending' 
    ORDER BY id LIMIT 1
)
AND status = 'pending'  -- 条件检查,防止并发冲突
RETURNING *;

如果两个 Worker 同时运行这条 SQL,只有一个会成功更新(因为 UPDATE 是原子的),另一个 RETURNING 为空,需要重试。这比方案一好,但高并发时重试率很高,有大量无效操作。

方案三:SKIP LOCKED(正确答案)

BEGIN;
SELECT * FROM jobs 
WHERE status = 'pending' 
ORDER BY created_at
LIMIT 10
FOR UPDATE SKIP LOCKED;  -- 跳过已被锁定的行,而不是等待
COMMIT;

SKIP LOCKED 是 PostgreSQL 9.5 引入的特性。它的语义是:如果某行已经被其他事务锁定,直接跳过它,选择下一条未锁定的行

这样,Worker 1 和 Worker 2 可以同时运行,各自拿到不同的任务,互不干扰,完全并行。

SKIP LOCKED 的底层实现原理

理解这个特性,需要先了解 PostgreSQL 的 MVCC(多版本并发控制)和行锁机制。

PostgreSQL 中,每行数据在物理上有一个行级锁(tuple lock)。FOR UPDATE 会在 row 的 header 中设置 xmax 字段(标记当前持有锁的事务 ID),其他事务读到这行时检查 xmax 是否是活跃事务:

  • 普通 SELECT:看不到锁,直接读(MVCC 快照隔离)
  • FOR UPDATE:检测到锁,根据参数决定等待(默认)或跳过(SKIP LOCKED)
  • NOWAIT:检测到锁,立即报错
行锁在内存中的表示(简化):
pg_locks 表中:
  locktype: tuple
  relation: jobs 表的 OID
  tuple: 行的物理位置 (page, offset)
  transactionid: 持锁事务 ID
  mode: RowExclusiveLock
  granted: true

当扫描到一行时,PostgreSQL 检查该行是否有未完成事务的锁:

  • 没有锁:正常加锁并返回
  • 有锁 + 普通 FOR UPDATE:进入锁等待队列(LockAcquire
  • 有锁 + SKIP LOCKED:跳过该行,继续扫描下一行

关键的实现细节:SKIP LOCKED 不会进入等待队列,因此不会触发死锁检测。这也是它比普通 FOR UPDATE 更高效的原因之一——没有锁等待的上下文切换开销。

完整的队列实现

type PostgresQueue struct {
    db          *sql.DB
    workerCount int
}
 
type Job struct {
    ID         int64
    Payload    []byte
    RetryCount int
    Status     string
    NextRetry  time.Time
}
 
func (q *PostgresQueue) Dequeue(ctx context.Context, batchSize int) ([]*Job, error) {
    tx, err := q.db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelReadCommitted, // 不需要 Repeatable Read
    })
    if err != nil {
        return nil, err
    }
    defer tx.Rollback() // 事务提交后这个是 no-op,失败时会回滚释放锁
 
    rows, err := tx.QueryContext(ctx, `
        SELECT id, payload, retry_count
        FROM jobs
        WHERE status = 'pending'
          AND next_retry_at <= NOW()
        ORDER BY priority DESC, created_at ASC
        LIMIT $1
        FOR UPDATE SKIP LOCKED
    `, batchSize)
    if err != nil {
        return nil, err
    }
 
    var jobs []*Job
    var ids []int64
    for rows.Next() {
        j := &Job{}
        if err := rows.Scan(&j.ID, &j.Payload, &j.RetryCount); err != nil {
            return nil, err
        }
        jobs = append(jobs, j)
        ids = append(ids, j.ID)
    }
    rows.Close()
 
    if len(jobs) == 0 {
        tx.Rollback()
        return nil, nil
    }
 
    // 标记为 processing(在同一事务内,保证原子性)
    _, err = tx.ExecContext(ctx, `
        UPDATE jobs 
        SET status = 'processing', started_at = NOW()
        WHERE id = ANY($1)
    `, pq.Array(ids))
    if err != nil {
        return nil, err
    }
 
    // 提交事务,释放行锁
    // 注意:此时其他 Worker 才能看到这些行被标记为 processing
    if err := tx.Commit(); err != nil {
        return nil, err
    }
 
    return jobs, nil
}

这里有个重要的设计决策:在同一个事务内完成 SELECT 和 UPDATE

如果分成两个步骤(先 SELECT 拿 ID,再 UPDATE),在两个操作之间有个窗口期,另一个 Worker 可能也拿到了同样的 ID。虽然 UPDATE 是原子的,但这引入了不必要的复杂性。同一事务内,SELECT 拿到了行锁,UPDATE 只是修改同样的行,没有额外的锁竞争。

At-Least-Once 语义与幂等性

PostgreSQL 队列保证的是 at-least-once(至少一次)而不是 exactly-once(精确一次)。

场景:Worker 取到任务,处理到一半,进程 crash 了。此时任务状态是 processing,永远不会被重新消费。

解决方案:心跳 + 超时重置

// Worker 在处理任务时,定期更新心跳
func (w *Worker) process(ctx context.Context, job *Job) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    
    // 心跳 goroutine
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                w.db.ExecContext(ctx, `
                    UPDATE jobs SET last_heartbeat = NOW() WHERE id = $1
                `, job.ID)
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // 实际处理
    if err := w.handler(ctx, job); err != nil {
        w.markFailed(job, err)
        return
    }
    w.markDone(job)
}
 
// 定期扫描超时任务,重置为 pending
func (q *PostgresQueue) RecoverStaleJobs(ctx context.Context) {
    q.db.ExecContext(ctx, `
        UPDATE jobs 
        SET status = 'pending', retry_count = retry_count + 1
        WHERE status = 'processing'
          AND last_heartbeat < NOW() - INTERVAL '30 seconds'
          AND retry_count < max_retries
    `)
}

这就是为什么 at-least-once 而不是 exactly-once:任务可能被处理了一次(Worker crash 前),然后超时后被重新消费再处理一次。

应对 at-least-once 的正确姿势是幂等设计

// 在任务 payload 中携带幂等键
type SendEmailJob struct {
    IdempotencyKey string  // 唯一标识,通常是业务 ID 的哈希
    UserID         int64
    Template       string
    Variables      map[string]string
}
 
func (h *EmailHandler) Handle(ctx context.Context, job *Job) error {
    var task SendEmailJob
    json.Unmarshal(job.Payload, &task)
    
    // 检查是否已经发送过
    var sent bool
    h.db.QueryRowContext(ctx, 
        "SELECT EXISTS(SELECT 1 FROM sent_emails WHERE idempotency_key = $1)",
        task.IdempotencyKey,
    ).Scan(&sent)
    
    if sent {
        return nil  // 已发送,幂等返回成功
    }
    
    if err := h.emailClient.Send(ctx, task); err != nil {
        return err
    }
    
    // 记录已发送(与发送操作放在同一事务内更好,但 email 通常不支持事务)
    h.db.ExecContext(ctx,
        "INSERT INTO sent_emails (idempotency_key, sent_at) VALUES ($1, NOW())",
        task.IdempotencyKey,
    )
    return nil
}

这个方案的性能上限

pgbench 和真实 HookRelay 负载测试的数据:

| 场景 | 吞吐量 | |------|-------| | 单 Worker,无竞争 | ~3,500 jobs/s | | 10 Workers,SKIP LOCKED | ~12,000 jobs/s | | 20 Workers,SKIP LOCKED | ~18,000 jobs/s(接近瓶颈) | | 20 Workers,FOR UPDATE(对比) | ~800 jobs/s(锁等待严重) |

在 10 Workers 时,SKIP LOCKED 的吞吐比 FOR UPDATE 高 15 倍

瓶颈在哪里?当 Worker 数量增多,所有 Worker 都在竞争扫描同一个表的前 N 行,即使 SKIP LOCKED 跳过了已锁的行,索引扫描本身也会有竞争(index page 上的 buffer lock)。

超过 ~18,000 jobs/s 后,更多 Worker 已经没有收益,需要考虑分片(多张队列表,Worker 分组消费)或迁移到真正的消息队列系统。

对于 HookRelay 的场景,18,000 events/s 已经远超需求,这个方案完全合适。

什么时候应该换 Kafka

明确说清楚这个方案的边界,比鼓吹它万能更重要:

  • > 50,000 events/s:PostgreSQL 队列无论怎么优化都到头了,换 Kafka
  • 需要消息重放:Kafka 保留历史,可以重放任意时间点的消息;PostgreSQL 队列消费完就没了
  • 多消费者组独立消费:Kafka 的 consumer group 天然支持;PostgreSQL 需要为每个消费者复制一份数据
  • 严格的消息顺序:Kafka 分区内有序;PostgreSQL 在多 Worker 并发下不保证顺序

如果你的场景不涉及以上情况,PostgreSQL 队列的运维简单性是真实的优势——不需要维护额外的基础设施,事务性保证更强,调试更直接(直接 SELECT 看队列状态)。