← Back
后端开发 2026.03.06

Programming Basics 0007_Concurrency Patterns

后端开发

Common Go concurrency design patterns, each with complete runnable examples and descriptions of applicable scenarios

1. Worker Pool Pattern

A fixed number of worker goroutines fetch and execute tasks from a shared task queue, controlling the degree of concurrency.

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker consumes jobs until the jobs channel is closed. For each job it
// sleeps for one second to simulate work and then sends twice the job value
// on results. id is only used to label the log lines. wg.Done is called when
// the worker returns.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        job, open := <-jobs
        if !open {
            // The queue was closed and fully drained.
            return
        }
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, job)
        time.Sleep(time.Second) // simulate a slow operation
        results <- job * 2
        fmt.Printf("Worker %d 完成任务 %d\n", id, job)
    }
}

// main runs the worker-pool demo: three workers share a queue of ten jobs and
// every result is printed as it arrives.
func main() {
    const (
        numJobs    = 10
        numWorkers = 3
    )

    // Both channels can hold every job, so enqueueing below never blocks and
    // a worker never blocks while sending a result.
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // Start the workers.
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results, &wg)
    }

    // Enqueue every job, then close the queue so the workers' receive loops end.
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Close results once all workers have returned; this is what ends the
    // collection loop below.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results until the channel is closed.
    for {
        r, open := <-results
        if !open {
            break
        }
        fmt.Println("结果:", r)
    }
}

Applicable Scenarios: HTTP request processing, batch data processing, limiting concurrent calls to external services

2. Pipeline Pattern

Multiple stages are chained together, with each stage being a group of goroutines passing data via channels.

package main

import "fmt"

// generate is pipeline stage 1: it sends each of nums, in order, on the
// returned unbuffered channel and closes the channel afterwards.
func generate(nums ...int) <-chan int {
    out := make(chan int)
    emit := func() {
        defer close(out)
        for i := 0; i < len(nums); i++ {
            out <- nums[i]
        }
    }
    go emit()
    return out
}

// square is pipeline stage 2: it reads values from in until in is closed,
// sends the square of each value on the returned channel, and then closes
// that channel.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            n, open := <-in
            if !open {
                return
            }
            out <- n * n
        }
    }()
    return out
}

// double is pipeline stage 3: it reads values from in until in is closed,
// sends twice each value on the returned channel, and then closes that
// channel.
func double(in <-chan int) <-chan int {
    out := make(chan int)
    relay := func() {
        defer close(out)
        for n := range in {
            doubled := n * 2
            out <- doubled
        }
    }
    go relay()
    return out
}

func main() {
    // 串联管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

Applicable Scenarios: Data processing pipelines, ETL, log processing pipelines

3. Fan-out / Fan-in Pattern

  • Fan-out: Multiple goroutines read from the same channel (distributing work)
  • Fan-in: The outputs of multiple channels are merged into one channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// producer starts a goroutine that sends five values, id*100 through
// id*100+4, on the returned channel, sleeping 50ms before each send, and
// closes the channel when done.
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        base := id * 100
        for offset := 0; offset < 5; offset++ {
            time.Sleep(50 * time.Millisecond)
            out <- base + offset
        }
    }()
    return out
}

// fanIn merges any number of input channels into a single output channel.
// One goroutine per input forwards its values to the merged channel; the
// merged channel is closed after every input has been closed and drained.
// Values from different inputs are interleaved in arrival order.
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)

    var wg sync.WaitGroup
    wg.Add(len(channels))

    // forward copies everything from src to merged until src is closed.
    forward := func(src <-chan int) {
        defer wg.Done()
        for v := range src {
            merged <- v
        }
    }
    for i := range channels {
        go forward(channels[i])
    }

    // Close merged only after all forwarders have finished, so no goroutine
    // ever sends on a closed channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

// main starts three producers that run concurrently and prints their values,
// in arrival order, from a single merged channel.
func main() {
    // Start producers 1, 2 and 3; each one emits on its own channel.
    sources := make([]<-chan int, 0, 3)
    for id := 1; id <= 3; id++ {
        sources = append(sources, producer(id))
    }

    // Fan-in: merge the three channels and print until the merged channel closes.
    for v := range fanIn(sources...) {
        fmt.Println(v)
    }
}

Applicable Scenarios: Aggregation of multiple data sources, merging results of parallel API calls

4. errgroup Package

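golang.org/x/sync/errgroup — Runs multiple tasks concurrently and returns the first error. When the group is created with WithContext, that first error also cancels the shared context, so the remaining tasks can stop early if they observe it.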

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

// main fetches a fixed list of URLs concurrently and prints the HTTP status
// code of each. If any fetch fails, the first error is printed and no status
// codes are shown.
func main() {
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    // Each task writes only to its own index, so no locking is needed.
    results := make([]int, len(urls))

    g, ctx := errgroup.WithContext(context.Background())

    // fetchStatus issues a GET bound to the group's context and returns the
    // response status code.
    fetchStatus := func(url string) (int, error) {
        req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
        if err != nil {
            return 0, err
        }
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return 0, fmt.Errorf("fetch %s: %w", url, err)
        }
        defer resp.Body.Close()
        return resp.StatusCode, nil
    }

    for i, url := range urls {
        i, url := i, url // per-iteration copies for the closure
        g.Go(func() error {
            status, err := fetchStatus(url)
            if err != nil {
                return err
            }
            results[i] = status
            return nil
        })
    }

    // Wait for every task to finish; a non-nil result is the first error.
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
        return
    }

    for i := range urls {
        fmt.Printf("%s -> %d\n", urls[i], results[i])
    }
}

errgroup Limiting Concurrency (SetLimit)

// A plain Group (no derived context): this snippet only uses the concurrency
// limit and error propagation. tasks and process are assumed to be defined by
// the surrounding program.
g := new(errgroup.Group)
g.SetLimit(3) // at most 3 tasks run concurrently

for _, task := range tasks {
    task := task // per-iteration copy for the closure (needed before Go 1.22)
    g.Go(func() error {
        return process(task)
    })
}

// Wait blocks until every task has returned and yields the first non-nil error.
if err := g.Wait(); err != nil {
    log.Fatal(err)
}

Applicable Scenarios: Concurrent API calls, batch operations that should report the first error, automatic cancellation of the remaining work

5. Rate Limiter Pattern

5.1 time.Ticker Simple Rate Limiting

package main

import (
    "fmt"
    "time"
)

// main processes ten queued requests, waiting for a ticker tick before each
// one so that requests are handled 200ms apart.
func main() {
    // One tick every 200ms means at most 5 requests per second.
    const interval = 200 * time.Millisecond
    limiter := time.NewTicker(interval)
    defer limiter.Stop()

    // Pre-fill the queue and close it so the loop below terminates.
    const total = 10
    requests := make(chan int, total)
    for id := 1; id <= total; id++ {
        requests <- id
    }
    close(requests)

    for {
        req, open := <-requests
        if !open {
            break
        }
        <-limiter.C // block until the next tick
        fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 Token Bucket Rate Limiting

package main

import (
    "fmt"
    "sync"
    "time"
)

// TokenBucket is a channel-backed token-bucket rate limiter. The bucket
// starts full, a background goroutine adds one token per interval, and tokens
// that arrive while the bucket is full are dropped. Create it with
// NewTokenBucket and release the background goroutine with Close.
type TokenBucket struct {
    tokens    chan struct{} // buffered channel; each element is one available token
    interval  time.Duration // time between two refills
    stop      chan struct{} // closed by Close to end the refill goroutine
    closeOnce sync.Once     // lets Close be called more than once
}

// NewTokenBucket returns a bucket that is refilled with rate tokens per
// second and holds at most burst tokens. The bucket starts full.
//
// It panics if rate is not positive or burst is negative. Checking here keeps
// the failure in the caller's goroutine; a bad interval would otherwise panic
// inside the refill goroutine, where the caller cannot recover from it.
func NewTokenBucket(rate int, burst int) *TokenBucket {
    if rate <= 0 {
        panic("NewTokenBucket: rate must be positive")
    }
    if burst < 0 {
        panic("NewTokenBucket: burst must not be negative")
    }

    interval := time.Second / time.Duration(rate)
    if interval <= 0 {
        // A rate above one token per nanosecond rounds down to zero, which
        // time.NewTicker rejects; use the smallest valid interval instead.
        interval = time.Nanosecond
    }

    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: interval,
        stop:     make(chan struct{}),
    }
    // Start with a full bucket.
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }
    // Refill one token per tick until Close is called.
    go func() {
        ticker := time.NewTicker(tb.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case tb.tokens <- struct{}{}:
                default: // bucket is full; drop this token
                }
            case <-tb.stop:
                return
            }
        }
    }()
    return tb
}

// Allow takes a token if one is immediately available and reports whether it
// did. It never blocks.
func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token is available and takes it. After Close no more
// tokens are added, so Wait on an empty, closed bucket blocks forever.
func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

// Close stops the refill goroutine. Tokens already in the bucket remain
// usable. Close may be called multiple times; only the first call has an
// effect.
func (tb *TokenBucket) Close() {
    tb.closeOnce.Do(func() {
        close(tb.stop)
    })
}

// main launches ten goroutines that each wait for a token from a shared
// bucket and log the time at which they obtained it.
func main() {
    bucket := NewTokenBucket(5, 3) // 5 tokens per second, bursts of up to 3
    defer bucket.Close()

    const requests = 10
    var wg sync.WaitGroup
    wg.Add(requests)
    for id := 0; id < requests; id++ {
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // block until a token is available
            fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
        }(id)
    }
    wg.Wait()
}

Applicable Scenarios: API rate limiting, preventing downstream overload, controlling resource consumption

6. Timeout and Cancellation Pattern

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

// slowOperation simulates a call that takes three seconds. It returns
// "操作完成" when the three seconds elapse, or an empty string and ctx.Err()
// if ctx is cancelled or its deadline passes first.
func slowOperation(ctx context.Context) (string, error) {
    // Use an explicit timer rather than time.After so it can be stopped when
    // ctx finishes first, instead of lingering until it fires.
    timer := time.NewTimer(3 * time.Second)
    defer timer.Stop()

    select {
    case <-timer.C: // the simulated slow work finished
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// main calls slowOperation with a one-second budget and prints either its
// result or the context error.
func main() {
    // The deadline is 1 second from now.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    switch result, err := slowOperation(ctx); {
    case err != nil:
        fmt.Println("超时或取消:", err) // e.g. context deadline exceeded
    default:
        fmt.Println(result)
    }
}

6.2 Manual Cancellation + Multiple Goroutines

// main starts five workers that loop until a shared context is cancelled,
// cancels the context after two seconds, and then waits until every worker
// has actually returned before reporting that they have stopped.
func main() {
    const numWorkers = 5

    ctx, cancel := context.WithCancel(context.Background())

    // Each worker sends one value on exited when it returns.
    exited := make(chan struct{})

    // Start the workers.
    for i := 0; i < numWorkers; i++ {
        go func(id int) {
            defer func() { exited <- struct{}{} }()
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信号\n", id)
                    return
                default:
                    // One unit of simulated work.
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // signal every worker to stop

    // Wait for each worker to report its exit rather than sleeping for a
    // fixed time: a worker only checks ctx between 500ms work steps, so it
    // can take up to that long to notice the cancellation.
    for i := 0; i < numWorkers; i++ {
        <-exited
    }
    fmt.Println("所有 worker 已停止")
}

7. Producer-Consumer Pattern

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Order is the item passed from producers to consumers over the orders
// channel.
type Order struct {
    ID    int     // order number, printed as "#<ID>" in the log lines
    Price float64 // order amount, printed with one decimal place
}

// producer sends 20 orders, numbered 1 through 20 with a random price between
// 0.0 and 99.9, on orders. It logs each order after sending it and then
// sleeps for a random time below 100ms. wg.Done is called when it returns.
func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()

    const total = 20
    for id := 1; id <= total; id++ {
        price := float64(rand.Intn(1000)) / 10.0
        order := Order{ID: id, Price: price}

        orders <- order
        fmt.Printf("生产订单 #%d (%.1f元)\n", order.ID, order.Price)

        pause := time.Duration(rand.Intn(100)) * time.Millisecond
        time.Sleep(pause)
    }
}

// consumer receives orders until the channel is closed. For each order it
// logs a line labelled with id and then sleeps for a random time below 300ms
// to simulate processing. wg.Done is called when it returns.
func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        order, open := <-orders
        if !open {
            return
        }
        fmt.Printf("  消费者%d 处理订单 #%d (%.1f元)\n", id, order.ID, order.Price)
        processing := time.Duration(rand.Intn(300)) * time.Millisecond
        time.Sleep(processing)
    }
}

// main runs two producers and three consumers over one buffered channel and
// prints a final message once every order has been processed.
func main() {
    const (
        numProducers = 2
        numConsumers = 3
    )

    orders := make(chan Order, 5) // buffer of 5 orders between the two sides

    var producerWg, consumerWg sync.WaitGroup

    // Start the producers.
    producerWg.Add(numProducers)
    for i := 0; i < numProducers; i++ {
        go producer(orders, &producerWg)
    }

    // Start the consumers, numbered from 1.
    consumerWg.Add(numConsumers)
    for id := 1; id <= numConsumers; id++ {
        go consumer(id, orders, &consumerWg)
    }

    // Close the channel only after every producer has finished sending;
    // closing is what lets the consumers' receive loops end.
    producerWg.Wait()
    close(orders)

    // Wait for the consumers to drain the remaining orders.
    consumerWg.Wait()
    fmt.Println("所有订单处理完毕")
}

8. Or-Done Channel Pattern

Use this pattern when a goroutine reads from a channel but must also respond to a cancellation signal:

// orDone wraps c so that reading from it can be interrupted by done. It
// returns a channel that relays every value received from c and is closed as
// soon as c is closed or done is closed, whichever happens first.
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    relayed := make(chan int)

    relay := func() {
        defer close(relayed)
        for {
            var (
                v    int
                open bool
            )
            // Wait for the next value, unless cancelled first.
            select {
            case <-done:
                return
            case v, open = <-c:
            }
            if !open {
                return
            }
            // Hand the value to the reader, unless cancelled first, so this
            // goroutine never stays blocked on a reader that has gone away.
            select {
            case relayed <- v:
            case <-done:
                return
            }
        }
    }
    go relay()

    return relayed
}

// main prints values from an endless producer through orDone and stops two
// seconds later, when done is closed.
func main() {
    done := make(chan struct{})
    data := make(chan int)

    // Produce an endless sequence, one value roughly every 100ms. The send
    // also watches done so the producer exits after cancellation instead of
    // blocking forever on a channel nobody reads any more.
    go func() {
        for i := 0; ; i++ {
            select {
            case data <- i:
            case <-done:
                return
            }
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Cancel after 2 seconds.
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore Pattern

Uses a buffered channel to simulate a semaphore, controlling the maximum degree of concurrency:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore is a counting semaphore backed by a buffered channel: each
// element held in the channel is one acquired slot.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore that allows at most max holders at once.
func NewSemaphore(max int) Semaphore {
    slots := make(chan struct{}, max)
    return Semaphore(slots)
}

// Acquire takes a slot, blocking while all slots are in use.
func (s Semaphore) Acquire() {
    var token struct{}
    s <- token
}

// Release frees one slot. It blocks if no slot is currently held.
func (s Semaphore) Release() {
    <-s
}

// main starts ten one-second tasks but lets at most three of them run at the
// same time, logging when each task starts and finishes.
func main() {
    sem := NewSemaphore(3) // at most 3 tasks run concurrently

    const tasks = 10
    var wg sync.WaitGroup
    wg.Add(tasks)
    for id := 0; id < tasks; id++ {
        go func(id int) {
            defer wg.Done()

            // Hold a slot for the whole task; it is released on return.
            sem.Acquire()
            defer sem.Release()

            fmt.Printf("[%s] 任务 %d 开始\n", time.Now().Format("05.000"), id)
            time.Sleep(time.Second)
            fmt.Printf("[%s] 任务 %d 完成\n", time.Now().Format("05.000"), id)
        }(id)
    }
    wg.Wait()
}

Pattern Quick Reference Table

| Pattern | Core Idea | Typical Scenario |
| --- | --- | --- |
| Worker Pool | Fixed number of goroutines consuming task queue | Batch processing, limiting concurrency |
| Pipeline | Multiple stages chained via channels | Data processing pipelines |
| Fan-in/Fan-out | Split for parallelism + merge results | Multi-data source aggregation |
| errgroup | Concurrency + error collection + automatic cancellation | Parallel API calls |
| Token Bucket | Control request rate | API rate limiting |
| Context Timeout/Cancellation | Propagate cancellation signal | RPC/HTTP timeout control |
| Producer-Consumer | Decouple production and consumption rates | Message queues, task scheduling |
| Or-Done | Interruptible channel read | Pipelines requiring graceful shutdown |
| Semaphore | Semaphore controlling concurrency | Connection pools, resource limits |