Programming Basics 0006_Advanced Concurrency_sync Package and Context

Advanced Concurrency: The sync Package and Context

I. Detailed Explanation of the sync Package

1. sync.Mutex and sync.RWMutex

// Mutex: Mutual exclusion lock, only one goroutine can hold it at a time
var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

// RWMutex: Read-write lock, allows multiple reads, but writes are exclusive
var rwmu sync.RWMutex
var data map[string]string

func read(key string) string {
    rwmu.RLock()         // Read lock, multiple goroutines can hold simultaneously
    defer rwmu.RUnlock()
    return data[key]
}

func write(key, val string) {
    rwmu.Lock()          // Write lock, exclusive
    defer rwmu.Unlock()
    data[key] = val
}

When to use RWMutex? In scenarios with many reads and few writes (e.g., caches, configurations). If reads and writes are roughly equal, Mutex is sufficient, as RWMutex has additional overhead.

2. sync.Once

Ensures that a certain operation is executed only once, commonly used for singleton initialization.

var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        // No matter how many goroutines call simultaneously, it executes only once
        instance = &Database{
            conn: connectDB(),
        }
        fmt.Println("数据库初始化完成")
    })
    return instance
}

func main() {
    // Concurrent calls, initializes only once
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db
        }()
    }
    wg.Wait()
}

3. sync.Map

Concurrent-safe Map, no need for additional locking.

func main() {
    var m sync.Map

    // Store
    m.Store("name", "Alice")
    m.Store("age", 30)

    // Load
    val, ok := m.Load("name")
    if ok {
        fmt.Println(val) // Alice
    }

    // Load or Store (store if key does not exist)
    actual, loaded := m.LoadOrStore("name", "Bob")
    fmt.Println(actual, loaded) // Alice true (already exists, not stored)

    actual2, loaded2 := m.LoadOrStore("city", "Beijing")
    fmt.Println(actual2, loaded2) // Beijing false (newly stored)

    // Delete
    m.Delete("age")

    // Iterate
    m.Range(func(key, value any) bool {
        fmt.Printf("%s: %v\n", key, value)
        return true // Return false to stop iteration
    })

    // LoadAndDelete: Load and delete (Go 1.15+)
    val3, loaded3 := m.LoadAndDelete("city")
    fmt.Println(val3, loaded3) // Beijing true
}

sync.Map vs map+Mutex:

Scenario Recommendation
Keys are relatively fixed, many reads, few writes sync.Map
Frequent addition/deletion of keys map + Mutex/RWMutex
Requires len() or iteration performance map + Mutex/RWMutex
Different goroutines operate on different keys sync.Map

4. sync.Pool

Temporary object pool, reduces memory allocation and GC pressure. Objects may be reclaimed by GC at any time.

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // Create a new object when the pool is empty
    },
}

func processRequest(data string) string {
    // Get from pool
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset() // Reset state! Very important

    // Use
    buf.WriteString("处理: ")
    buf.WriteString(data)
    result := buf.String()

    // Return to pool
    bufPool.Put(buf)

    return result
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("请求%d", id))
            _ = result
        }(i)
    }
    wg.Wait()
}

Notes:
- Must Reset object state after Get
- Do not assume that an object Put back will necessarily be Get-able next time (GC may clear the Pool)
- Suitable for frequently created temporary objects (e.g., buffers, temporary slices)
- The standard library fmt package heavily uses sync.Pool

5. sync.Cond

Condition variable, used for multiple goroutines to wait for a certain condition to be met.

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// Producer
func (q *Queue) Put(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // Wake up one waiter (Broadcast wakes all)
}

// Consumer
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // Release lock and wait, re-acquire lock when awakened
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // Consumer
    go func() {
        for {
            item := q.Get()
            fmt.Println("消费:", item)
        }
    }()

    // Producer
    for i := 0; i < 10; i++ {
        q.Put(i)
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(time.Second)
}

In real-world projects, channels are more commonly used than sync.Cond, but understanding Cond helps in understanding concurrency primitives.

6. sync.WaitGroup Advanced

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results[idx] = -1
                return
            }
            defer resp.Body.Close()
            results[idx] = resp.StatusCode
        }(i, url)
    }

    wg.Wait()
    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

Common errors:

// Error 1: Add inside the goroutine
go func() {
    wg.Add(1) // Might execute after Wait!
    defer wg.Done()
}()
wg.Wait()

// Correct: Add before starting the goroutine
wg.Add(1)
go func() {
    defer wg.Done()
}()
wg.Wait()

// Error 2: Forgetting Done leads to eternal blocking
// Use defer wg.Done() to ensure execution

II. Context

1. What is Context?

Context is used to pass cancellation signals, timeout control, and request-scoped data between goroutines.

type Context interface {
    Deadline() (deadline time.Time, ok bool) // Deadline
    Done() <-chan struct{}                    // Cancellation signal channel
    Err() error                               // Reason for Done closing
    Value(key any) any                        // Request-scoped data
}

2. context.Background() and context.TODO()

// Background: Root context, never canceled, no values, no deadline
// Typically used in main functions, initialization, testing
ctx := context.Background()

// TODO: Placeholder when it's unclear which context to use
// If you see TODO during code review, it indicates a need for improvement
ctx := context.TODO()

3. context.WithCancel

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("worker received cancellation signal:", ctx.Err())
                return
            default:
                fmt.Println("Working...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }(ctx)

    time.Sleep(2 * time.Second)
    cancel() // Send cancellation signal
    time.Sleep(100 * time.Millisecond)
    // Output: worker received cancellation signal: context canceled
}

4. context.WithTimeout and WithDeadline

// WithTimeout: Specify timeout duration
func fetchWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // Even if not timed out, call cancel to release resources

    req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("Request failed:", err) // context deadline exceeded
        return
    }
    defer resp.Body.Close()
    fmt.Println("Status code:", resp.StatusCode)
}

// WithDeadline: Specify a deadline time
func fetchWithDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    // Usage is the same as WithTimeout
    _ = ctx
}

5. context.WithValue

type contextKey string

const (
    keyUserID    contextKey = "user_id"
    keyRequestID contextKey = "request_id"
)

func middleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Extract information from the request, put into context
        ctx := r.Context()
        ctx = context.WithValue(ctx, keyRequestID, generateID())
        ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
        next(w, r.WithContext(ctx))
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    // Get values from context
    reqID := r.Context().Value(keyRequestID).(string)
    userID := r.Context().Value(keyUserID).(string)
    fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}

Important: Keys should use custom unexported types (e.g., contextKey) to avoid key collisions between different packages.

6. Context Applications in Real-World Projects

// Database query with timeout
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
    var user User
    if err := row.Scan(&user.ID, &user.Name); err != nil {
        return nil, err
    }
    return &user, nil
}

// gRPC service automatically passes context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    // ctx automatically carries timeout and cancellation signals
    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return toProto(user), nil
}

// Cascading cancellation: When parent context is canceled, all child contexts are automatically canceled
func processOrder(ctx context.Context, orderID string) error {
    // Child tasks inherit parent context
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return checkInventory(ctx, orderID) })
    g.Go(func() error { return chargePayment(ctx, orderID) })
    g.Go(func() error { return sendNotification(ctx, orderID) })

    return g.Wait() // Any failure automatically cancels others
}

7. Context Best Practices

Rule Description
As the first parameter func DoSomething(ctx context.Context, ...)
Do not store in a struct Context should be passed between functions, not as a field
Do not pass nil Use context.Background() or context.TODO()
WithValue only for request-scoped data e.g., request ID, user info, do not pass business parameters
Always call cancel Even if timed out, use defer cancel() to release resources
Do not pass the same cancel to multiple goroutines Whoever creates it, cancels it

Context Propagation Chain Diagram

HTTP Request enters
    │
    ▼
context.Background() + WithValue(requestID)
    │
    ├──► WithTimeout(5s) ──► Query database
    │
    ├──► WithTimeout(3s) ──► Call gRPC service
    │                            │
    │                            ├──► Sub-query 1
    │                            └──► Sub-query 2
    │
    └──► WithCancel() ──► Send notification (can be manually canceled)

// If any layer times out or is canceled, all downstream operations are automatically canceled

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6720

(0)
Walker的头像Walker
上一篇 Mar 8, 2026 15:11
下一篇 Mar 9, 2026 12:56

Related Posts

EN
简体中文 繁體中文 English