编程基础 0011_Go并发与分布式实战精华

Go 并发与分布式实战精华

参考:《Go 并发编程实战》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 语言构建高并发分布式系统实践》

1. 并发原语深入

1.1 atomic 包

atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一个数量级。

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子读
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子类型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存储任意类型(适合读多写少的配置热更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS(Compare-And-Swap)自旋锁模式:

// 无锁计数器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失败则重试(自旋)
        runtime.Gosched() // 让出 CPU,避免空转
    }
}

1.2 sync.Mutex 实现原理

Mutex 内部状态(state int32 的位含义):
  bit0: locked    — 是否被锁定
  bit1: woken     — 是否有唤醒的 goroutine
  bit2: starving  — 是否进入饥饿模式
  bit3+: waiters  — 等待者计数

两种模式:
  正常模式:新到的 goroutine 与被唤醒的 goroutine 竞争锁
           新到的更有优势(已在 CPU 上运行),可能导致饥饿
  饥饿模式:等待超过 1ms 后切换
           锁直接交给队首等待者,新到的 goroutine 排队
           最后一个等待者获得锁时切回正常模式

1.3 sync.RWMutex 实现

内部结构:
  w       Mutex    // 写锁
  writerSem uint32 // 写等待信号量
  readerSem uint32 // 读等待信号量
  readerCount int32 // 读者计数(负数表示有写者)
  readerWait  int32 // 写者等待的读者数

读锁:atomic.AddInt32(&readerCount, 1)
     如果 readerCount < 0,说明有写者 → 等待 readerSem
读解锁:atomic.AddInt32(&readerCount, -1)

写锁:先获取 w.Lock()
     将 readerCount 减去 rwmutexMaxReaders(变为负数,阻止新读者)
     等待现有读者完成
写解锁:恢复 readerCount,唤醒等待的读者

2. Channel 高级模式

2.1 Pipeline 模式

// 生成器 → 处理器 → 消费者
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: 多个 goroutine 从同一个 channel 读取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // 多个 worker 竞争读取
    }
    return outs
}

// Fan-in: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 或通道(Or-Channel)

任一 channel 完成即返回:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 信号量模式

// 用带缓冲 channel 实现信号量
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// 使用
sem := NewSemaphore(10) // 最多 10 个并发
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. 并发安全与数据竞争

3.1 Race Detector

# 编译时启用竞争检测(性能下降 2-10x,内存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# 环境变量控制
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 常见竞争陷阱

// 陷阱1:循环变量捕获
for _, v := range items {
    go func() {
        process(v) // 所有 goroutine 共享同一个 v!
    }()
}
// 修复:传参
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ 循环变量语义改变,每次迭代创建新变量

// 陷阱2:map 并发读写
// map 不是并发安全的!并发读写直接 panic
// 修复:sync.RWMutex 或 sync.Map

// 陷阱3:slice 并发 append
// append 可能触发扩容,导致底层数组变化
// 修复:预分配 + 索引赋值,或加锁

// 陷阱4:interface 赋值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }()     // 可能读到半写状态
// 修复:atomic.Value

3.3 并发安全设计原则

原则 说明
不可变数据 只读数据天然安全
值传递 传递副本而非指针
CSP 通过 channel 通信而非共享内存
COW Copy-On-Write,atomic.Value 存储配置
限制可见性 不导出可变状态
锁粒度最小化 只锁必要的代码段

4. errgroup 与并发控制

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获变量
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 任一失败 → 取消其他
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // 索引赋值,无竞争
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// 限制并发数
g.SetLimit(10) // 最多 10 个并发 goroutine

5. CSP vs Actor 模型

特征 CSP (Go) Actor (Erlang/Akka)
通信方式 通过 channel 传递消息 直接向 actor 发送消息
标识 channel 有类型,匿名进程 actor 有地址/名称
耦合度 松耦合(通过 channel 解耦) 需要知道接收者
阻塞 channel 操作可阻塞 消息发送不阻塞(邮箱)
适用场景 结构化并发、pipeline 分布式系统、容错
// Go 中模拟 Actor 模式
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. 分布式系统基础

6.1 一致性哈希

type ConsistentHash struct {
    ring     map[uint32]string // hash → 节点
    keys     []uint32          // 有序 hash 环
    replicas int               // 虚拟节点数
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 分布式锁(Redis 实现)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua 脚本保证原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 服务发现模式

注册中心模式:
  服务启动 → 注册到 Consul/etcd → 心跳保活
  客户端   → 从注册中心获取服务列表 → 负载均衡 → 调用

客户端发现 vs 服务端发现:
  客户端发现:客户端直接查注册中心,自己做负载均衡(gRPC 默认)
  服务端发现:通过 LB 代理,LB 查注册中心(Nginx、AWS ALB)

6.4 负载均衡策略

// 轮询
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// 加权轮询、最少连接、一致性哈希等

7. 高并发系统设计模式

7.1 令牌桶限流

import "golang.org/x/time/rate"

// 每秒 100 个请求,突发 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // 处理请求...
}

// 每用户独立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 熔断器模式

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 优雅退出

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // 等待中断信号
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. 并发调试工具

工具 用途
go run -race 数据竞争检测
GODEBUG=schedtrace=1000 调度器状态
runtime.NumGoroutine() goroutine 数量监控
pprof goroutine goroutine 泄漏分析
go tool trace 可视化调度和阻塞事件
dlv(Delve) Go 专用调试器

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6737

(0)
Walker的头像Walker
上一篇 2026年3月8日 15:11
下一篇 2026年3月9日 12:56

相关推荐

  • 编程基础 0006_并发进阶_sync包与Context

    并发进阶:sync 包与 Context 一、sync 包详解 1. sync.Mutex 与 sync.RWMutex // Mutex: 互斥锁,同一时间只有一个 goroutine 能持有 var mu sync.Mutex var count int func increment() { mu.Lock() defer mu.Unlock() cou…

    后端开发 2026年3月6日
    6200
  • Go工程师体系课 012

    Go 中集成 Elasticsearch 1. 客户端库选择 1.1 主流 Go ES 客户端 olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API go-elasticsearch/elasticsearch:社区维护的官…

    后端开发 2026年3月7日
    5800
  • Go工程师体系课 018

    API 网关与持续部署入门(Kong & Jenkins) 对应资料目录《第 2 章 Jenkins 入门》《第 3 章 通过 Jenkins 部署服务》,整理 Kong 与 Jenkins 在企业级持续交付中的实战路径。即便零基础,也能顺着步骤搭建出自己的网关 + 持续部署流水线。 课前导览:什么是 API 网关 API 网关位于客户端与后端微服务…

    后端开发 2026年3月7日
    6700
  • Go工程师体系课 019

    Go 内存模型与 GC 1. 内存分配基础 1.1 栈(Stack)与堆(Heap) ┌─────────────────────────────┐ │ 堆 (Heap) │ ← 动态分配,GC 管理 │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ obj │ │ obj │ │ obj │ │ │ └─────┘ └─────┘ └────…

    后端开发 2026年3月7日
    6500
  • Go工程师体系课 004

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2026年3月7日
    5900
简体中文 繁体中文 English