跳至内容
构建不泄漏的并发流水线:Fan-out、Fan-in 与取消

构建不泄漏的并发流水线:Fan-out、Fan-in 与取消

2026年6月21日·
yanlong

并发流水线把一项工作拆成多个 stage,每个 stage 从上游接收数据、处理后发给下游。它适合流式处理和阶段吞吐差异明显的任务,但也有一个经典故障:下游提前返回,上游永远阻塞在发送。

一个可用的流水线必须同时设计数据流和取消流。

    flowchart LR
  Source[Source] --> Parse1[Parse]
  Source --> Parse2[Parse]
  Parse1 --> Merge[Fan-in]
  Parse2 --> Merge
  Merge --> Store[Store]
  Cancel[Context cancel] -.-> Source
  Cancel -.-> Parse1
  Cancel -.-> Parse2
  Cancel -.-> Store
  

Stage 的基本契约

用泛型可以写出一个简单 stage:

func Map[T, R any](
    ctx context.Context,
    input <-chan T,
    transform func(context.Context, T) (R, error),
) (<-chan R, <-chan error) {
    output := make(chan R)
    errorsCh := make(chan error, 1)

    go func() {
        defer close(output)
        defer close(errorsCh)

        for {
            var value T
            var ok bool
            select {
            case <-ctx.Done():
                return
            case value, ok = <-input:
                if !ok {
                    return
                }
            }

            result, err := transform(ctx, value)
            if err != nil {
                errorsCh <- err
                return
            }

            select {
            case output <- result:
            case <-ctx.Done():
                return
            }
        }
    }()

    return output, errorsCh
}

它体现了几个契约:

  • stage 自己创建 output,所以自己负责关闭;
  • 接收 input 的 stage 不关闭 input;
  • 向下游发送时同时监听取消;
  • error channel 容量为 1,避免报告错误时因无人立即接收而阻塞。

实际 API 往往把值与错误合并成 Result[T],或用 errgroup 管理整条流水线,避免每个 stage 再配一条错误 channel。

Source 也必须能取消

func Generate[T any](ctx context.Context, values ...T) <-chan T {
    output := make(chan T)
    go func() {
        defer close(output)
        for _, value := range values {
            select {
            case output <- value:
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

如果只在 stage 接收时检查 Context,却在发送时直接 output <- value,下游停止读取后仍会泄漏。每个潜在阻塞点都要能观察取消。

Fan-out:多个 Worker 读取同一输入

多个 goroutine 同时 range 同一个 channel,channel 会把每个值交给其中一个接收者:

func Workers(
    ctx context.Context,
    count int,
    jobs <-chan Job,
) []<-chan Result {
    outputs := make([]<-chan Result, count)
    for i := range count {
        outputs[i] = worker(ctx, jobs)
    }
    return outputs
}

这适合每个任务相互独立、顺序不重要、单个任务成本相近的场景。它不保证输出顺序。需要恢复顺序时,可以给输入附带序号并在下游重排,但重排缓冲会重新引入内存上界问题。

如果少数任务特别慢,静态地给每个 worker 分一批数据容易拖尾;共享输入 channel 可以让空闲 worker 自然领取下一项。

Fan-in:合并多个输出

func Merge[T any](
    ctx context.Context,
    inputs ...<-chan T,
) <-chan T {
    output := make(chan T)
    var wg sync.WaitGroup
    wg.Add(len(inputs))

    for _, input := range inputs {
        input := input
        go func() {
            defer wg.Done()
            for {
                var value T
                var ok bool
                select {
                case <-ctx.Done():
                    return
                case value, ok = <-input:
                    if !ok {
                        return
                    }
                }

                select {
                case output <- value:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(output)
    }()

    return output
}

不能让每个转发 goroutine各自 close(output),因为多个发送方中任意一个先结束都不代表其他发送方结束。单独的协调 goroutine 等全部转发完成后关闭。

提前退出是泄漏高发点

for result := range results {
    if result.Match {
        return result // 上游可能还在发送
    }
}

调用方一旦提前返回,必须先取消整条流水线:

ctx, cancel := context.WithCancel(parent)
defer cancel()

for result := range results {
    if result.Match {
        return result
    }
}

defer cancel() 会在函数返回时执行。前提是所有 stage 在发送、接收或外部 I/O 时都传播 ctx。只在最外层创建 Context,而 stage 仍无条件发送,取消不会自动打断 channel 操作。

用 errgroup 管理整条流水线

当 stage 数量固定、错误需要首错取消时,errgroup 比“函数返回 channel”更容易建立清楚的生命周期:

func RunPipeline(ctx context.Context, input <-chan Item) error {
    group, ctx := errgroup.WithContext(ctx)
    parsed := make(chan Parsed)

    group.Go(func() error {
        defer close(parsed)
        for item := range input {
            value, err := parse(item)
            if err != nil {
                return err
            }
            select {
            case parsed <- value:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    })

    group.Go(func() error {
        for value := range parsed {
            if err := store(ctx, value); err != nil {
                return err
            }
        }
        return nil
    })

    return group.Wait()
}

任一 stage 失败会取消 ctx,其他 stage 在阻塞点退出。Wait 确保函数返回时所有 stage 已收敛。

外部 input 的所有权

示例没有关闭 input,因为它不是本函数创建的。如果某个 stage 直接使用 for range input,而 input 在取消后没有关闭,该 stage 仍可能阻塞,无法观察 ctx。

需要改成显式 select 接收:

for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case item, ok := <-input:
        if !ok {
            return nil
        }
        // process item
    }
}

这是流水线代码里很容易漏掉的一层:接收本身也是阻塞点。

缓冲放在哪里

无缓冲 channel 提供最强背压:下游没有接收,上游就不能继续。缓冲可以吸收阶段之间的短暂速度差,但每个缓冲都增加在途数据和取消后的清理量。

优先从无缓冲开始,只在 profile 或吞吐实验表明阶段之间存在可被缓冲吸收的抖动时增加容量。把每段都设成 1000,不会让最慢 stage 变快,只会让问题晚一点暴露。

什么时候不用流水线

  • 只有少量固定并发调用:errgroup 更简单;
  • 任务需要可靠持久化:使用外部消息系统;
  • 阶段之间共享复杂可变状态:锁和明确服务对象可能更清楚;
  • 整批数据很小:顺序循环的维护成本最低。

流水线不是“高级 Go”的展示品。只有当流式处理、阶段隔离或背压确实属于问题本身时,它才会让设计更简单。

延伸阅读