編程基礎 0011_Go併發與分布式實戰精華

Go 併發與分布式實戰精華

參考:《Go 併發編程實戰》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 語言構建高併發分布式系統實踐》

1. 併發原語深入

1.1 atomic 包

atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一個數量級。

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子讀
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子類型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存儲任意類型(適合讀多寫少的配置熱更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS(Compare-And-Swap)自旋鎖模式:

// 無鎖計數器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失敗則重試(自旋)
        runtime.Gosched() // 讓出 CPU,避免空轉
    }
}

1.2 sync.Mutex 實現原理

Mutex 內部狀態(state int32 的位含義):
  bit0: locked    — 是否被鎖定
  bit1: woken     — 是否有喚醒的 goroutine
  bit2: starving  — 是否進入飢餓模式
  bit3+: waiters  — 等待者計數

兩種模式:
  正常模式:新到的 goroutine 與被喚醒的 goroutine 競爭鎖
           新到的更有優勢(已在 CPU 上運行),可能導致飢餓
  飢餓模式:等待超過 1ms 後切換
           鎖直接交給隊首等待者,新到的 goroutine 排隊
           最後一個等待者獲得鎖時切回正常模式

1.3 sync.RWMutex 實現

內部結構:
  w       Mutex    // 寫鎖
  writerSem uint32 // 寫等待信號量
  readerSem uint32 // 讀等待信號量
  readerCount int32 // 讀者計數(負數表示有寫者)
  readerWait  int32 // 寫者等待的讀者數

讀鎖:atomic.AddInt32(&readerCount, 1)
     如果 readerCount < 0,說明有寫者 → 等待 readerSem
讀解鎖:atomic.AddInt32(&readerCount, -1)

寫鎖:先獲取 w.Lock()
     將 readerCount 減去 rwmutexMaxReaders(變為負數,阻止新讀者)
     等待現有讀者完成
寫解鎖:恢復 readerCount,喚醒等待的讀者

2. Channel 高級模式

2.1 Pipeline 模式

// 生成器 → 處理器 → 消費者
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: 多個 goroutine 從同一個 channel 讀取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // 多個 worker 競爭讀取
    }
    return outs
}

// Fan-in: 將多個 channel 合併為一個
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 或通道(Or-Channel)

任一 channel 完成即返回:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 信號量模式

// 用帶緩衝 channel 實現信號量
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// 使用
sem := NewSemaphore(10) // 最多 10 個併發
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. 併發安全與數據競爭

3.1 Race Detector

# 編譯時啓用競爭檢測(性能下降 2-10x,內存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# 環境變量控制
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 常見競爭陷阱

// 陷阱1:循環變量捕獲
for _, v := range items {
    go func() {
        process(v) // 所有 goroutine 共享同一個 v!
    }()
}
// 修復:傳參
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ 循環變量語義改變,每次迭代創建新變量

// 陷阱2:map 併發讀寫
// map 不是併發安全的!併發讀寫直接 panic
// 修復:sync.RWMutex 或 sync.Map

// 陷阱3:slice 併發 append
// append 可能觸發擴容,導致底層數組變化
// 修復:預分配 + 索引賦值,或加鎖

// 陷阱4:interface 賦值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }()     // 可能讀到半寫狀態
// 修復:atomic.Value

3.3 併發安全設計原則

原則 說明
不可變數據 只讀數據天然安全
值傳遞 傳遞副本而非指針
CSP 通過 channel 通信而非共享內存
COW Copy-On-Write,atomic.Value 存儲配置
限制可見性 不導出可變狀態
鎖粒度最小化 只鎖必要的代碼段

4. errgroup 與併發控制

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕獲變量
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 任一失敗 → 取消其他
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // 索引賦值,無競爭
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// 限制併發數
g.SetLimit(10) // 最多 10 個併發 goroutine

5. CSP vs Actor 模型

特徵 CSP (Go) Actor (Erlang/Akka)
通信方式 通過 channel 傳遞消息 直接向 actor 發送消息
標識 channel 有類型,匿名進程 actor 有地址/名稱
耦合度 松耦合(通過 channel 解耦) 需要知道接收者
阻塞 channel 操作可阻塞 消息發送不阻塞(郵箱)
適用場景 結構化併發、pipeline 分布式系統、容錯
// Go 中模擬 Actor 模式
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. 分布式系統基礎

6.1 一致性哈希

type ConsistentHash struct {
    ring     map[uint32]string // hash → 節點
    keys     []uint32          // 有序 hash 環
    replicas int               // 虛擬節點數
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 分布式鎖(Redis 實現)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua 腳本保證原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 服務發現模式

註冊中心模式:
  服務啓動 → 註冊到 Consul/etcd → 心跳保活
  客戶端   → 從註冊中心獲取服務列表 → 負載均衡 → 調用

客戶端發現 vs 服務端發現:
  客戶端發現:客戶端直接查註冊中心,自己做負載均衡(gRPC 默認)
  服務端發現:通過 LB 代理,LB 查註冊中心(Nginx、AWS ALB)

6.4 負載均衡策略

// 輪詢
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// 加權輪詢、最少連接、一致性哈希等

7. 高併發系統設計模式

7.1 令牌桶限流

import "golang.org/x/time/rate"

// 每秒 100 個請求,突發 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // 處理請求...
}

// 每用戶獨立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 熔斷器模式

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 優雅退出

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // 等待中斷信號
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. 併發調試工具

工具 用途
go run -race 數據競爭檢測
GODEBUG=schedtrace=1000 調度器狀態
runtime.NumGoroutine() goroutine 數量監控
pprof goroutine goroutine 洩漏分析
go tool trace 可視化調度和阻塞事件
dlv(Delve) Go 專用調試器

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6737

(0)
Walker的頭像Walker
上一篇 11小時前
下一篇 2025年11月25日 00:00

相關推薦

  • Go日積月累 go-s3-upload-example

    Go 語言實現文件上傳到 AWS S3 示例 本示例演示如何使用 Go 和 AWS SDK v2 將本地文件上傳到 Amazon S3。 🧾 前提條件 已擁有 AWS 賬號; 已創建 S3 Bucket; 已配置 AWS 憑證(通過 aws configure 或設置環境變量); 已準備本地文件(如 test.jpg); 📦 安裝依賴 go mod init…

  • Go資深工程師講解(慕課) 008_GMP調度器與Go設計哲學

    Go GMP 調度器與設計哲學 對應視頻 9-2 go語言的調度器、18-1 體會Go語言的設計、18-2 課程總結 1. Go 調度器演進 1.0 時代:單線程調度器(Go 0.x) 只有一個線程運行 goroutine 所有 goroutine 排隊等待 無法利用多核 1.1 時代:多線程調度器(Go 1.0) 引入多線程 但全局鎖競爭嚴重,性能瓶頸 1…

  • Go工程師體系課 018

    API 網關與持續部署入門(Kong & Jenkins) 對應資料目錄《第 2 章 Jenkins 入門》《第 3 章 通過 Jenkins 部署服務》,整理 Kong 與 Jenkins 在企業級持續交付中的實戰路徑。即便零基礎,也能順著步驟搭建出自己的網關 + 持續部署流水線。 課前導覽:甚麼是 API 網關 API 網關位於客戶端與後端微服務…

  • 編程基礎 0005_錯誤處理進階

    Go 錯誤處理進階 目錄 Go 錯誤處理哲學 error 接口本質 自定義錯誤類型 fmt.Errorf 與 %w 包裝錯誤 errors.Is 和 errors.As 哨兵錯誤模式 錯誤處理最佳實踐 實際項目中的錯誤處理模式 1. Go 錯誤處理哲學 1.1 與 try-catch 的根本區別 在 Java、Python、C++ 等語言中,異常處理依賴 t…

    後端開發 16小時前
    700
  • Go工程師體系課 003

    grpc grpc grpc-go grpc 無縫集成了 protobuf protobuf 習慣用 Json、XML 數據存儲格式的你們,相信大多都沒聽過 Protocol Buffer。 Protocol Buffer 其實是 Google 出品的一種輕量 & 高效的結構化數據存儲格式,性能比 Json、XML 真的強!太!多! protobuf…

簡體中文 繁體中文 English