跳至内容
控制并发而不是制造 Goroutine:Worker Pool、信号量与背压

控制并发而不是制造 Goroutine:Worker Pool、信号量与背压

2026年6月22日·
yanlong

Goroutine 很便宜,但你的数据库连接、下游 QPS、内存和 CPU 并不便宜。无上限地启动 goroutine,只是把排队从一个可观测的队列搬进运行时调度器,同时把压力传给所有下游。

并发控制需要回答三个不同问题:

  1. 同时执行多少任务?
  2. 最多允许多少任务排队?
  3. 队列满时,生产者怎么办?

只设置 goroutine 数量,仍然没有完整的容量模型。

从服务时间估算并发度

对 I/O 型任务,可以从 Little’s Law 的直觉开始:

系统中任务数 ≈ 到达率 × 平均停留时间

下游平均 100ms、目标 200 QPS,理论上约需要 20 个并发才能维持吞吐。但 P99、连接池大小、下游限额和重试都会改变结果。公式给起点,压测和线上指标负责校准。

CPU 密集型任务通常从 GOMAXPROCS 附近开始测试;并发远高于可用 CPU 只会增加调度和缓存抖动。

固定 Worker Pool

type Pool struct {
    jobs chan Job
    wg   sync.WaitGroup
}

func NewPool(workerCount, queueSize int) *Pool {
    p := &Pool{jobs: make(chan Job, queueSize)}
    p.wg.Add(workerCount)
    for range workerCount {
        go func() {
            defer p.wg.Done()
            for job := range p.jobs {
                job.Run()
            }
        }()
    }
    return p
}

func (p *Pool) Submit(ctx context.Context, job Job) error {
    select {
    case p.jobs <- job:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (p *Pool) Close() {
    close(p.jobs)
    p.wg.Wait()
}

这个最小版本已经表达了几个重要决定:固定执行并发、有限队列、提交响应取消、关闭后排空队列。

生产实现还要处理重复 Close、Close 与 Submit 并发导致的 send-on-closed-channel、任务 panic 和任务错误。与其让所有调用方直接拿到 jobs,更好的做法是用状态锁封装提交与关闭协议,或者由单一所有者负责关闭。

队列容量不是越大越好

大队列会让峰值看起来被“处理”了,实际只是把延迟和内存藏起来。任务在队列里等 30 秒后即使执行成功,对请求方也可能早已无意义。

队列至少要监控:

  • 当前长度和容量;
  • 排队等待时间;
  • 拒绝或超时提交数;
  • 执行耗时与失败率;
  • 活跃 worker 数。

背压的价值就是让过载尽早变成可见信号。

Channel Semaphore:限制一段代码的并发

不需要长期 worker 时,可以用带缓冲 channel 表示令牌:

type Semaphore chan struct{}

func (s Semaphore) Acquire(ctx context.Context) error {
    select {
    case s <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (s Semaphore) Release() {
    <-s
}

使用时确保成功 Acquire 才 defer Release:

if err := sem.Acquire(ctx); err != nil {
    return err
}
defer sem.Release()
return callDownstream(ctx)

这种模式限制的是“正在执行的临界工作”,每个任务仍可能先启动一个 goroutine 并阻塞在 Acquire。如果输入极大,这些等待 goroutine 本身仍占资源。需要同时限制排队数量时,worker pool 更明确。

加权任务可使用 golang.org/x/sync/semaphore.Weighted,例如大任务消耗 4 个令牌、小任务消耗 1 个。权重必须能反映真实资源,否则只是制造更复杂的饥饿问题。

errgroup.SetLimit:一次调用范围内的并发上限

group, ctx := errgroup.WithContext(parent)
group.SetLimit(16)

for _, item := range items {
    item := item
    group.Go(func() error {
        return process(ctx, item)
    })
}
return group.Wait()

它适合有限输入集合和“首错取消”语义。与长期 worker pool 相比,不需要维护常驻队列;与 channel semaphore 相比,任务等待发生在 group.Go 调用处,不会先为全部输入创建 goroutine。

背压策略必须写进 API

队列满时通常有四种策略:

阻塞

生产者等待空位,并响应 Context。适合不能丢失、上游能够被自然减速的任务。

拒绝

立即返回 ErrQueueFull,让上游重试、降级或返回 429/503。适合在线请求和明确容量边界。

丢弃

适合允许采样的指标、调试日志等非关键数据。必须计数并告警,不能静默。

持久化

把任务交给 Kafka、数据库 outbox 等外部队列。适合进程重启后仍不能丢失的业务任务,但这已经是可靠消息系统问题,不是一个更大的 channel 能解决的。

    flowchart LR
  Producer[生产者] --> Gate{队列有空间?}
  Gate -->|有| Queue[有限队列]
  Gate -->|无| Policy{背压策略}
  Policy --> Block[等待]
  Policy --> Reject[拒绝]
  Policy --> Drop[可观测丢弃]
  Policy --> Persist[外部持久队列]
  Queue --> Workers[固定并发 Worker]
  

并发上限要对准真正的瓶颈

如果数据库连接池只有 20 个连接,却允许 200 个 goroutine 同时查询,180 个任务只是换个地方排队。把并发上限设在 20 也未必正确,因为事务、健康检查和其他路径还要共享连接。

常见做法是为不同下游建立独立 bulkhead:库存 RPC、支付 RPC 和数据库各自限流,避免一个依赖变慢拖死所有请求。全局一个 semaphore 很难表达这种隔离。

关闭 Worker Pool 的顺序

可靠停机一般遵循:

  1. 停止接受新任务;
  2. 决定排空还是取消队列;
  3. 向执行中的任务传播截止时间;
  4. 等待 worker 退出;
  5. 超过停机预算后记录未完成任务。

如果任务必须保证完成,进程内队列本身就不够可靠:崩溃时内存中的任务无法恢复。不要把优雅停机当成持久性保证。

一组需要一起调的参数

并发数、队列长度、任务超时和重试次数不能分别拍脑袋:

  • 并发越高,下游竞争越强;
  • 队列越长,端到端延迟越高;
  • 超时越长,槽位占用越久;
  • 重试越多,过载时新增流量越大。

压测时同时观察吞吐、P95/P99、错误率、队列等待和下游饱和度。目标不是让 goroutine 跑满,而是在过载时仍能给出可预测的响应。

延伸阅读