Go Concurrency and Distributed Systems in Practice Essentials
References: "Go Concurrency in Practice" (Hao Lin), "Mastering Concurrency in Go" (Nathan Kozyra), "Go Language Practice in Building High-Concurrency Distributed Systems"
1. Deep Dive into Concurrency Primitives
1.1 atomic Package
Atomic operations compile down to single CPU instructions (e.g., LOCK CMPXCHG on x86) and, in the uncontended case, are typically several times to an order of magnitude faster than taking a mutex.
import "sync/atomic"

// Basic operations
var counter int64
atomic.AddInt64(&counter, 1)                   // atomic add
atomic.StoreInt64(&counter, 100)               // atomic store
val := atomic.LoadInt64(&counter)              // atomic load
atomic.CompareAndSwapInt64(&counter, 100, 200) // CAS

// Go 1.19+ typed atomic values (atomic.Int64, atomic.Bool, atomic.Pointer[T], ...)
var c atomic.Int64
c.Add(1)
c.Store(100)
val = c.Load()

// atomic.Value — stores a value of any type (good for read-mostly config hot reload)
var config atomic.Value
config.Store(Config{Debug: true})
cfg := config.Load().(Config)
CAS (Compare-And-Swap) Spin Lock Pattern:
// Lock-free counter
func increment(addr *int64) {
	for {
		old := atomic.LoadInt64(addr)
		if atomic.CompareAndSwapInt64(addr, old, old+1) {
			return // success
		}
		// On failure, retry (spin)
		runtime.Gosched() // yield the CPU to avoid busy-waiting
	}
}
1.2 sync.Mutex Implementation Principles
Mutex internal state (bit meanings of state int32):
bit0: locked — whether it is locked
bit1: woken — whether there is a woken goroutine
bit2: starving — whether it has entered starvation mode
bit3+: waiters — count of waiters
Two modes:
Normal mode: newly arrived goroutines compete for the lock with goroutines woken from the wait queue
Newcomers have the advantage (they are already running on a CPU), which can starve long waiters
Starvation mode: entered once a waiter has been blocked for more than 1ms
The lock is handed directly to the first waiter in the queue; new arrivals join the back of the queue without spinning
Switches back to normal mode when the acquiring waiter is the last in the queue, or it waited less than 1ms
1.3 sync.RWMutex Implementation
Internal structure:
type RWMutex struct {
	w           Mutex  // held by writers
	writerSem   uint32 // semaphore that writers block on
	readerSem   uint32 // semaphore that readers block on
	readerCount int32  // number of readers (negative while a writer is pending)
	readerWait  int32  // readers the pending writer must wait for
}
Read lock: atomic.AddInt32(&readerCount, 1)
If the result is negative, a writer is pending → block on readerSem
Read unlock: atomic.AddInt32(&readerCount, -1)
Write lock: first acquire w.Lock() to serialize writers
Then subtract rwmutexMaxReaders from readerCount (making it negative, which blocks new readers)
Wait on writerSem until the existing readers finish
Write unlock: add rwmutexMaxReaders back to readerCount, wake all readers blocked on readerSem, release w
2. Advanced Channel Patterns
2.1 Pipeline Pattern
// Generator → Processor → Consumer
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// pipeline: generate → square → print
	for v := range square(generate(1, 2, 3, 4)) {
		fmt.Println(v)
	}
}
2.2 Fan-out / Fan-in
// Fan-out: multiple goroutines read from the same channel
func fanOut(in <-chan int, workers int) []<-chan int {
	outs := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		outs[i] = square(in) // the workers compete to read from in
	}
	return outs
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	merged := make(chan int)
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				merged <- v
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
2.3 Or-Channel
Returns as soon as any channel completes:
func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			// Recurse on the rest; orDone is passed along so the whole
			// tree tears down once any channel closes.
			case <-or(append(channels[3:], orDone)...):
			}
		}
	}()
	return orDone
}
2.4 Semaphore Pattern
// A counting semaphore built on a buffered channel
type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
	return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

// Usage
sem := NewSemaphore(10) // at most 10 concurrent operations
for _, task := range tasks {
	sem.Acquire()
	go func(t Task) {
		defer sem.Release()
		process(t)
	}(task)
}
3. Concurrency Safety and Data Races
3.1 Race Detector
# Enable race detection during compilation (performance degrades 2-10x, memory increases 5-10x)
go run -race main.go
go test -race ./...
go build -race -o app
# Environment variable control
GORACE="log_path=race.log halt_on_error=1" ./app
3.2 Common Race Traps
// Trap 1: loop variable capture
for _, v := range items {
	go func() {
		process(v) // before Go 1.22, all goroutines share the same v!
	}()
}
// Fix: pass it as an argument
for _, v := range items {
	go func(item Item) {
		process(item)
	}(v)
}
// Go 1.22 changed the loop-variable semantics: each iteration now gets
// a fresh variable, so the original form is safe on modern toolchains.

// Trap 2: concurrent map read/write
// Maps are not concurrency-safe! Concurrent read/write triggers an
// unrecoverable runtime fatal error, not an ordinary panic
// Fix: sync.RWMutex or sync.Map

// Trap 3: concurrent slice append
// append may reallocate, so the backing array can change under other goroutines
// Fix: pre-allocate + assign by index, or guard the slice with a lock
// Trap 4: non-atomic interface assignment
var i interface{}
go func() { i = "hello" }() // not atomic!
go func() { i = 42 }()      // readers may observe a torn (partially written) value
// Fix: atomic.Value
3.3 Concurrency Safety Design Principles
| Principle | Description |
|---|---|
| Immutable Data | Read-only data is inherently safe |
| Pass by Value | Pass copies instead of pointers |
| CSP | Communicate via channels instead of shared memory |
| COW | Copy-On-Write, atomic.Value for storing configuration |
| Limit Visibility | Do not export mutable state |
| Minimize Lock Granularity | Lock only necessary code segments |
4. errgroup and Concurrency Control
import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([]string, len(urls))
	for i, url := range urls {
		i, url := i, url // capture loop variables (not needed on Go 1.22+)
		g.Go(func() error {
			req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
			if err != nil {
				return err
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return err // any failure cancels the shared ctx
			}
			defer resp.Body.Close()
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				return err
			}
			results[i] = string(body) // per-index writes, no race
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

// Limit concurrency (call SetLimit before the first Go)
g.SetLimit(10) // at most 10 goroutines run at once
5. CSP vs Actor Model
| Feature | CSP (Go) | Actor (Erlang/Akka) |
|---|---|---|
| Communication Method | Message passing via channels | Direct message sending to actors |
| Identification | Channels are typed, anonymous processes | Actors have addresses/names |
| Coupling | Loose coupling (decoupled via channels) | Requires knowing the receiver |
| Blocking | Channel operations can block | Message sending is non-blocking (mailbox) |
| Applicable Scenarios | Structured concurrency, pipeline | Distributed systems, fault tolerance |
// Simulating the Actor pattern in Go
type Actor struct {
	mailbox chan Message
	handler func(Message)
}

func NewActor(handler func(Message)) *Actor {
	a := &Actor{
		mailbox: make(chan Message, 100),
		handler: handler,
	}
	go a.run()
	return a
}

func (a *Actor) run() {
	for msg := range a.mailbox {
		a.handler(msg)
	}
}

func (a *Actor) Send(msg Message) {
	a.mailbox <- msg
}
6. Distributed Systems Fundamentals
6.1 Consistent Hashing
type ConsistentHash struct {
	ring     map[uint32]string // hash → node
	keys     []uint32          // sorted hash ring
	replicas int               // virtual nodes per physical node
}

func NewConsistentHash(replicas int) *ConsistentHash {
	return &ConsistentHash{
		ring:     make(map[uint32]string), // must be initialized before Add
		replicas: replicas,
	}
}

func (h *ConsistentHash) Add(nodes ...string) {
	for _, node := range nodes {
		for i := 0; i < h.replicas; i++ {
			hash := crc32.ChecksumIEEE(
				[]byte(fmt.Sprintf("%s#%d", node, i)))
			h.ring[hash] = node
			h.keys = append(h.keys, hash)
		}
	}
	sort.Slice(h.keys, func(i, j int) bool {
		return h.keys[i] < h.keys[j]
	})
}

func (h *ConsistentHash) Get(key string) string {
	if len(h.keys) == 0 {
		return ""
	}
	hash := crc32.ChecksumIEEE([]byte(key))
	// First virtual node clockwise from the key's hash.
	idx := sort.Search(len(h.keys), func(i int) bool {
		return h.keys[i] >= hash
	})
	if idx == len(h.keys) {
		idx = 0 // wrap around the ring
	}
	return h.ring[h.keys[idx]]
}
6.2 Distributed Lock (Redis Implementation)
func acquireLock(ctx context.Context, rdb *redis.Client,
	key string, ttl time.Duration) (string, error) {
	value := uuid.New().String()
	ok, err := rdb.SetNX(ctx, key, value, ttl).Result()
	if err != nil {
		return "", err
	}
	if !ok {
		return "", errors.New("lock held by another")
	}
	return value, nil
}

func releaseLock(ctx context.Context, rdb *redis.Client,
	key, value string) error {
	// The Lua script makes check-and-delete atomic, so we never
	// delete a lock another client has since acquired.
	script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
end
return 0
`
	return rdb.Eval(ctx, script, []string{key}, value).Err()
}
6.3 Service Discovery Pattern
Registry Pattern:
Service startup → register with Consul/etcd → heartbeat to keep alive
Client → retrieve service list from registry → load balance → invoke
Client-side Discovery vs Server-side Discovery:
Client-side discovery: client directly queries the registry, performs load balancing itself (gRPC default)
Server-side discovery: via LB proxy, LB queries the registry (Nginx, AWS ALB)
6.4 Load Balancing Strategies
// Round Robin
type RoundRobin struct {
	addrs []string
	next  uint64
}

func (r *RoundRobin) Pick() string {
	n := atomic.AddUint64(&r.next, 1)
	return r.addrs[n%uint64(len(r.addrs))]
}
// Weighted Round Robin, Least Connections, Consistent Hashing, etc.
7. High-Concurrency System Design Patterns
7.1 Token Bucket Rate Limiting
import "golang.org/x/time/rate"

// 100 requests per second, burst of 50
limiter := rate.NewLimiter(100, 50)

func handler(w http.ResponseWriter, r *http.Request) {
	if !limiter.Allow() {
		http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
		return
	}
	// Process the request...
}

// Per-user independent rate limiting
var limiters sync.Map

func getUserLimiter(userID string) *rate.Limiter {
	// LoadOrStore avoids the race where two goroutines each create
	// and store a different limiter for the same user.
	l, _ := limiters.LoadOrStore(userID, rate.NewLimiter(10, 5))
	return l.(*rate.Limiter)
}
7.2 Circuit Breaker Pattern
type CircuitBreaker struct {
	mu          sync.Mutex
	failures    int
	threshold   int
	state       string // "closed", "open", "half-open"
	lastFailure time.Time
	timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mu.Lock()
	if cb.state == "open" {
		if time.Since(cb.lastFailure) > cb.timeout {
			// Cooldown elapsed: let a probe request through.
			// (This simplified version may admit several probes at once.)
			cb.state = "half-open"
		} else {
			cb.mu.Unlock()
			return errors.New("circuit breaker is open")
		}
	}
	cb.mu.Unlock()

	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.failures++
		cb.lastFailure = time.Now()
		if cb.failures >= cb.threshold {
			cb.state = "open"
		}
		return err
	}
	cb.failures = 0
	cb.state = "closed"
	return nil
}
7.3 Graceful Shutdown
func main() {
	srv := &http.Server{Addr: ":8080"}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()

	// Wait for an interrupt signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down...")

	// Stop accepting new connections and drain in-flight requests,
	// giving up after 30 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server forced to shutdown:", err)
	}
	log.Println("Server exited")
}
7.4 Worker Pool
func workerPool(ctx context.Context, jobs <-chan Job, workers int) <-chan Result {
	results := make(chan Result, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case job, ok := <-jobs:
					if !ok {
						return // jobs channel closed and drained
					}
					results <- process(job)
				case <-ctx.Done():
					return // cancelled
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}
8. Concurrency Debugging Tools
| Tool | Purpose |
|---|---|
| go run -race | Data race detection |
| GODEBUG=schedtrace=1000 | Scheduler status |
| runtime.NumGoroutine() | Goroutine count monitoring |
| pprof goroutine | Goroutine leak analysis |
| go tool trace | Visualize scheduling and blocking events |
| dlv (Delve) | Go-specific debugger |