編程基礎 0007_併發模式

Go 併發模式

常見的 Go 併發設計模式,每個模式都有完整可運行示例和適用場景說明

1. Worker Pool 模式

固定數量的 worker goroutine 從共享的任務隊列中取任務執行,控制併發度。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d 開始處理任務 %d\n", id, j)
        time.Sleep(time.Second) // 模擬耗時操作
        results <- j * 2
        fmt.Printf("Worker %d 完成任務 %d\n", id, j)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 啓動 worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 發送任務
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 關閉任務通道,worker 的 range 會結束

    // 等待所有 worker 完成後關閉 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集結果
    for r := range results {
        fmt.Println("結果:", r)
    }
}

適用場景: HTTP 請求處理、批量數據處理、限制對外部服務的併發調用數

2. Pipeline 模式

多個階段串聯,每個階段是一組 goroutine,通過 channel 傳遞數據。

package main

import "fmt"

// 階段1: 生成數據
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 階段2: 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 階段3: 加倍
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // 串聯管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

適用場景: 數據處理流水線、ETL、日誌處理管道

3. Fan-out / Fan-in 模式

  • Fan-out: 多個 goroutine 從同一個 channel 讀取(分攤工作)
  • Fan-in: 多個 channel 的輸出合併到一個 channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// 生產者
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(50 * time.Millisecond)
            out <- id*100 + i
        }
        close(out)
    }()
    return out
}

// Fan-in: 合併多個 channel 到一個
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Fan-out: 3 個生產者並行工作
    c1 := producer(1)
    c2 := producer(2)
    c3 := producer(3)

    // Fan-in: 合併結果
    for v := range fanIn(c1, c2, c3) {
        fmt.Println(v)
    }
}

適用場景: 多數據源聚合、並行 API 調用結果合併

4. errgroup 包

golang.org/x/sync/errgroup — 併發執行多個任務,任何一個出錯則取消全部。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕獲循環變量
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()
            results[i] = resp.StatusCode
            return nil
        })
    }

    // 等待所有任務完成或第一個錯誤
    if err := g.Wait(); err != nil {
        fmt.Println("錯誤:", err)
        return
    }

    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

errgroup 限制併發數(SetLimit)

g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 個併發

for _, task := range tasks {
    task := task
    g.Go(func() error {
        return process(task)
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

適用場景: 併發 API 調用、批量操作需要收集錯誤、需要自動取消

5. 限流器模式

5.1 time.Ticker 簡單限流

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每 200ms 處理一個請求(5 QPS)
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)

    for req := range requests {
        <-limiter.C // 等待下一個 tick
        fmt.Printf("[%s] 處理請求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 令牌桶限流

package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    tokens   chan struct{}
    interval time.Duration
    stop     chan struct{}
}

func NewTokenBucket(rate int, burst int) *TokenBucket {
    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: time.Second / time.Duration(rate),
        stop:     make(chan struct{}),
    }
    // 初始填滿
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }
    // 持續補充令牌
    go func() {
        ticker := time.NewTicker(tb.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case tb.tokens <- struct{}{}:
                default: // 桶滿了,丟棄
                }
            case <-tb.stop:
                return
            }
        }
    }()
    return tb
}

func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

func (tb *TokenBucket) Close() {
    close(tb.stop)
}

func main() {
    bucket := NewTokenBucket(5, 3) // 5 QPS, 突發 3
    defer bucket.Close()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // 阻塞等待令牌
            fmt.Printf("[%s] 請求 %d 獲得令牌\n", time.Now().Format("04:05.000"), id)
        }(i)
    }
    wg.Wait()
}

適用場景: API 限流、防止下游過載、控制資源消耗

6. 超時與取消模式

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(3 * time.Second): // 模擬慢操作
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // 設置 1 秒超時
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("超時或取消:", err) // context deadline exceeded
        return
    }
    fmt.Println(result)
}

6.2 手動取消 + 多個 goroutine

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // 啓動多個 worker
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信號\n", id)
                    return
                default:
                    // 工作...
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // 取消所有 worker
    time.Sleep(100 * time.Millisecond)
    fmt.Println("所有 worker 已停止")
}

7. 生產者-消費者模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Order struct {
    ID    int
    Price float64
}

func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i <= 20; i++ {
        order := Order{
            ID:    i,
            Price: float64(rand.Intn(1000)) / 10.0,
        }
        orders <- order
        fmt.Printf("生產訂單 #%d (%.1f元)\n", order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
}

func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for order := range orders {
        fmt.Printf("  消費者%d 處理訂單 #%d (%.1f元)\n", id, order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
    }
}

func main() {
    orders := make(chan Order, 5) // 緩衝 5 個訂單

    var producerWg, consumerWg sync.WaitGroup

    // 2 個生產者
    for i := 0; i < 2; i++ {
        producerWg.Add(1)
        go producer(orders, &producerWg)
    }

    // 3 個消費者
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go consumer(i, orders, &consumerWg)
    }

    // 等生產者完成後關閉 channel
    producerWg.Wait()
    close(orders)

    // 等消費者處理完
    consumerWg.Wait()
    fmt.Println("所有訂單處理完畢")
}

8. Or-Done Channel 模式

從一個 channel 讀取,但需要響應取消信號:

// orDone 包裝一個 channel,使其可以被 done 信號中斷
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    data := make(chan int)

    // 生產數據
    go func() {
        for i := 0; ; i++ {
            data <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 2 秒後取消
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore(信號量)模式

用帶緩衝的 channel 模擬信號量,控制最大併發數:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

func main() {
    sem := NewSemaphore(3) // 最多 3 個併發
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem.Acquire()
            defer sem.Release()
            fmt.Printf("[%s] 任務 %d 開始\n", time.Now().Format("05.000"), id)
            time.Sleep(time.Second)
            fmt.Printf("[%s] 任務 %d 完成\n", time.Now().Format("05.000"), id)
        }(i)
    }
    wg.Wait()
}

模式速查表

模式 核心思想 典型場景
Worker Pool 固定 goroutine 數消費任務隊列 批量處理、限併發
Pipeline 多階段 channel 串聯 數據處理流水線
Fan-in/Fan-out 拆分並行+合併結果 多數據源聚合
errgroup 併發+錯誤收集+自動取消 並行 API 調用
令牌桶 控制請求速率 API 限流
Context超時/取消 傳播取消信號 RPC/HTTP 超時控制
生產者-消費者 解耦生產和消費速率 消息隊列、任務調度
Or-Done 可中斷的 channel 讀取 需要優雅退出的管道
Semaphore 信號量控制併發數 連接池、資源限制

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6721

(0)
Walker的頭像Walker
上一篇 10小時前
下一篇 2025年11月25日 13:00

相關推薦

  • Go工程師體系課 017

    限流、熔斷與降級入門(含 Sentinel 實戰) 結合課件第 3 章(3-1 ~ 3-9)的視頻要點,整理一套面向初學者的服務保護指南,幫助理解“為甚麼需要限流、熔斷和降級”,以及如何用 Sentinel 快速上手。 學習路線速覽 3-1 理解服務雪崩與限流、熔斷、降級的背景 3-2 Sentinel 與 Hystrix 對比,明確技術選型 3-3 Sen…

  • Go工程師體系課 009

    其它一些功能 個人中心 收藏 管理收貨地址(增刪改查) 留言 拷貝inventory_srv--> userop_srv 查詢替換所有的inventory Elasticsearch 深度解析文檔 1. 甚麼是Elasticsearch Elasticsearch是一個基於Apache Lucene構建的分布式、RESTful搜索和分析引擎,能夠快速地…

    後端開發 4小時前
    000
  • Go工程師體系課 015

    Docker 容器化 —— Go 項目實戰指南 一、Docker 核心概念 1.1 甚麼是 Docker Docker 是一個開源的容器化平台,它可以將應用程序及其所有依賴項打包到一個標準化的單元(容器)中,從而實現"一次構建,到處運行"。對於 Go 開發者而言,Docker 解決了以下痛點: 開發環境與生產環境不一致 依賴管理複雜(數據庫、緩存、消息隊列等…

  • Go資深工程師講解(慕課) 005

    005 標準庫 http 使用 http 客戶端發送請求 使用 http.Client 控制請求頭 使用 httputil 簡化工作 package main import ( "fmt" "net/http" "net/http/httputil" ) func main() { resp, er…

    後端開發 21小時前
    100
  • 編程基礎 0001_基礎教程

    go 甚麼是 Go是一門併發支持、垃圾加收的編譯型系統編程語言,具有靜態編譯語言的高性能和動態語言的,主要特點如下 類型安全和內存安全 以非常直觀和極低代價的方案實現高併發 高效的垃圾回收機制 快速編譯(同時解決了 C 語言中頭文件太多的問題) UTF-8 支持 安裝 源碼安裝 標準包安裝 第三方安裝 標準包安裝,一路下一步。安裝完後,會自動添加如下環境變量…

    後端開發 12小時前
    600
簡體中文 繁體中文 English