编程基础 0011_Go并发与分布式实战精华

Go 并发与分布式实战精华

参考:《Go 并发编程实战》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 语言构建高并发分布式系统实践》

1. 并发原语深入

1.1 atomic 包

atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一个数量级。

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子读
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子类型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存储任意类型(适合读多写少的配置热更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS(Compare-And-Swap)自旋锁模式:

// 无锁计数器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失败则重试(自旋)
        runtime.Gosched() // 让出 CPU,避免空转
    }
}

1.2 sync.Mutex 实现原理

Mutex 内部状态(state int32 的位含义):
  bit0: locked    — 是否被锁定
  bit1: woken     — 是否有唤醒的 goroutine
  bit2: starving  — 是否进入饥饿模式
  bit3+: waiters  — 等待者计数

两种模式:
  正常模式:新到的 goroutine 与被唤醒的 goroutine 竞争锁
           新到的更有优势(已在 CPU 上运行),可能导致饥饿
  饥饿模式:等待超过 1ms 后切换
           锁直接交给队首等待者,新到的 goroutine 排队
           最后一个等待者获得锁时切回正常模式

1.3 sync.RWMutex 实现

内部结构:
  w       Mutex    // 写锁
  writerSem uint32 // 写等待信号量
  readerSem uint32 // 读等待信号量
  readerCount int32 // 读者计数(负数表示有写者)
  readerWait  int32 // 写者等待的读者数

读锁:atomic.AddInt32(&readerCount, 1)
     如果 readerCount < 0,说明有写者 → 等待 readerSem
读解锁:atomic.AddInt32(&readerCount, -1)

写锁:先获取 w.Lock()
     将 readerCount 减去 rwmutexMaxReaders(变为负数,阻止新读者)
     等待现有读者完成
写解锁:恢复 readerCount,唤醒等待的读者

2. Channel 高级模式

2.1 Pipeline 模式

// 生成器 → 处理器 → 消费者
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: 多个 goroutine 从同一个 channel 读取
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // 多个 worker 竞争读取
    }
    return outs
}

// Fan-in: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 或通道(Or-Channel)

任一 channel 完成即返回:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 信号量模式

// 用带缓冲 channel 实现信号量
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// 使用
sem := NewSemaphore(10) // 最多 10 个并发
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. 并发安全与数据竞争

3.1 Race Detector

# 编译时启用竞争检测(性能下降 2-10x,内存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# 环境变量控制
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 常见竞争陷阱

// 陷阱1:循环变量捕获
for _, v := range items {
    go func() {
        process(v) // 所有 goroutine 共享同一个 v!
    }()
}
// 修复:传参
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ 循环变量语义改变,每次迭代创建新变量

// 陷阱2:map 并发读写
// map 不是并发安全的!并发读写直接 panic
// 修复:sync.RWMutex 或 sync.Map

// 陷阱3:slice 并发 append
// append 可能触发扩容,导致底层数组变化
// 修复:预分配 + 索引赋值,或加锁

// 陷阱4:interface 赋值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }()     // 可能读到半写状态
// 修复:atomic.Value

3.3 并发安全设计原则

原则 说明
不可变数据 只读数据天然安全
值传递 传递副本而非指针
CSP 通过 channel 通信而非共享内存
COW Copy-On-Write,atomic.Value 存储配置
限制可见性 不导出可变状态
锁粒度最小化 只锁必要的代码段

4. errgroup 与并发控制

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获变量
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 任一失败 → 取消其他
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // 索引赋值,无竞争
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// 限制并发数
g.SetLimit(10) // 最多 10 个并发 goroutine

5. CSP vs Actor 模型

特征 CSP (Go) Actor (Erlang/Akka)
通信方式 通过 channel 传递消息 直接向 actor 发送消息
标识 channel 有类型,匿名进程 actor 有地址/名称
耦合度 松耦合(通过 channel 解耦) 需要知道接收者
阻塞 channel 操作可阻塞 消息发送不阻塞(邮箱)
适用场景 结构化并发、pipeline 分布式系统、容错
// Go 中模拟 Actor 模式
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. 分布式系统基础

6.1 一致性哈希

type ConsistentHash struct {
    ring     map[uint32]string // hash → 节点
    keys     []uint32          // 有序 hash 环
    replicas int               // 虚拟节点数
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 分布式锁(Redis 实现)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua 脚本保证原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 服务发现模式

注册中心模式:
  服务启动 → 注册到 Consul/etcd → 心跳保活
  客户端   → 从注册中心获取服务列表 → 负载均衡 → 调用

客户端发现 vs 服务端发现:
  客户端发现:客户端直接查注册中心,自己做负载均衡(gRPC 默认)
  服务端发现:通过 LB 代理,LB 查注册中心(Nginx、AWS ALB)

6.4 负载均衡策略

// 轮询
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// 加权轮询、最少连接、一致性哈希等

7. 高并发系统设计模式

7.1 令牌桶限流

import "golang.org/x/time/rate"

// 每秒 100 个请求,突发 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // 处理请求...
}

// 每用户独立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 熔断器模式

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 优雅退出

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // 等待中断信号
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. 并发调试工具

工具 用途
go run -race 数据竞争检测
GODEBUG=schedtrace=1000 调度器状态
runtime.NumGoroutine() goroutine 数量监控
pprof goroutine goroutine 泄漏分析
go tool trace 可视化调度和阻塞事件
dlv(Delve) Go 专用调试器

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6737

(0)
Walker的头像Walker
上一篇 14小时前
下一篇 2025年11月25日 00:00

相关推荐

  • Go资深工程师讲解(慕课) 004

    004 goroutine package main import ( "fmt" "time" ) func main() { for i:=0;i<10;i++{ go func(i int) { fmt.Printf("Hello from goroutine %d \n",i) // …

  • 编程基础 0009_testing详解

    Go testing 详解 目录 testing 包基础 表格驱动测试 子测试 t.Run 基准测试 Benchmark 测试覆盖率 TestMain httptest 包 Mock 和接口测试技巧 模糊测试 Fuzz 1. testing 包基础 1.1 测试文件和函数命名规则 Go 测试遵循严格的命名约定: 测试文件以 _test.go 结尾(如 use…

    后端开发 20小时前
    400
  • Go工程师体系课 009

    其它一些功能 个人中心 收藏 管理收货地址(增删改查) 留言 拷贝inventory_srv--> userop_srv 查询替换所有的inventory Elasticsearch 深度解析文档 1. 什么是Elasticsearch Elasticsearch是一个基于Apache Lucene构建的分布式、RESTful搜索和分析引擎,能够快速地…

    后端开发 8小时前
    100
  • Go工程师体系课 006

    项目结构说明:user-web 模块 user-web 是 joyshop_api 工程中的用户服务 Web 层模块,负责处理用户相关的 HTTP 请求、参数校验、业务路由以及调用后端接口等功能。以下是目录结构说明: user-web/ ├── api/ # 控制器层,定义业务接口处理逻辑 ├── config/ # 配置模块,包含系统配置结构体及读取逻辑 …

  • Go工程师体系课 015

    Docker 容器化 —— Go 项目实战指南 一、Docker 核心概念 1.1 什么是 Docker Docker 是一个开源的容器化平台,它可以将应用程序及其所有依赖项打包到一个标准化的单元(容器)中,从而实现"一次构建,到处运行"。对于 Go 开发者而言,Docker 解决了以下痛点: 开发环境与生产环境不一致 依赖管理复杂(数据库、缓存、消息队列等…

简体中文 繁体中文 English