編程基礎 0007_併發模式

Go 併發模式

常見的 Go 併發設計模式,每個模式都有完整可運行示例和適用場景說明

1. Worker Pool 模式

固定數量的 worker goroutine 從共享的任務隊列中取任務執行,控制併發度。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d 開始處理任務 %d\n", id, j)
        time.Sleep(time.Second) // 模擬耗時操作
        results <- j * 2
        fmt.Printf("Worker %d 完成任務 %d\n", id, j)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 啓動 worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 發送任務
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 關閉任務通道,worker 的 range 會結束

    // 等待所有 worker 完成後關閉 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集結果
    for r := range results {
        fmt.Println("結果:", r)
    }
}

適用場景: HTTP 請求處理、批量數據處理、限制對外部服務的併發調用數

2. Pipeline 模式

多個階段串聯,每個階段是一組 goroutine,通過 channel 傳遞數據。

package main

import "fmt"

// 階段1: 生成數據
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 階段2: 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 階段3: 加倍
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // 串聯管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

適用場景: 數據處理流水線、ETL、日誌處理管道

3. Fan-out / Fan-in 模式

  • Fan-out: 多個 goroutine 從同一個 channel 讀取(分攤工作)
  • Fan-in: 多個 channel 的輸出合併到一個 channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// 生產者
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(50 * time.Millisecond)
            out <- id*100 + i
        }
        close(out)
    }()
    return out
}

// Fan-in: 合併多個 channel 到一個
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Fan-out: 3 個生產者並行工作
    c1 := producer(1)
    c2 := producer(2)
    c3 := producer(3)

    // Fan-in: 合併結果
    for v := range fanIn(c1, c2, c3) {
        fmt.Println(v)
    }
}

適用場景: 多數據源聚合、並行 API 調用結果合併

4. errgroup 包

golang.org/x/sync/errgroup — 併發執行多個任務,任何一個出錯則取消全部。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕獲循環變量
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()
            results[i] = resp.StatusCode
            return nil
        })
    }

    // 等待所有任務完成或第一個錯誤
    if err := g.Wait(); err != nil {
        fmt.Println("錯誤:", err)
        return
    }

    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

errgroup 限制併發數(SetLimit)

g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 個併發

for _, task := range tasks {
    task := task
    g.Go(func() error {
        return process(task)
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

適用場景: 併發 API 調用、批量操作需要收集錯誤、需要自動取消

5. 限流器模式

5.1 time.Ticker 簡單限流

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每 200ms 處理一個請求(5 QPS)
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)

    for req := range requests {
        <-limiter.C // 等待下一個 tick
        fmt.Printf("[%s] 處理請求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 令牌桶限流

package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    tokens   chan struct{}
    interval time.Duration
    stop     chan struct{}
}

func NewTokenBucket(rate int, burst int) *TokenBucket {
    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: time.Second / time.Duration(rate),
        stop:     make(chan struct{}),
    }
    // 初始填滿
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }
    // 持續補充令牌
    go func() {
        ticker := time.NewTicker(tb.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case tb.tokens <- struct{}{}:
                default: // 桶滿了,丟棄
                }
            case <-tb.stop:
                return
            }
        }
    }()
    return tb
}

func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

func (tb *TokenBucket) Close() {
    close(tb.stop)
}

func main() {
    bucket := NewTokenBucket(5, 3) // 5 QPS, 突發 3
    defer bucket.Close()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // 阻塞等待令牌
            fmt.Printf("[%s] 請求 %d 獲得令牌\n", time.Now().Format("04:05.000"), id)
        }(i)
    }
    wg.Wait()
}

適用場景: API 限流、防止下游過載、控制資源消耗

6. 超時與取消模式

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(3 * time.Second): // 模擬慢操作
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // 設置 1 秒超時
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("超時或取消:", err) // context deadline exceeded
        return
    }
    fmt.Println(result)
}

6.2 手動取消 + 多個 goroutine

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // 啓動多個 worker
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信號\n", id)
                    return
                default:
                    // 工作...
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // 取消所有 worker
    time.Sleep(100 * time.Millisecond)
    fmt.Println("所有 worker 已停止")
}

7. 生產者-消費者模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Order struct {
    ID    int
    Price float64
}

func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i <= 20; i++ {
        order := Order{
            ID:    i,
            Price: float64(rand.Intn(1000)) / 10.0,
        }
        orders <- order
        fmt.Printf("生產訂單 #%d (%.1f元)\n", order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
}

func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for order := range orders {
        fmt.Printf("  消費者%d 處理訂單 #%d (%.1f元)\n", id, order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
    }
}

func main() {
    orders := make(chan Order, 5) // 緩衝 5 個訂單

    var producerWg, consumerWg sync.WaitGroup

    // 2 個生產者
    for i := 0; i < 2; i++ {
        producerWg.Add(1)
        go producer(orders, &producerWg)
    }

    // 3 個消費者
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go consumer(i, orders, &consumerWg)
    }

    // 等生產者完成後關閉 channel
    producerWg.Wait()
    close(orders)

    // 等消費者處理完
    consumerWg.Wait()
    fmt.Println("所有訂單處理完畢")
}

8. Or-Done Channel 模式

從一個 channel 讀取,但需要響應取消信號:

// orDone 包裝一個 channel,使其可以被 done 信號中斷
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    data := make(chan int)

    // 生產數據
    go func() {
        for i := 0; ; i++ {
            data <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 2 秒後取消
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore(信號量)模式

用帶緩衝的 channel 模擬信號量,控制最大併發數:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

func main() {
    sem := NewSemaphore(3) // 最多 3 個併發
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem.Acquire()
            defer sem.Release()
            fmt.Printf("[%s] 任務 %d 開始\n", time.Now().Format("05.000"), id)
            time.Sleep(time.Second)
            fmt.Printf("[%s] 任務 %d 完成\n", time.Now().Format("05.000"), id)
        }(i)
    }
    wg.Wait()
}

模式速查表

模式 核心思想 典型場景
Worker Pool 固定 goroutine 數消費任務隊列 批量處理、限併發
Pipeline 多階段 channel 串聯 數據處理流水線
Fan-in/Fan-out 拆分並行+合併結果 多數據源聚合
errgroup 併發+錯誤收集+自動取消 並行 API 調用
令牌桶 控制請求速率 API 限流
Context超時/取消 傳播取消信號 RPC/HTTP 超時控制
生產者-消費者 解耦生產和消費速率 消息隊列、任務調度
Or-Done 可中斷的 channel 讀取 需要優雅退出的管道
Semaphore 信號量控制併發數 連接池、資源限制

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6721

(0)
Walker的頭像Walker
上一篇 12小時前
下一篇 2025年11月25日 13:00

相關推薦

  • Go工程師體系課 008

    訂單及購物車 先從庫存服務中將 srv 的服務代碼框架複製過來,查找替換對應的名稱(order_srv) 加密技術基礎 對稱加密(Symmetric Encryption) 原理: 使用同一個密鑰進行加密和解密 就像一把鑰匙,既能鎖門也能開門 加密速度快,適合大量數據傳輸 使用場景: 本地文件加密 數據庫內容加密 大量數據傳輸時的內容加密 內部系統間的快速通…

    後端開發 7小時前
    100
  • Go工程師體系課 006

    項目結構說明:user-web 模塊 user-web 是 joyshop_api 工程中的用戶服務 Web 層模塊,負責處理用戶相關的 HTTP 請求、參數校驗、業務路由以及調用後端接口等功能。以下是目錄結構說明: user-web/ ├── api/ # 控制器層,定義業務接口處理邏輯 ├── config/ # 配置模塊,包含系統配置結構體及讀取邏輯 …

  • 編程基礎 0006_併發進階_sync包與Context

    併發進階:sync 包與 Context 一、sync 包詳解 1. sync.Mutex 與 sync.RWMutex // Mutex: 互斥鎖,同一時間只有一個 goroutine 能持有 var mu sync.Mutex var count int func increment() { mu.Lock() defer mu.Unlock() cou…

    後端開發 17小時前
    200
  • Go工程師體系課 003

    grpc grpc grpc-go grpc 無縫集成了 protobuf protobuf 習慣用 Json、XML 數據存儲格式的你們,相信大多都沒聽過 Protocol Buffer。 Protocol Buffer 其實是 Google 出品的一種輕量 & 高效的結構化數據存儲格式,性能比 Json、XML 真的強!太!多! protobuf…

    後端開發 12小時前
    100
  • Go資深工程師講解(慕課) 000_課程目錄索引

    Google資深工程師深度講解Go語言 - 課程目錄索引 課程來源:慕課網(百度網盤備份)講師風格:從 Google 工程實踐出發,注重底層原理和工程規範 完整視頻章節與筆記對照表 章節 視頻文件 筆記位置 狀態 Ch1 課程介紹 1-1 課程導讀 — 跳過 1-2 安裝與環境 001.md > GOPATH、環境變量 已覆蓋 Ch2 基礎語法 2-1…

    後端開發 21小時前
    100
簡體中文 繁體中文 English