編程基礎 0011_Go併發與分布式實戰精華

Go 併發與分布式實戰精華

參考:《Go 併發編程實戰》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 語言構建高併發分布式系統實踐》

1. 併發原語深入

1.1 atomic 包

atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一個數量級。

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子讀
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子類型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存儲任意類型(適合讀多寫少的配置熱更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS(Compare-And-Swap)自旋鎖模式:

// 無鎖計數器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失敗則重試(自旋)
        runtime.Gosched() // 讓出 CPU,避免空轉
    }
}

1.2 sync.Mutex 實現原理

Mutex 內部狀態(state int32 的位含義):
  bit0: locked    — 是否被鎖定
  bit1: woken     — 是否有喚醒的 goroutine
  bit2: starving  — 是否進入飢餓模式
  bit3+: waiters  — 等待者計數

兩種模式:
  正常模式:新到的 goroutine 與被喚醒的 goroutine 競爭鎖
           新到的更有優勢(已在 CPU 上運行),可能導致飢餓
  飢餓模式:等待超過 1ms 後切換
           鎖直接交給隊首等待者,新到的 goroutine 排隊
           最後一個等待者獲得鎖時切回正常模式

1.3 sync.RWMutex 實現

內部結構:
  w       Mutex    // 寫鎖
  writerSem uint32 // 寫等待信號量
  readerSem uint32 // 讀等待信號量
  readerCount int32 // 讀者計數(負數表示有寫者)
  readerWait  int32 // 寫者等待的讀者數

讀鎖:atomic.AddInt32(&readerCount, 1)
     如果 readerCount < 0,說明有寫者 → 等待 readerSem
讀解鎖:atomic.AddInt32(&readerCount, -1)

寫鎖:先獲取 w.Lock()
     將 readerCount 減去 rwmutexMaxReaders(變為負數,阻止新讀者)
     等待現有讀者完成
寫解鎖:恢復 readerCount,喚醒等待的讀者

2. Channel 高級模式

2.1 Pipeline 模式

// 生成器 → 處理器 → 消費者
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: 多個 goroutine 從同一個 channel 讀取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // 多個 worker 競爭讀取
    }
    return outs
}

// Fan-in: 將多個 channel 合併為一個
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 或通道(Or-Channel)

任一 channel 完成即返回:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 信號量模式

// 用帶緩衝 channel 實現信號量
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// 使用
sem := NewSemaphore(10) // 最多 10 個併發
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. 併發安全與數據競爭

3.1 Race Detector

# 編譯時啓用競爭檢測(性能下降 2-10x,內存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# 環境變量控制
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 常見競爭陷阱

// 陷阱1:循環變量捕獲
for _, v := range items {
    go func() {
        process(v) // 所有 goroutine 共享同一個 v!
    }()
}
// 修復:傳參
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ 循環變量語義改變,每次迭代創建新變量

// 陷阱2:map 併發讀寫
// map 不是併發安全的!併發讀寫直接 panic
// 修復:sync.RWMutex 或 sync.Map

// 陷阱3:slice 併發 append
// append 可能觸發擴容,導致底層數組變化
// 修復:預分配 + 索引賦值,或加鎖

// 陷阱4:interface 賦值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }()     // 可能讀到半寫狀態
// 修復:atomic.Value

3.3 併發安全設計原則

原則 說明
不可變數據 只讀數據天然安全
值傳遞 傳遞副本而非指針
CSP 通過 channel 通信而非共享內存
COW Copy-On-Write,atomic.Value 存儲配置
限制可見性 不導出可變狀態
鎖粒度最小化 只鎖必要的代碼段

4. errgroup 與併發控制

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕獲變量
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 任一失敗 → 取消其他
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // 索引賦值,無競爭
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// 限制併發數
g.SetLimit(10) // 最多 10 個併發 goroutine

5. CSP vs Actor 模型

特徵 CSP (Go) Actor (Erlang/Akka)
通信方式 通過 channel 傳遞消息 直接向 actor 發送消息
標識 channel 有類型,匿名進程 actor 有地址/名稱
耦合度 松耦合(通過 channel 解耦) 需要知道接收者
阻塞 channel 操作可阻塞 消息發送不阻塞(郵箱)
適用場景 結構化併發、pipeline 分布式系統、容錯
// Go 中模擬 Actor 模式
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. 分布式系統基礎

6.1 一致性哈希

type ConsistentHash struct {
    ring     map[uint32]string // hash → 節點
    keys     []uint32          // 有序 hash 環
    replicas int               // 虛擬節點數
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 分布式鎖(Redis 實現)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua 腳本保證原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 服務發現模式

註冊中心模式:
  服務啓動 → 註冊到 Consul/etcd → 心跳保活
  客戶端   → 從註冊中心獲取服務列表 → 負載均衡 → 調用

客戶端發現 vs 服務端發現:
  客戶端發現:客戶端直接查註冊中心,自己做負載均衡(gRPC 默認)
  服務端發現:通過 LB 代理,LB 查註冊中心(Nginx、AWS ALB)

6.4 負載均衡策略

// 輪詢
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// 加權輪詢、最少連接、一致性哈希等

7. 高併發系統設計模式

7.1 令牌桶限流

import "golang.org/x/time/rate"

// 每秒 100 個請求,突發 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // 處理請求...
}

// 每用戶獨立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 熔斷器模式

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 優雅退出

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // 等待中斷信號
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. 併發調試工具

工具 用途
go run -race 數據競爭檢測
GODEBUG=schedtrace=1000 調度器狀態
runtime.NumGoroutine() goroutine 數量監控
pprof goroutine goroutine 洩漏分析
go tool trace 可視化調度和阻塞事件
dlv(Delve) Go 專用調試器

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6737

(0)
Walker的頭像Walker
上一篇 2026年3月8日 15:11
下一篇 2026年3月9日 12:56

相關推薦

  • 編程基礎 0006_併發進階_sync包與Context

    併發進階:sync 包與 Context 一、sync 包詳解 1. sync.Mutex 與 sync.RWMutex // Mutex: 互斥鎖,同一時間只有一個 goroutine 能持有 var mu sync.Mutex var count int func increment() { mu.Lock() defer mu.Unlock() cou…

    後端開發 2026年3月6日
    5800
  • Go工程師體系課 012

    Go 中集成 Elasticsearch 1. 客戶端庫選擇 1.1 主流 Go ES 客戶端 olivere/elastic:功能最全面,API 設計優雅,支持 ES 7.x/8.x elastic/go-elasticsearch:官方客戶端,輕量級,更接近原生 REST API go-elasticsearch/elasticsearch:社區維護的官…

    後端開發 2026年3月7日
    5200
  • Go工程師體系課 018

    API 網關與持續部署入門(Kong & Jenkins) 對應資料目錄《第 2 章 Jenkins 入門》《第 3 章 通過 Jenkins 部署服務》,整理 Kong 與 Jenkins 在企業級持續交付中的實戰路徑。即便零基礎,也能順著步驟搭建出自己的網關 + 持續部署流水線。 課前導覽:甚麼是 API 網關 API 網關位於客戶端與後端微服務…

    後端開發 2026年3月7日
    6400
  • Go工程師體系課 019

    Go 內存模型與 GC 1. 內存分配基礎 1.1 棧(Stack)與堆(Heap) ┌─────────────────────────────┐ │ 堆 (Heap) │ ← 動態分配,GC 管理 │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ obj │ │ obj │ │ obj │ │ │ └─────┘ └─────┘ └────…

    後端開發 2026年3月7日
    6100
  • Go工程師體系課 004

    需求分析 後台管理系統 商品管理 商品列表 商品分類 品牌管理 品牌分類 訂單管理 訂單列表 用戶信息管理 用戶列表 用戶地址 用戶留言 輪播圖管理 電商系統 登錄頁面 首頁 商品搜索 商品分類導航 輪播圖展示 推薦商品展示 商品詳情頁 商品圖片展示 商品描述 商品規格選擇 加入購物車 購物車 商品列表 數量調整 刪除商品 結算功能 用戶中心 訂單中心 我的…

    2026年3月7日
    5400
簡體中文 繁體中文 English