跳至内容
结构化并发:WaitGroup、errgroup 与并发任务收敛

结构化并发:WaitGroup、errgroup 与并发任务收敛

2026年6月23日·
yanlong

并发最难维护的形态不是 goroutine 多,而是任务离开了调用栈:函数已经返回,子任务还在运行;某个子任务失败,兄弟任务毫不知情;调用方不知道什么时候资源才真正释放。

结构化并发要求并发任务形成清楚的父子范围:父调用启动子任务,在离开这个范围前等待它们结束;取消和错误沿范围传播。

    flowchart TD
  P[父调用] --> A[任务 A]
  P --> B[任务 B]
  P --> C[任务 C]
  A --> J[Wait / 收敛点]
  B --> J
  C --> J
  J --> R[父调用返回]
  

WaitGroup 只负责等待

经典模式:

var wg sync.WaitGroup
for _, item := range items {
    item := item
    wg.Add(1)
    go func() {
        defer wg.Done()
        process(item)
    }()
}
wg.Wait()

Add 要在启动 goroutine 之前执行,否则新 goroutine 可能还没 Add,主 goroutine 的 Wait 已经返回。

Go 1.25 起可以使用 WaitGroup.Go 消除 Add/Done 配对:

var wg sync.WaitGroup
for _, item := range items {
    item := item
    wg.Go(func() {
        process(item)
    })
}
wg.Wait()

传给 wg.Go 的函数不应 panic。WaitGroup 不收集错误,也不自动取消兄弟任务。如果任务可能失败,需要自己增加错误通道、锁或其他协调,这通常就是应该考虑 errgroup 的信号。

手写错误收集为什么容易失控

errorsCh := make(chan error, len(items))
var wg sync.WaitGroup

for _, item := range items {
    item := item
    wg.Add(1)
    go func() {
        defer wg.Done()
        errorsCh <- process(item)
    }()
}

wg.Wait()
close(errorsCh)

这段代码还要决定:

  • nil error 是否发送;
  • 返回第一个错误还是合并全部错误;
  • 一个任务失败后,其他任务是否继续;
  • 调用方取消时,任务如何停止;
  • channel 容量是否可能导致发送阻塞。

不是不能手写,而是这些决定已经构成一个并发协议。对“任一失败则取消全部”的任务组,golang.org/x/sync/errgroup 已经提供了成熟实现。

errgroup 把错误和取消绑在一起

group, ctx := errgroup.WithContext(parent)

for _, id := range orderIDs {
    id := id
    group.Go(func() error {
        return rebuildIndex(ctx, id)
    })
}

if err := group.Wait(); err != nil {
    return fmt.Errorf("rebuild indexes: %w", err)
}

WithContext 返回的 Context 会在第一个任务返回非 nil error 时取消,也会在 Wait 返回时取消。任务必须真正使用这个 ctx,取消才有意义。

func rebuildIndex(ctx context.Context, id string) error {
    rows, err := db.QueryContext(ctx, query, id)
    // ...
}

errgroup 返回第一个非 nil 错误。它适合“一个失败,整个组合操作就失败”的语义;批处理若要求记录每个条目的结果,不应把所有错误都压成第一个。

结果收集要避免并发 append

var results []Result
group.Go(func() error {
    result := compute()
    results = append(results, result) // 数据竞争
    return nil
})

如果输入数量固定,预分配并让每个任务写唯一索引最简单:

results := make([]Result, len(inputs))
group, ctx := errgroup.WithContext(parent)

for i, input := range inputs {
    i, input := i, input
    group.Go(func() error {
        result, err := compute(ctx, input)
        if err != nil {
            return err
        }
        results[i] = result
        return nil
    })
}

if err := group.Wait(); err != nil {
    return nil, err
}
return results, nil

不同 goroutine 写不同数组元素可以避免对同一内存位置的竞争;切片长度和底层数组不能在期间改变。返回结果前等待全部任务,确保写入已经完成。

SetLimit 把并发上限放进任务组

group, ctx := errgroup.WithContext(parent)
group.SetLimit(8)

for _, item := range items {
    item := item
    group.Go(func() error {
        return process(ctx, item)
    })
}
return group.Wait()

当已有 8 个任务运行时,后续 Go 调用会阻塞,直到出现空位。限制值不应在任务仍活跃时修改。

注意这个阻塞发生在提交任务的 goroutine。如果提交本身也必须响应 Context,可以在循环中先检查 ctx,或者使用更显式的队列/worker pool。

TryGo 在没有空位时返回 false,适合调用方拥有清晰降级策略的场景。不要把 false 静默当作成功,否则任务会无声丢失。

什么时候不该取消兄弟任务

假设批量发送 100 封通知,某一封失败不代表其他 99 封应该取消。此时可以让每个任务写入独立结果,WaitGroup 等待全部完成:

type Outcome struct {
    ID  string
    Err error
}

outcomes := make([]Outcome, len(messages))
var wg sync.WaitGroup

for i, message := range messages {
    i, message := i, message
    wg.Add(1)
    go func() {
        defer wg.Done()
        outcomes[i] = Outcome{
            ID:  message.ID,
            Err: send(parent, message),
        }
    }()
}
wg.Wait()

结构化并发不等于“第一个失败就取消”,而是父调用明确拥有任务范围和收敛点。错误策略由业务决定。

panic 如何处理

普通 goroutine 中未恢复的 panic 会让整个进程崩溃。不要期待 WaitGroup 或 errgroup 自动把 panic 变成 error。

如果任务来自不可信插件或必须隔离的作业,可以在任务边界 recover、记录堆栈并转换错误;普通内部代码更适合让 panic 暴露程序错误,而不是悄悄吞掉。

结构化并发的代码形状

一段容易维护的并发代码通常具备这些特征:

  • GoWait 出现在可见的同一范围;
  • 子任务继承父 Context;
  • 错误策略明确:首错取消、全部收集或允许部分成功;
  • 结果写入没有共享 append;
  • 并发上限是配置,不由输入规模决定;
  • 父函数返回时,子任务已经结束。

这比“到处启动 goroutine,再用日志观察它们”可靠得多。

延伸阅读