Go 并发与分布式实战精华
参考:《Go 并发编程实战》(郝林)、《Mastering Concurrency in Go》(Nathan Kozyra)、《Go 语言构建高并发分布式系统实践》
1. 并发原语深入
1.1 atomic 包
atomic 操作直接映射到 CPU 指令(如 LOCK CMPXCHG),比 mutex 快一个数量级。
import "sync/atomic"
// 基本操作
var counter int64
atomic.AddInt64(&counter, 1) // 原子加
atomic.StoreInt64(&counter, 100) // 原子存
val := atomic.LoadInt64(&counter) // 原子读
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS
// Go 1.19+ 泛型原子类型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()
// atomic.Value — 存储任意类型(适合读多写少的配置热更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)
CAS(Compare-And-Swap)自旋锁模式:
// 无锁计数器
func increment(addr *int64) {
for {
old := atomic.LoadInt64(addr)
if atomic.CompareAndSwapInt64(addr, old, old+1) {
return // 成功
}
// 失败则重试(自旋)
runtime.Gosched() // 让出 CPU,避免空转
}
}
1.2 sync.Mutex 实现原理
Mutex 内部状态(state int32 的位含义):
bit0: locked — 是否被锁定
bit1: woken — 是否有唤醒的 goroutine
bit2: starving — 是否进入饥饿模式
bit3+: waiters — 等待者计数
两种模式:
正常模式:新到的 goroutine 与被唤醒的 goroutine 竞争锁
新到的更有优势(已在 CPU 上运行),可能导致饥饿
饥饿模式:等待超过 1ms 后切换
锁直接交给队首等待者,新到的 goroutine 排队
最后一个等待者获得锁时切回正常模式
1.3 sync.RWMutex 实现
内部结构:
w Mutex // 写锁
writerSem uint32 // 写等待信号量
readerSem uint32 // 读等待信号量
readerCount int32 // 读者计数(负数表示有写者)
readerWait int32 // 写者等待的读者数
读锁:atomic.AddInt32(&readerCount, 1)
如果 readerCount < 0,说明有写者 → 等待 readerSem
读解锁:atomic.AddInt32(&readerCount, -1)
写锁:先获取 w.Lock()
将 readerCount 减去 rwmutexMaxReaders(变为负数,阻止新读者)
等待现有读者完成
写解锁:恢复 readerCount,唤醒等待的读者
2. Channel 高级模式
2.1 Pipeline 模式
// 生成器 → 处理器 → 消费者
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// pipeline: generate → square → print
for v := range square(generate(1, 2, 3, 4)) {
fmt.Println(v)
}
}
2.2 Fan-out / Fan-in
// Fan-out: 多个 goroutine 从同一个 channel 读取
func fanOut(in <-chan int, workers int) []<-chan int {
outs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
outs[i] = square(in) // 多个 worker 竞争读取
}
return outs
}
// Fan-in: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
2.3 或通道(Or-Channel)
任一 channel 完成即返回:
func or(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
2.4 信号量模式
// 用带缓冲 channel 实现信号量
type Semaphore chan struct{}
func NewSemaphore(max int) Semaphore {
return make(Semaphore, max)
}
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
// 使用
sem := NewSemaphore(10) // 最多 10 个并发
for _, task := range tasks {
sem.Acquire()
go func(t Task) {
defer sem.Release()
process(t)
}(task)
}
3. 并发安全与数据竞争
3.1 Race Detector
# 编译时启用竞争检测(性能下降 2-10x,内存增加 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app
# 环境变量控制
GORACE="log_path=race.log halt_on_error=1" ./app
3.2 常见竞争陷阱
// 陷阱1:循环变量捕获
for _, v := range items {
go func() {
process(v) // 所有 goroutine 共享同一个 v!
}()
}
// 修复:传参
for _, v := range items {
go func(item Item) {
process(item)
}(v)
}
// Go 1.22+ 循环变量语义改变,每次迭代创建新变量
// 陷阱2:map 并发读写
// map 不是并发安全的!并发读写直接 panic
// 修复:sync.RWMutex 或 sync.Map
// 陷阱3:slice 并发 append
// append 可能触发扩容,导致底层数组变化
// 修复:预分配 + 索引赋值,或加锁
// 陷阱4:interface 赋值非原子
var i interface{}
go func() { i = "hello" }() // 非原子!
go func() { i = 42 }() // 可能读到半写状态
// 修复:atomic.Value
3.3 并发安全设计原则
| 原则 | 说明 |
|---|---|
| 不可变数据 | 只读数据天然安全 |
| 值传递 | 传递副本而非指针 |
| CSP | 通过 channel 通信而非共享内存 |
| COW | Copy-On-Write,atomic.Value 存储配置 |
| 限制可见性 | 不导出可变状态 |
| 锁粒度最小化 | 只锁必要的代码段 |
4. errgroup 与并发控制
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // 捕获变量
g.Go(func() error {
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // 任一失败 → 取消其他
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results[i] = string(body) // 索引赋值,无竞争
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
// 限制并发数
g.SetLimit(10) // 最多 10 个并发 goroutine
5. CSP vs Actor 模型
| 特征 | CSP (Go) | Actor (Erlang/Akka) |
|---|---|---|
| 通信方式 | 通过 channel 传递消息 | 直接向 actor 发送消息 |
| 标识 | channel 有类型,匿名进程 | actor 有地址/名称 |
| 耦合度 | 松耦合(通过 channel 解耦) | 需要知道接收者 |
| 阻塞 | channel 操作可阻塞 | 消息发送不阻塞(邮箱) |
| 适用场景 | 结构化并发、pipeline | 分布式系统、容错 |
// Go 中模拟 Actor 模式
type Actor struct {
mailbox chan Message
handler func(Message)
}
func NewActor(handler func(Message)) *Actor {
a := &Actor{
mailbox: make(chan Message, 100),
handler: handler,
}
go a.run()
return a
}
func (a *Actor) run() {
for msg := range a.mailbox {
a.handler(msg)
}
}
func (a *Actor) Send(msg Message) {
a.mailbox <- msg
}
6. 分布式系统基础
6.1 一致性哈希
type ConsistentHash struct {
ring map[uint32]string // hash → 节点
keys []uint32 // 有序 hash 环
replicas int // 虚拟节点数
}
func (h *ConsistentHash) Add(nodes ...string) {
for _, node := range nodes {
for i := 0; i < h.replicas; i++ {
hash := crc32.ChecksumIEEE(
[]byte(fmt.Sprintf("%s#%d", node, i)))
h.ring[hash] = node
h.keys = append(h.keys, hash)
}
}
sort.Slice(h.keys, func(i, j int) bool {
return h.keys[i] < h.keys[j]
})
}
func (h *ConsistentHash) Get(key string) string {
hash := crc32.ChecksumIEEE([]byte(key))
idx := sort.Search(len(h.keys), func(i int) bool {
return h.keys[i] >= hash
})
if idx >= len(h.keys) {
idx = 0
}
return h.ring[h.keys[idx]]
}
6.2 分布式锁(Redis 实现)
func acquireLock(ctx context.Context, rdb *redis.Client,
key string, ttl time.Duration) (string, error) {
value := uuid.New().String()
ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
if err != nil {
return "", err
}
if !ok {
return "", errors.New("lock held by another")
}
return value, nil
}
func releaseLock(ctx context.Context, rdb *redis.Client,
key, value string) error {
// Lua 脚本保证原子性
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`
return rdb.Eval(ctx, script, []string{key}, value).Err()
}
6.3 服务发现模式
注册中心模式:
服务启动 → 注册到 Consul/etcd → 心跳保活
客户端 → 从注册中心获取服务列表 → 负载均衡 → 调用
客户端发现 vs 服务端发现:
客户端发现:客户端直接查注册中心,自己做负载均衡(gRPC 默认)
服务端发现:通过 LB 代理,LB 查注册中心(Nginx、AWS ALB)
6.4 负载均衡策略
// 轮询
type RoundRobin struct {
addrs []string
next uint64
}
func (r *RoundRobin) Pick() string {
n := atomic.AddUint64(&r.next, 1)
return r.addrs[n%uint64(len(r.addrs))]
}
// 加权轮询、最少连接、一致性哈希等
7. 高并发系统设计模式
7.1 令牌桶限流
import "golang.org/x/time/rate"
// 每秒 100 个请求,突发 50
limiter := rate.NewLimiter(100, 50)
func handler(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "Too Many Requests", 429)
return
}
// 处理请求...
}
// 每用户独立限流
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
if l, ok := limiters.Load(userID); ok {
return l.(*rate.Limiter)
}
l := rate.NewLimiter(10, 5)
limiters.Store(userID, l)
return l
}
7.2 熔断器模式
type CircuitBreaker struct {
mu sync.Mutex
failures int
threshold int
state string // "closed", "open", "half-open"
lastFailure time.Time
timeout time.Duration
}
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
if cb.state == "open" {
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = "half-open"
} else {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
}
cb.mu.Unlock()
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.threshold {
cb.state = "open"
}
return err
}
cb.failures = 0
cb.state = "closed"
return nil
}
7.3 优雅退出
func main() {
srv := &http.Server{Addr: ":8080"}
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}()
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server forced to shutdown:", err)
}
log.Println("Server exited")
}
7.4 Worker Pool
func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
results := make(chan Result, workers)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
return
}
results <- process(job)
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(results)
}()
return results
}
8. 并发调试工具
| 工具 | 用途 |
|---|---|
go run -race |
数据竞争检测 |
GODEBUG=schedtrace=1000 |
调度器状态 |
runtime.NumGoroutine() |
goroutine 数量监控 |
pprof goroutine |
goroutine 泄漏分析 |
go tool trace |
可视化调度和阻塞事件 |
dlv(Delve) |
Go 专用调试器 |
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6737