Programming Basics 0007_Concurrency Patterns

Go Concurrency Patterns

Common Go concurrency design patterns, each with complete runnable examples and descriptions of applicable scenarios

1. Worker Pool Pattern

A fixed number of worker goroutines fetch and execute tasks from a shared task queue, controlling the degree of concurrency.

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker is one member of the pool. It receives job numbers from jobs until
// that channel is closed and drained, publishes twice each job number on
// results, and logs the start and end of every job. wg.Done is called exactly
// once, when the worker returns.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, open := <-jobs
        if !open {
            // jobs was closed and has no buffered items left.
            return
        }
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, task)
        time.Sleep(time.Second) // stand-in for a slow operation
        doubled := task * 2
        results <- doubled
        fmt.Printf("Worker %d 完成任务 %d\n", id, task)
    }
}

// main runs the worker-pool example: it starts numWorkers workers, queues
// numJobs jobs, and prints every result as it arrives.
func main() {
    const (
        numJobs    = 10
        numWorkers = 3
    )

    // Both channels have capacity numJobs, so queueing all jobs and
    // publishing all results never blocks on a full buffer.
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // Start the workers.
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    for id := 1; id <= numWorkers; id++ {
        go worker(id, jobs, results, &wg)
    }

    // Queue the jobs, then close the channel so each worker's receive loop
    // terminates once the queue is drained.
    for n := 1; n <= numJobs; n++ {
        jobs <- n
    }
    close(jobs)

    // Close results only after every worker has finished; this is what ends
    // the collection loop below.
    go func() {
        defer close(results)
        wg.Wait()
    }()

    // Collect results until the channel is closed.
    for {
        r, ok := <-results
        if !ok {
            break
        }
        fmt.Println("结果:", r)
    }
}

Applicable Scenarios: HTTP request processing, batch data processing, limiting concurrent calls to external services

2. Pipeline Pattern

Multiple stages are chained together, with each stage being a group of goroutines passing data via channels.

package main

import "fmt"

// generate is pipeline stage 1. It returns an unbuffered channel on which a
// background goroutine sends each element of nums in order; the channel is
// closed after the last element has been sent.
func generate(nums ...int) <-chan int {
    stream := make(chan int)
    emit := func() {
        defer close(stream)
        for i := 0; i < len(nums); i++ {
            stream <- nums[i]
        }
    }
    go emit()
    return stream
}

// square is pipeline stage 2. For every value received from in it sends the
// value multiplied by itself on the returned unbuffered channel, which is
// closed once in has been closed and drained.
func square(in <-chan int) <-chan int {
    squared := make(chan int)
    go func() {
        defer close(squared)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            squared <- n * n
        }
    }()
    return squared
}

// double is pipeline stage 3. For every value received from in it sends twice
// that value on the returned unbuffered channel, which is closed once in has
// been closed and drained.
func double(in <-chan int) <-chan int {
    doubled := make(chan int)
    go func() {
        defer close(doubled)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            doubled <- 2 * n
        }
    }()
    return doubled
}

func main() {
    // 串联管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

Applicable Scenarios: Data processing pipelines, ETL, log processing pipelines

3. Fan-out / Fan-in Pattern

  • Fan-out: Multiple goroutines read from the same channel (distributing work)
  • Fan-in: The outputs of multiple channels are merged into one channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// producer starts a goroutine that emits five values, id*100+0 through
// id*100+4, pausing 50ms before each one, and then closes the returned
// unbuffered channel.
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        base := id * 100
        for step := 0; step < 5; step++ {
            time.Sleep(50 * time.Millisecond)
            out <- base + step
        }
    }()
    return out
}

// fanIn merges any number of input channels into one. One forwarding
// goroutine per input copies values onto the merged channel; the merged
// channel is closed after every input has been closed and drained. Values
// from different inputs arrive in no particular order.
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)

    var pending sync.WaitGroup
    pending.Add(len(channels))

    forward := func(src <-chan int) {
        defer pending.Done()
        for {
            v, ok := <-src
            if !ok {
                return
            }
            merged <- v
        }
    }
    for i := range channels {
        go forward(channels[i])
    }

    // Closing must wait for all forwarders; a send on a closed channel panics.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

// main starts three producers (ids 1 to 3) that run in parallel, merges their
// output with fanIn, and prints every merged value.
func main() {
    // Fan-out: three producers working concurrently.
    sources := make([]<-chan int, 0, 3)
    for id := 1; id <= 3; id++ {
        sources = append(sources, producer(id))
    }

    // Fan-in: read everything through a single merged channel.
    merged := fanIn(sources...)
    for v := range merged {
        fmt.Println(v)
    }
}

Applicable Scenarios: Aggregation of multiple data sources, merging results of parallel API calls

4. errgroup Package

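golang.org/x/sync/errgroup — Runs multiple tasks concurrently and returns the first error; with errgroup.WithContext, the shared context is canceled as soon as any task fails, so the remaining tasks can stop early.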

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

// main fetches a fixed list of URLs concurrently through an errgroup and
// prints the HTTP status code of each one. If any fetch fails, the first
// error is printed and no status codes are reported.
func main() {
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }
    // One slot per URL; each goroutine writes only its own index, so the
    // slice needs no lock.
    statuses := make([]int, len(urls))

    g, ctx := errgroup.WithContext(context.Background())

    // fetch builds the task that stores the status code of target at idx.
    // Passing idx and target as arguments gives every task its own copies.
    fetch := func(idx int, target string) func() error {
        return func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", target, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", target, err)
            }
            defer resp.Body.Close()
            statuses[idx] = resp.StatusCode
            return nil
        }
    }
    for idx := range urls {
        g.Go(fetch(idx, urls[idx]))
    }

    // Wait for all tasks to finish, or for the first error.
    err := g.Wait()
    if err != nil {
        fmt.Println("错误:", err)
        return
    }

    for idx := range urls {
        fmt.Printf("%s -> %d\n", urls[idx], statuses[idx])
    }
}

errgroup Limiting Concurrency (SetLimit)

// Fragment: tasks and process are assumed to be defined by the surrounding
// program. A plain Group is used here, so no derived context is involved.
g := new(errgroup.Group)
g.SetLimit(3) // at most 3 tasks run concurrently

for _, task := range tasks {
    task := task // per-iteration copy so each closure sees its own task (needed before Go 1.22)
    g.Go(func() error {
        return process(task)
    })
}

// Wait for every task; a non-nil error from the group aborts the program.
if err := g.Wait(); err != nil {
    log.Fatal(err)
}

Applicable Scenarios: Concurrent API calls, batch operations requiring error collection, automatic cancellation

5. Rate Limiter Pattern

5.1 time.Ticker Simple Rate Limiting

package main

import (
    "fmt"
    "time"
)

// main demonstrates ticker-based rate limiting: ten queued requests are
// processed one per tick, and each is logged with a timestamp.
func main() {
    // One request every 200ms (5 QPS).
    const interval = 200 * time.Millisecond
    limiter := time.NewTicker(interval)
    defer limiter.Stop()

    // Pre-fill and close the queue so the processing loop ends by itself.
    const total = 10
    requests := make(chan int, total)
    for id := 1; id <= total; id++ {
        requests <- id
    }
    close(requests)

    for {
        req, ok := <-requests
        if !ok {
            break
        }
        <-limiter.C // block until the next tick
        fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 Token Bucket Rate Limiting

package main

import (
    "fmt"
    "sync"
    "time"
)

// TokenBucket is a channel-backed token-bucket rate limiter. Each element
// buffered in tokens is one permit; a background goroutine adds one permit per
// interval until Close is called. Create instances with NewTokenBucket.
type TokenBucket struct {
    tokens    chan struct{} // available permits; capacity is the burst size
    interval  time.Duration // time between refills; 0 means refilling is disabled
    stop      chan struct{} // closed by Close to end the refill goroutine
    closeOnce sync.Once     // lets Close be called more than once
}

// NewTokenBucket returns a bucket that starts full with burst tokens and is
// refilled at rate tokens per second.
//
// Out-of-range arguments are normalised instead of panicking:
//   - rate <= 0 disables refilling, so only the initial burst is ever granted;
//   - rates above one token per nanosecond are clamped to one per nanosecond;
//   - burst < 0 is treated as 0.
//
// Call Close when the bucket is no longer needed to stop the refill goroutine.
func NewTokenBucket(rate int, burst int) *TokenBucket {
    if burst < 0 {
        burst = 0
    }

    var interval time.Duration
    if rate > 0 {
        interval = time.Second / time.Duration(rate)
        if interval <= 0 {
            // The division truncated to zero; time.NewTicker panics on a
            // non-positive interval.
            interval = time.Nanosecond
        }
    }

    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: interval,
        stop:     make(chan struct{}),
    }

    // Start with a full bucket.
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }

    if interval > 0 {
        go tb.refill()
    }
    return tb
}

// refill adds one token per interval until the bucket is closed. Tokens that
// do not fit because the bucket is already full are dropped.
func (tb *TokenBucket) refill() {
    ticker := time.NewTicker(tb.interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            select {
            case tb.tokens <- struct{}{}:
            default: // bucket is full; discard this token
            }
        case <-tb.stop:
            return
        }
    }
}

// Allow takes a token if one is immediately available and reports whether it
// did. It never blocks.
func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token is available and takes it. Refilling stops after
// Close, so a Wait that finds the bucket empty after Close never returns.
func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

// Close stops the refill goroutine. It is safe to call more than once.
func (tb *TokenBucket) Close() {
    tb.closeOnce.Do(func() {
        close(tb.stop)
    })
}

// main starts ten goroutines that each wait for a token from a shared bucket
// and log the moment they obtain one.
func main() {
    bucket := NewTokenBucket(5, 3) // 5 QPS with a burst of 3
    defer bucket.Close()

    const requests = 10
    var wg sync.WaitGroup
    wg.Add(requests)
    for id := 0; id < requests; id++ {
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // block until a token is available
            fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
        }(id)
    }
    wg.Wait()
}

Applicable Scenarios: API rate limiting, preventing downstream overload, controlling resource consumption

6. Timeout and Cancellation Pattern

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

// slowOperation simulates a call that takes three seconds. It returns the
// success message when the three seconds elapse, or an empty string and
// ctx.Err() if ctx is canceled or its deadline passes first.
func slowOperation(ctx context.Context) (string, error) {
    // An explicit timer with a deferred Stop releases the timer as soon as
    // the function returns. time.After would leave it pending for the full
    // three seconds after a cancellation on Go versions before 1.23.
    timer := time.NewTimer(3 * time.Second)
    defer timer.Stop()

    select {
    case <-timer.C: // the simulated slow work has finished
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// main runs slowOperation under a one-second deadline and prints either the
// result or the error that ended the call.
func main() {
    // Give the operation at most one second.
    const limit = 1 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), limit)
    defer cancel()

    switch result, err := slowOperation(ctx); {
    case err != nil:
        // When the deadline fires first, err is "context deadline exceeded".
        fmt.Println("超时或取消:", err)
    default:
        fmt.Println(result)
    }
}

6.2 Manual Cancellation + Multiple Goroutines

// main starts five workers that poll a shared context between units of work,
// cancels them after two seconds, and reports once every worker has exited.
func main() {
    ctx, cancel := context.WithCancel(context.Background())

    const numWorkers = 5
    // Each worker signals here on exit. The buffer lets workers exit without
    // waiting for main to receive.
    exited := make(chan struct{}, numWorkers)

    // Start the workers.
    for i := 0; i < numWorkers; i++ {
        go func(id int) {
            defer func() { exited <- struct{}{} }()
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信号\n", id)
                    return
                default:
                    // Do one unit of work.
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // ask every worker to stop

    // Wait for each worker to exit. A fixed sleep is not enough: a worker may
    // be inside its 500ms unit of work when cancel is called and only sees
    // the cancellation afterwards.
    for i := 0; i < numWorkers; i++ {
        <-exited
    }
    fmt.Println("所有 worker 已停止")
}

7. Producer-Consumer Pattern

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Order is the item passed from producers to consumers over the orders
// channel.
type Order struct {
    ID    int     // order number shown as "#ID" in the log output
    Price float64 // order amount, printed with one decimal place and the 元 suffix
}

// producer publishes 20 orders, numbered 1 to 20, each with a random price
// between 0.0 and 99.9. It logs every order after sending it and pauses a
// random 0-99ms between orders. wg.Done is called when it returns.
func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()

    const batch = 20
    for id := 1; id <= batch; id++ {
        price := float64(rand.Intn(1000)) / 10.0
        o := Order{ID: id, Price: price}

        orders <- o
        fmt.Printf("生产订单 #%d (%.1f元)\n", o.ID, o.Price)

        pause := time.Duration(rand.Intn(100)) * time.Millisecond
        time.Sleep(pause)
    }
}

// consumer handles orders until the channel is closed and drained. It logs
// each order and then sleeps a random 0-299ms to simulate processing time.
// wg.Done is called when it returns.
func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        o, more := <-orders
        if !more {
            return
        }
        fmt.Printf("  消费者%d 处理订单 #%d (%.1f元)\n", id, o.ID, o.Price)
        work := time.Duration(rand.Intn(300)) * time.Millisecond
        time.Sleep(work)
    }
}

// main connects two producers to three consumers through a buffered channel
// and exits once every order has been handled.
func main() {
    const (
        queueSize    = 5 // the channel buffers up to five orders
        numProducers = 2
        numConsumers = 3
    )
    orders := make(chan Order, queueSize)

    var producerWg, consumerWg sync.WaitGroup

    producerWg.Add(numProducers)
    for p := 0; p < numProducers; p++ {
        go producer(orders, &producerWg)
    }

    consumerWg.Add(numConsumers)
    for c := 1; c <= numConsumers; c++ {
        go consumer(c, orders, &consumerWg)
    }

    // Close the channel only after all producers have finished sending;
    // closing is what ends the consumers' receive loops.
    producerWg.Wait()
    close(orders)

    // Wait for the consumers to drain the remaining orders.
    consumerWg.Wait()
    fmt.Println("所有订单处理完毕")
}

8. Or-Done Channel Pattern

Use this pattern when a goroutine reads from a channel but must also respond to a cancellation signal:

// orDone wraps c so that reading from it can be interrupted by done. Values
// received from c are relayed to the returned channel, which is closed when c
// is closed or when done is closed, whichever happens first.
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    relay := make(chan int)

    forward := func() {
        defer close(relay)
        for {
            var (
                v    int
                open bool
            )
            select {
            case <-done:
                return
            case v, open = <-c:
            }
            if !open {
                return
            }

            // The hand-off must also watch done, otherwise a reader that has
            // gone away would block this goroutine forever.
            select {
            case relay <- v:
            case <-done:
                return
            }
        }
    }
    go forward()

    return relay
}

// main prints an increasing counter received through orDone for two seconds,
// then cancels and reports that it has stopped.
func main() {
    done := make(chan struct{})
    data := make(chan int)

    // Produce data. The send also watches done so this goroutine exits once
    // the reader is gone, instead of blocking forever on data.
    go func() {
        for i := 0; ; i++ {
            select {
            case data <- i:
            case <-done:
                return
            }
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Cancel after two seconds.
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore Pattern

Uses a buffered channel to simulate a semaphore, controlling the maximum degree of concurrency:

package main

import (
    "fmt"
    "sync"
    "time"
)

// Semaphore is a counting semaphore built on a buffered channel: each
// buffered element is one held permit, and the channel capacity is the
// maximum number of holders.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore that allows up to max concurrent holders.
func NewSemaphore(max int) Semaphore {
    slots := make(chan struct{}, max)
    return Semaphore(slots)
}

// Acquire takes a permit, blocking while all max permits are held.
func (s Semaphore) Acquire() {
    var permit struct{}
    s <- permit
}

// Release returns a permit. It blocks if no permit is currently held.
func (s Semaphore) Release() {
    <-s
}

// main launches ten one-second tasks but lets at most three run at a time,
// logging a timestamp when each task starts and finishes.
func main() {
    const (
        limit = 3 // maximum number of tasks running at once
        tasks = 10
    )
    sem := NewSemaphore(limit)

    var wg sync.WaitGroup
    wg.Add(tasks)

    run := func(id int) {
        defer wg.Done()
        sem.Acquire()
        defer sem.Release()

        stamp := func() string { return time.Now().Format("05.000") }
        fmt.Printf("[%s] 任务 %d 开始\n", stamp(), id)
        time.Sleep(time.Second)
        fmt.Printf("[%s] 任务 %d 完成\n", stamp(), id)
    }
    for id := 0; id < tasks; id++ {
        go run(id)
    }

    wg.Wait()
}

Pattern Quick Reference Table

Pattern Core Idea Typical Scenario
Worker Pool Fixed number of goroutines consuming task queue Batch processing, limiting concurrency
Pipeline Multiple stages chained via channels Data processing pipelines
Fan-in/Fan-out Split for parallelism + merge results Multi-data source aggregation
errgroup Concurrency + error collection + automatic cancellation Parallel API calls
Token Bucket Control request rate API rate limiting
Context Timeout/Cancellation Propagate cancellation signal RPC/HTTP timeout control
Producer-Consumer Decouple production and consumption rates Message queues, task scheduling
Or-Done Interruptible channel read Pipelines requiring graceful shutdown
Semaphore Semaphore controlling concurrency Connection pools, resource limits

主题测试文章,只做测试使用。发布者:Walker,转载请注明出处:https://walker-learn.xyz/archives/6721

(0)
Walker的头像Walker
上一篇 12 hours ago
下一篇 Nov 25, 2025 13:00

Related Posts

EN
简体中文 繁體中文 English