編程基礎 0011_Go併發與分佈式實戰精華

Go 併發與分佈式實戰精華

參考:《Go 併發編程實戰》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 語言構建高併發分佈式系統實踐》

1. 併發原語深入

1.1 atomic 包

atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一個數量級。

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子讀
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子類型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存儲任意類型(適合讀多寫少的配置熱更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS(Compare-And-Swap)自旋鎖模式:

// 無鎖計數器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失敗則重試(自旋)
        runtime.Gosched() // 讓出 CPU,避免空轉
    }
}

1.2 sync.Mutex 實現原理

Mutex 內部狀態(state int32 的位含義):
  bit0: locked    — 是否被鎖定
  bit1: woken     — 是否有喚醒的 goroutine
  bit2: starving  — 是否進入飢餓模式
  bit3+: waiters  — 等待者計數

兩種模式:
  正常模式:新到的 goroutine 與被喚醒的 goroutine 競爭鎖
           新到的更有優勢(已在 CPU 上運行),可能導致飢餓
  飢餓模式:等待超過 1ms 後切換
           鎖直接交給隊首等待者,新到的 goroutine 排隊
           最後一個等待者獲得鎖時切回正常模式

1.3 sync.RWMutex 實現

內部結構:
  w       Mutex    // 寫鎖
  writerSem uint32 // 寫等待信號量
  readerSem uint32 // 讀等待信號量
  readerCount int32 // 讀者計數(負數表示有寫者)
  readerWait  int32 // 寫者等待的讀者數

讀鎖:atomic.AddInt32(&readerCount, 1)
     如果 readerCount < 0,說明有寫者 → 等待 readerSem
讀解鎖:atomic.AddInt32(&readerCount, -1)

寫鎖:先獲取 w.Lock()
     將 readerCount 減去 rwmutexMaxReaders(變爲負數,阻止新讀者)
     等待現有讀者完成
寫解鎖:恢復 readerCount,喚醒等待的讀者

2. Channel 高級模式

2.1 Pipeline 模式

// 生成器 → 處理器 → 消費者
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: 多個 goroutine 從同一個 channel 讀取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // 多個 worker 競爭讀取
    }
    return outs
}

// Fan-in: 將多個 channel 合併爲一個
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 或通道(Or-Channel)

任一 channel 完成即返回:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 信號量模式

// 用帶緩衝 channel 實現信號量
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// 使用
sem := NewSemaphore(10) // 最多 10 個併發
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. 併發安全與數據競爭

3.1 Race Detector

# 編譯時啓用競爭檢測(性能下降 2-10x,內存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# 環境變量控制
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 常見競爭陷阱

// 陷阱1:循環變量捕獲
for _, v := range items {
    go func() {
        process(v) // 所有 goroutine 共享同一個 v!
    }()
}
// 修復:傳參
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ 循環變量語義改變,每次迭代創建新變量

// 陷阱2:map 併發讀寫
// map 不是併發安全的!併發讀寫直接 panic
// 修復:sync.RWMutex 或 sync.Map

// 陷阱3:slice 併發 append
// append 可能觸發擴容,導致底層數組變化
// 修復:預分配 + 索引賦值,或加鎖

// 陷阱4:interface 賦值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }()     // 可能讀到半寫狀態
// 修復:atomic.Value

3.3 併發安全設計原則

原則 說明
不可變數據 只讀數據天然安全
值傳遞 傳遞副本而非指針
CSP 通過 channel 通信而非共享內存
COW Copy-On-Write,atomic.Value 存儲配置
限制可見性 不導出可變狀態
鎖粒度最小化 只鎖必要的代碼段

4. errgroup 與併發控制

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕獲變量
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 任一失敗 → 取消其他
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // 索引賦值,無競爭
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// 限制併發數
g.SetLimit(10) // 最多 10 個併發 goroutine

5. CSP vs Actor 模型

特徵 CSP (Go) Actor (Erlang/Akka)
通信方式 通過 channel 傳遞消息 直接向 actor 發送消息
標識 channel 有類型,匿名進程 actor 有地址/名稱
耦合度 松耦合(通過 channel 解耦) 需要知道接收者
阻塞 channel 操作可阻塞 消息發送不阻塞(郵箱)
適用場景 結構化併發、pipeline 分佈式系統、容錯
// Go 中模擬 Actor 模式
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. 分佈式系統基礎

6.1 一致性哈希

type ConsistentHash struct {
    ring     map[uint32]string // hash → 節點
    keys     []uint32          // 有序 hash 環
    replicas int               // 虛擬節點數
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 分佈式鎖(Redis 實現)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua 腳本保證原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 服務發現模式

註冊中心模式:
  服務啓動 → 註冊到 Consul/etcd → 心跳保活
  客戶端   → 從註冊中心獲取服務列表 → 負載均衡 → 調用

客戶端發現 vs 服務端發現:
  客戶端發現:客戶端直接查註冊中心,自己做負載均衡(gRPC 默認)
  服務端發現:通過 LB 代理,LB 查註冊中心(Nginx、AWS ALB)

6.4 負載均衡策略

// 輪詢
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// 加權輪詢、最少連接、一致性哈希等

7. 高併發系統設計模式

7.1 令牌桶限流

import "golang.org/x/time/rate"

// 每秒 100 個請求,突發 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // 處理請求...
}

// 每用戶獨立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 熔斷器模式

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 優雅退出

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // 等待中斷信號
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. 併發調試工具

工具 用途
go run -race 數據競爭檢測
GODEBUG=schedtrace=1000 調度器狀態
runtime.NumGoroutine() goroutine 數量監控
pprof goroutine goroutine 洩漏分析
go tool trace 可視化調度和阻塞事件
dlv(Delve) Go 專用調試器

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6737

(0)
Walker的頭像Walker
上一篇 14小時前
下一篇 2025年11月25日 00:00

相關推薦

  • Go資深工程師講解(慕課) 004

    004 goroutine package main import ( "fmt" "time" ) func main() { for i:=0;i<10;i++{ go func(i int) { fmt.Printf("Hello from goroutine %d \n",i) // …

  • 編程基礎 0009_testing詳解

    Go testing 詳解 目錄 testing 包基礎 表格驅動測試 子測試 t.Run 基準測試 Benchmark 測試覆蓋率 TestMain httptest 包 Mock 和接口測試技巧 模糊測試 Fuzz 1. testing 包基礎 1.1 測試文件和函數命名規則 Go 測試遵循嚴格的命名約定: 測試文件以 _test.go 結尾(如 use…

    後端開發 20小時前
    400
  • Go工程師體系課 009

    其它一些功能 個人中心 收藏 管理收貨地址(增刪改查) 留言 拷貝inventory_srv--> userop_srv 查詢替換所有的inventory Elasticsearch 深度解析文檔 1. 什麼是Elasticsearch Elasticsearch是一個基於Apache Lucene構建的分佈式、RESTful搜索和分析引擎,能夠快速地…

    後端開發 8小時前
    100
  • Go工程師體系課 006

    項目結構說明:user-web 模塊 user-web 是 joyshop_api 工程中的用戶服務 Web 層模塊,負責處理用戶相關的 HTTP 請求、參數校驗、業務路由以及調用後端接口等功能。以下是目錄結構說明: user-web/ ├── api/ # 控制器層,定義業務接口處理邏輯 ├── config/ # 配置模塊,包含系統配置結構體及讀取邏輯 …

  • Go工程師體系課 015

    Docker 容器化 —— Go 項目實戰指南 一、Docker 核心概念 1.1 什麼是 Docker Docker 是一個開源的容器化平臺,它可以將應用程序及其所有依賴項打包到一個標準化的單元(容器)中,從而實現"一次構建,到處運行"。對於 Go 開發者而言,Docker 解決了以下痛點: 開發環境與生產環境不一致 依賴管理複雜(數據庫、緩存、消息隊列等…

簡體中文 繁體中文 English