Programming Fundamentals 0011_Go Concurrency and Distributed Practical Essentials

Go Concurrency and Distributed Systems in Practice Essentials

References: "Go Concurrency in Practice" (Hao Lin), "Mastering Concurrency in Go" (Nathan Kozyra), "Go Language Practice in Building High-Concurrency Distributed Systems"

1. Deep Dive into Concurrency Primitives

1.1 atomic Package

Atomic operations map directly to CPU instructions (e.g., LOCK CMPXCHG) and are an order of magnitude faster than mutexes.

import "sync/atomic"

// 基本操作
var counter int64
atomic.AddInt64(&counter, 1)           // 原子加
atomic.StoreInt64(&counter, 100)       // 原子存
val := atomic.LoadInt64(&counter)      // 原子读
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ 泛型原子类型
var counter atomic.Int64
counter.Add(1)
counter.Store(100)
val := counter.Load()

// atomic.Value — 存储任意类型(适合读多写少的配置热更新)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)

CAS (Compare-And-Swap) Spin Lock Pattern:

// 无锁计数器
func increment(addr *int64) {
    for {
        old := atomic.LoadInt64(addr)
        if atomic.CompareAndSwapInt64(addr, old, old+1) {
            return // 成功
        }
        // 失败则重试(自旋)
        runtime.Gosched() // 让出 CPU,避免空转
    }
}

1.2 sync.Mutex Implementation Principles

Mutex internal state (bit meanings of state int32):
  bit0: locked    — whether it is locked
  bit1: woken     — whether there is a woken goroutine
  bit2: starving  — whether it has entered starvation mode
  bit3+: waiters  — count of waiters

Two modes:
  Normal mode: newly arrived goroutines compete for the lock with woken goroutines
           Newly arrived ones have an advantage (already running on CPU), which may lead to starvation
  Starvation mode: switches after waiting for more than 1ms
           The lock is handed directly to the first waiter in the queue, newly arrived goroutines queue up
           Switches back to normal mode when the last waiter acquires the lock

1.3 sync.RWMutex Implementation

Internal structure:
  w       Mutex    // write lock
  writerSem uint32 // write waiting semaphore
  readerSem uint32 // read waiting semaphore
  readerCount int32 // reader count (negative means there are writers)
  readerWait  int32 // number of readers a writer is waiting for

Read lock: atomic.AddInt32(&readerCount, 1)
     If readerCount < 0, it means there's a writer → wait for readerSem
Read unlock: atomic.AddInt32(&readerCount, -1)

Write lock: first acquire w.Lock()
     Decrement readerCount by rwmutexMaxReaders (becomes negative, blocking new readers)
     Wait for existing readers to complete
Write unlock: restore readerCount, wake up waiting readers

2. Advanced Channel Patterns

2.1 Pipeline Pattern

// Generator → Processor → Consumer
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // pipeline: generate → square → print
    for v := range square(generate(1, 2, 3, 4)) {
        fmt.Println(v)
    }
}

2.2 Fan-out / Fan-in

// Fan-out: multiple goroutines read from the same channel
func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outs[i] = square(in) // Multiple workers compete to read
    }
    return outs
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

2.3 Or-Channel

Returns as soon as any channel completes:

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

2.4 Semaphore Pattern

// Implement semaphore using buffered channel
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// Usage
sem := NewSemaphore(10) // Max 10 concurrent operations
for _, task := range tasks {
    sem.Acquire()
    go func(t Task) {
        defer sem.Release()
        process(t)
    }(task)
}

3. Concurrency Safety and Data Races

3.1 Race Detector

# Enable race detection during compilation (performance degrades 2-10x, memory increases 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app

# Environment variable control
GORACE="log_path=race.log halt_on_error=1" ./app

3.2 Common Race Traps

// Trap 1: Loop variable capture
for _, v := range items {
    go func() {
        process(v) // All goroutines share the same v!
    }()
}
// Fix: pass as argument
for _, v := range items {
    go func(item Item) {
        process(item)
    }(v)
}
// Go 1.22+ loop variable semantics changed, a new variable is created for each iteration

// Trap 2: Concurrent map read/write
// Maps are not concurrency-safe! Concurrent read/write will panic directly
// Fix: sync.RWMutex or sync.Map

// Trap 3: Concurrent slice append
// append may trigger reallocation, causing the underlying array to change
// Fix: pre-allocate + index assignment, or add a lock

// Trap 4: Non-atomic interface assignment
var i interface{}
go func() { i = "hello" }() // Non-atomic!
go func() { i = 42 }()     // May read a partially written state
// Fix: atomic.Value

3.3 Concurrency Safety Design Principles

Principle Description
Immutable Data Read-only data is inherently safe
Pass by Value Pass copies instead of pointers
CSP Communicate via channels instead of shared memory
COW Copy-On-Write, atomic.Value for storing configuration
Limit Visibility Do not export mutable state
Minimize Lock Granularity Lock only necessary code segments

4. errgroup and Concurrency Control

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // Capture variables
        g.Go(func() error {
            req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // Any failure → cancel others
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results[i] = string(body) // Index assignment, no race
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

// Limit concurrency
g.SetLimit(10) // Max 10 concurrent goroutines

5. CSP vs Actor Model

Feature CSP (Go) Actor (Erlang/Akka)
Communication Method Message passing via channels Direct message sending to actors
Identification Channels are typed, anonymous processes Actors have addresses/names
Coupling Loose coupling (decoupled via channels) Requires knowing the receiver
Blocking Channel operations can block Message sending is non-blocking (mailbox)
Applicable Scenarios Structured concurrency, pipeline Distributed systems, fault tolerance
// Simulating Actor pattern in Go
type Actor struct {
    mailbox chan Message
    handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
    a := &Actor{
        mailbox: make(chan Message, 100),
        handler: handler,
    }
    go a.run()
    return a
}

func (a *Actor) run() {
    for msg := range a.mailbox {
        a.handler(msg)
    }
}

func (a *Actor) Send(msg Message) {
    a.mailbox <- msg
}

6. Distributed Systems Fundamentals

6.1 Consistent Hashing

type ConsistentHash struct {
    ring     map[uint32]string // hash → node
    keys     []uint32          // ordered hash ring
    replicas int               // number of virtual nodes
}

func (h *ConsistentHash) Add(nodes ...string) {
    for _, node := range nodes {
        for i := 0; i < h.replicas; i++ {
            hash := crc32.ChecksumIEEE(
                []byte(fmt.Sprintf("%s#%d", node, i)))
            h.ring[hash] = node
            h.keys = append(h.keys, hash)
        }
    }
    sort.Slice(h.keys, func(i, j int) bool {
        return h.keys[i] < h.keys[j]
    })
}

func (h *ConsistentHash) Get(key string) string {
    hash := crc32.ChecksumIEEE([]byte(key))
    idx := sort.Search(len(h.keys), func(i int) bool {
        return h.keys[i] >= hash
    })
    if idx >= len(h.keys) {
        idx = 0
    }
    return h.ring[h.keys[idx]]
}

6.2 Distributed Lock (Redis Implementation)

func acquireLock(ctx context.Context, rdb *redis.Client,
    key string, ttl time.Duration) (string, error) {
    value := uuid.New().String()
    ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", errors.New("lock held by another")
    }
    return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
    key, value string) error {
    // Lua script ensures atomicity
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        end
        return 0
    `
    return rdb.Eval(ctx, script, []string{key}, value).Err()
}

6.3 Service Discovery Pattern

Registry Pattern:
  Service startup → register with Consul/etcd → heartbeat to keep alive
  Client   → retrieve service list from registry → load balance → invoke

Client-side Discovery vs Server-side Discovery:
  Client-side discovery: client directly queries the registry, performs load balancing itself (gRPC default)
  Server-side discovery: via LB proxy, LB queries the registry (Nginx, AWS ALB)

6.4 Load Balancing Strategies

// Round Robin
type RoundRobin struct {
    addrs []string
    next  uint64
}
func (r *RoundRobin) Pick() string {
    n := atomic.AddUint64(&r.next, 1)
    return r.addrs[n%uint64(len(r.addrs))]
}

// Weighted Round Robin, Least Connections, Consistent Hashing, etc.

7. High-Concurrency System Design Patterns

7.1 Token Bucket Rate Limiting

import "golang.org/x/time/rate"

// 100 requests per second, burst 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
    if !limiter.Allow() {
        http.Error(w, "Too Many Requests", 429)
        return
    }
    // Process request...
}

// Per-user independent rate limiting
var limiters sync.Map
func getUserLimiter(userID string) *rate.Limiter {
    if l, ok := limiters.Load(userID); ok {
        return l.(*rate.Limiter)
    }
    l := rate.NewLimiter(10, 5)
    limiters.Store(userID, l)
    return l
}

7.2 Circuit Breaker Pattern

type CircuitBreaker struct {
    mu          sync.Mutex
    failures    int
    threshold   int
    state       string // "closed", "open", "half-open"
    lastFailure time.Time
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = "half-open"
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = "open"
        }
        return err
    }
    cb.failures = 0
    cb.state = "closed"
    return nil
}

7.3 Graceful Shutdown

func main() {
    srv := &http.Server{Addr: ":8080"}

    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // Wait for interrupt signal
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

7.4 Worker Pool

func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
    results := make(chan Result, workers)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    results <- process(job)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

8. Concurrency Debugging Tools

Tool Purpose
go run -race Data race detection
GODEBUG=schedtrace=1000 Scheduler status
runtime.NumGoroutine() Goroutine count monitoring
pprof goroutine Goroutine leak analysis
go tool trace Visualize scheduling and blocking events
dlv (Delve) Go-specific debugger

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6737

(0)
Walker的头像Walker
上一篇 14 hours ago
下一篇 Nov 25, 2025 00:00

Related Posts

EN
简体中文 繁體中文 English