编程基础 0007_并发模式

Go 并发模式

常见的 Go 并发设计模式,每个模式都有完整可运行示例和适用场景说明

1. Worker Pool 模式

固定数量的 worker goroutine 从共享的任务队列中取任务执行,控制并发度。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, j)
        time.Sleep(time.Second) // 模拟耗时操作
        results <- j * 2
        fmt.Printf("Worker %d 完成任务 %d\n", id, j)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 启动 worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 关闭任务通道,worker 的 range 会结束

    // 等待所有 worker 完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for r := range results {
        fmt.Println("结果:", r)
    }
}

适用场景: HTTP 请求处理、批量数据处理、限制对外部服务的并发调用数

2. Pipeline 模式

多个阶段串联,每个阶段是一组 goroutine,通过 channel 传递数据。

package main

import "fmt"

// 阶段1: 生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段2: 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 阶段3: 加倍
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // 串联管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

适用场景: 数据处理流水线、ETL、日志处理管道

3. Fan-out / Fan-in 模式

  • Fan-out: 多个 goroutine 从同一个 channel 读取(分摊工作)
  • Fan-in: 多个 channel 的输出合并到一个 channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// 生产者
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(50 * time.Millisecond)
            out <- id*100 + i
        }
        close(out)
    }()
    return out
}

// Fan-in: 合并多个 channel 到一个
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Fan-out: 3 个生产者并行工作
    c1 := producer(1)
    c2 := producer(2)
    c3 := producer(3)

    // Fan-in: 合并结果
    for v := range fanIn(c1, c2, c3) {
        fmt.Println(v)
    }
}

适用场景: 多数据源聚合、并行 API 调用结果合并

4. errgroup 包

golang.org/x/sync/errgroup — 并发执行多个任务,任何一个出错则取消全部。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获循环变量
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()
            results[i] = resp.StatusCode
            return nil
        })
    }

    // 等待所有任务完成或第一个错误
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
        return
    }

    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

errgroup 限制并发数(SetLimit)

g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 个并发

for _, task := range tasks {
    task := task
    g.Go(func() error {
        return process(task)
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

适用场景: 并发 API 调用、批量操作需要收集错误、需要自动取消

5. 限流器模式

5.1 time.Ticker 简单限流

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每 200ms 处理一个请求(5 QPS)
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)

    for req := range requests {
        <-limiter.C // 等待下一个 tick
        fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 令牌桶限流

package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    tokens   chan struct{}
    interval time.Duration
    stop     chan struct{}
}

func NewTokenBucket(rate int, burst int) *TokenBucket {
    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: time.Second / time.Duration(rate),
        stop:     make(chan struct{}),
    }
    // 初始填满
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }
    // 持续补充令牌
    go func() {
        ticker := time.NewTicker(tb.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case tb.tokens <- struct{}{}:
                default: // 桶满了,丢弃
                }
            case <-tb.stop:
                return
            }
        }
    }()
    return tb
}

func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

func (tb *TokenBucket) Close() {
    close(tb.stop)
}

func main() {
    bucket := NewTokenBucket(5, 3) // 5 QPS, 突发 3
    defer bucket.Close()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // 阻塞等待令牌
            fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
        }(i)
    }
    wg.Wait()
}

适用场景: API 限流、防止下游过载、控制资源消耗

6. 超时与取消模式

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(3 * time.Second): // 模拟慢操作
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // 设置 1 秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("超时或取消:", err) // context deadline exceeded
        return
    }
    fmt.Println(result)
}

6.2 手动取消 + 多个 goroutine

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // 启动多个 worker
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信号\n", id)
                    return
                default:
                    // 工作...
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // 取消所有 worker
    time.Sleep(100 * time.Millisecond)
    fmt.Println("所有 worker 已停止")
}

7. 生产者-消费者模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Order struct {
    ID    int
    Price float64
}

func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i <= 20; i++ {
        order := Order{
            ID:    i,
            Price: float64(rand.Intn(1000)) / 10.0,
        }
        orders <- order
        fmt.Printf("生产订单 #%d (%.1f元)\n", order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
}

func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for order := range orders {
        fmt.Printf("  消费者%d 处理订单 #%d (%.1f元)\n", id, order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
    }
}

func main() {
    orders := make(chan Order, 5) // 缓冲 5 个订单

    var producerWg, consumerWg sync.WaitGroup

    // 2 个生产者
    for i := 0; i < 2; i++ {
        producerWg.Add(1)
        go producer(orders, &producerWg)
    }

    // 3 个消费者
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go consumer(i, orders, &consumerWg)
    }

    // 等生产者完成后关闭 channel
    producerWg.Wait()
    close(orders)

    // 等消费者处理完
    consumerWg.Wait()
    fmt.Println("所有订单处理完毕")
}

8. Or-Done Channel 模式

从一个 channel 读取,但需要响应取消信号:

// orDone 包装一个 channel,使其可以被 done 信号中断
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    data := make(chan int)

    // 生产数据
    go func() {
        for i := 0; ; i++ {
            data <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 2 秒后取消
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore(信号量)模式

用带缓冲的 channel 模拟信号量,控制最大并发数:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

func main() {
    sem := NewSemaphore(3) // 最多 3 个并发
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem.Acquire()
            defer sem.Release()
            fmt.Printf("[%s] 任务 %d 开始\n", time.Now().Format("05.000"), id)
            time.Sleep(time.Second)
            fmt.Printf("[%s] 任务 %d 完成\n", time.Now().Format("05.000"), id)
        }(i)
    }
    wg.Wait()
}

模式速查表

模式 核心思想 典型场景
Worker Pool 固定 goroutine 数消费任务队列 批量处理、限并发
Pipeline 多阶段 channel 串联 数据处理流水线
Fan-in/Fan-out 拆分并行+合并结果 多数据源聚合
errgroup 并发+错误收集+自动取消 并行 API 调用
令牌桶 控制请求速率 API 限流
Context超时/取消 传播取消信号 RPC/HTTP 超时控制
生产者-消费者 解耦生产和消费速率 消息队列、任务调度
Or-Done 可中断的 channel 读取 需要优雅退出的管道
Semaphore 信号量控制并发数 连接池、资源限制

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6721

(0)
Walker的头像Walker
上一篇 12小时前
下一篇 2025年11月25日 13:00

相关推荐

  • Go工程师体系课 008

    订单及购物车 先从库存服务中将 srv 的服务代码框架复制过来,查找替换对应的名称(order_srv) 加密技术基础 对称加密(Symmetric Encryption) 原理: 使用同一个密钥进行加密和解密 就像一把钥匙,既能锁门也能开门 加密速度快,适合大量数据传输 使用场景: 本地文件加密 数据库内容加密 大量数据传输时的内容加密 内部系统间的快速通…

    后端开发 7小时前
    100
  • Go工程师体系课 006

    项目结构说明:user-web 模块 user-web 是 joyshop_api 工程中的用户服务 Web 层模块,负责处理用户相关的 HTTP 请求、参数校验、业务路由以及调用后端接口等功能。以下是目录结构说明: user-web/ ├── api/ # 控制器层,定义业务接口处理逻辑 ├── config/ # 配置模块,包含系统配置结构体及读取逻辑 …

  • 编程基础 0006_并发进阶_sync包与Context

    并发进阶:sync 包与 Context 一、sync 包详解 1. sync.Mutex 与 sync.RWMutex // Mutex: 互斥锁,同一时间只有一个 goroutine 能持有 var mu sync.Mutex var count int func increment() { mu.Lock() defer mu.Unlock() cou…

    后端开发 17小时前
    300
  • Go工程师体系课 003

    grpc grpc grpc-go grpc 无缝集成了 protobuf protobuf 习惯用 Json、XML 数据存储格式的你们,相信大多都没听过 Protocol Buffer。 Protocol Buffer 其实是 Google 出品的一种轻量 & 高效的结构化数据存储格式,性能比 Json、XML 真的强!太!多! protobuf…

    后端开发 12小时前
    100
  • Go资深工程师讲解(慕课) 000_课程目录索引

    Google资深工程师深度讲解Go语言 - 课程目录索引 课程来源:慕课网(百度网盘备份)讲师风格:从 Google 工程实践出发,注重底层原理和工程规范 完整视频章节与笔记对照表 章节 视频文件 笔记位置 状态 Ch1 课程介绍 1-1 课程导读 — 跳过 1-2 安装与环境 001.md > GOPATH、环境变量 已覆盖 Ch2 基础语法 2-1…

    后端开发 21小时前
    100
简体中文 繁体中文 English