控制并发而不是制造 Goroutine:Worker Pool、信号量与背压
Goroutine 很便宜,但你的数据库连接、下游 QPS、内存和 CPU 并不便宜。无上限地启动 goroutine,只是把排队从一个可观测的队列搬进运行时调度器,同时把压力传给所有下游。
并发控制需要回答三个不同问题:
- 同时执行多少任务?
- 最多允许多少任务排队?
- 队列满时,生产者怎么办?
只设置 goroutine 数量,仍然没有完整的容量模型。
从服务时间估算并发度
对 I/O 型任务,可以从 Little’s Law 的直觉开始:
系统中任务数 ≈ 到达率 × 平均停留时间下游平均 100ms、目标 200 QPS,理论上约需要 20 个并发才能维持吞吐。但 P99、连接池大小、下游限额和重试都会改变结果。公式给起点,压测和线上指标负责校准。
CPU 密集型任务通常从 GOMAXPROCS 附近开始测试;并发远高于可用 CPU 只会增加调度和缓存抖动。
固定 Worker Pool
type Pool struct {
jobs chan Job
wg sync.WaitGroup
}
func NewPool(workerCount, queueSize int) *Pool {
p := &Pool{jobs: make(chan Job, queueSize)}
p.wg.Add(workerCount)
for range workerCount {
go func() {
defer p.wg.Done()
for job := range p.jobs {
job.Run()
}
}()
}
return p
}
func (p *Pool) Submit(ctx context.Context, job Job) error {
select {
case p.jobs <- job:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (p *Pool) Close() {
close(p.jobs)
p.wg.Wait()
}这个最小版本已经表达了几个重要决定:固定执行并发、有限队列、提交响应取消、关闭后排空队列。
生产实现还要处理重复 Close、Close 与 Submit 并发导致的 send-on-closed-channel、任务 panic 和任务错误。与其让所有调用方直接拿到 jobs,更好的做法是用状态锁封装提交与关闭协议,或者由单一所有者负责关闭。
队列容量不是越大越好
大队列会让峰值看起来被“处理”了,实际只是把延迟和内存藏起来。任务在队列里等 30 秒后即使执行成功,对请求方也可能早已无意义。
队列至少要监控:
- 当前长度和容量;
- 排队等待时间;
- 拒绝或超时提交数;
- 执行耗时与失败率;
- 活跃 worker 数。
背压的价值就是让过载尽早变成可见信号。
Channel Semaphore:限制一段代码的并发
不需要长期 worker 时,可以用带缓冲 channel 表示令牌:
type Semaphore chan struct{}
func (s Semaphore) Acquire(ctx context.Context) error {
select {
case s <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (s Semaphore) Release() {
<-s
}使用时确保成功 Acquire 才 defer Release:
if err := sem.Acquire(ctx); err != nil {
return err
}
defer sem.Release()
return callDownstream(ctx)这种模式限制的是“正在执行的临界工作”,每个任务仍可能先启动一个 goroutine 并阻塞在 Acquire。如果输入极大,这些等待 goroutine 本身仍占资源。需要同时限制排队数量时,worker pool 更明确。
加权任务可使用 golang.org/x/sync/semaphore.Weighted,例如大任务消耗 4 个令牌、小任务消耗 1 个。权重必须能反映真实资源,否则只是制造更复杂的饥饿问题。
errgroup.SetLimit:一次调用范围内的并发上限
group, ctx := errgroup.WithContext(parent)
group.SetLimit(16)
for _, item := range items {
item := item
group.Go(func() error {
return process(ctx, item)
})
}
return group.Wait()它适合有限输入集合和“首错取消”语义。与长期 worker pool 相比,不需要维护常驻队列;与 channel semaphore 相比,任务等待发生在 group.Go 调用处,不会先为全部输入创建 goroutine。
背压策略必须写进 API
队列满时通常有四种策略:
阻塞
生产者等待空位,并响应 Context。适合不能丢失、上游能够被自然减速的任务。
拒绝
立即返回 ErrQueueFull,让上游重试、降级或返回 429/503。适合在线请求和明确容量边界。
丢弃
适合允许采样的指标、调试日志等非关键数据。必须计数并告警,不能静默。
持久化
把任务交给 Kafka、数据库 outbox 等外部队列。适合进程重启后仍不能丢失的业务任务,但这已经是可靠消息系统问题,不是一个更大的 channel 能解决的。
flowchart LR
Producer[生产者] --> Gate{队列有空间?}
Gate -->|有| Queue[有限队列]
Gate -->|无| Policy{背压策略}
Policy --> Block[等待]
Policy --> Reject[拒绝]
Policy --> Drop[可观测丢弃]
Policy --> Persist[外部持久队列]
Queue --> Workers[固定并发 Worker]
并发上限要对准真正的瓶颈
如果数据库连接池只有 20 个连接,却允许 200 个 goroutine 同时查询,180 个任务只是换个地方排队。把并发上限设在 20 也未必正确,因为事务、健康检查和其他路径还要共享连接。
常见做法是为不同下游建立独立 bulkhead:库存 RPC、支付 RPC 和数据库各自限流,避免一个依赖变慢拖死所有请求。全局一个 semaphore 很难表达这种隔离。
关闭 Worker Pool 的顺序
可靠停机一般遵循:
- 停止接受新任务;
- 决定排空还是取消队列;
- 向执行中的任务传播截止时间;
- 等待 worker 退出;
- 超过停机预算后记录未完成任务。
如果任务必须保证完成,进程内队列本身就不够可靠:崩溃时内存中的任务无法恢复。不要把优雅停机当成持久性保证。
一组需要一起调的参数
并发数、队列长度、任务超时和重试次数不能分别拍脑袋:
- 并发越高,下游竞争越强;
- 队列越长,端到端延迟越高;
- 超时越长,槽位占用越久;
- 重试越多,过载时新增流量越大。
压测时同时观察吞吐、P95/P99、错误率、队列等待和下游饱和度。目标不是让 goroutine 跑满,而是在过载时仍能给出可预测的响应。