Programming Basics 0006_Advanced Concurrency_sync Package and Context

Advanced Concurrency: The sync Package and Context

I. Detailed Explanation of the sync Package

1. sync.Mutex and sync.RWMutex

// Mutex: Mutual exclusion lock, only one goroutine can hold it at a time
var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

// RWMutex: Read-write lock, allows multiple reads, but writes are exclusive
var rwmu sync.RWMutex
var data = make(map[string]string) // Initialize the map: writing to a nil map panics

func read(key string) string {
    rwmu.RLock()         // Read lock, multiple goroutines can hold simultaneously
    defer rwmu.RUnlock()
    return data[key]
}

func write(key, val string) {
    rwmu.Lock()          // Write lock, exclusive
    defer rwmu.Unlock()
    data[key] = val
}

When to use RWMutex? In scenarios with many reads and few writes (e.g., caches, configurations). If reads and writes are roughly equal, Mutex is sufficient, as RWMutex has additional overhead.

2. sync.Once

Ensures that a certain operation is executed only once, commonly used for singleton initialization.

var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        // No matter how many goroutines call simultaneously, it executes only once
        instance = &Database{
            conn: connectDB(),
        }
        fmt.Println("Database initialization complete")
    })
    return instance
}

func main() {
    // Concurrent calls, initializes only once
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db
        }()
    }
    wg.Wait()
}
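
The "executes only once" guarantee can be checked by counting how often the initializer runs. The following is a minimal sketch; loadConfig, runConcurrently, and the config contents are hypothetical names introduced for illustration only.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var (
    initCalls int32             // how many times the initializer actually ran
    cfgOnce   sync.Once         // guards the one-time initialization
    cfg       map[string]string // hypothetical configuration, set inside Do
)

// loadConfig returns the shared config, initializing it on the first call only.
func loadConfig() map[string]string {
    cfgOnce.Do(func() {
        atomic.AddInt32(&initCalls, 1)
        cfg = map[string]string{"env": "dev"}
    })
    return cfg
}

// runConcurrently calls loadConfig from n goroutines and reports
// how many times the initializer has run in total.
func runConcurrently(n int) int32 {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = loadConfig()
        }()
    }
    wg.Wait()
    return atomic.LoadInt32(&initCalls)
}

func main() {
    fmt.Println("initializer runs:", runConcurrently(50)) // initializer runs: 1
}
```

Note that Once does not retry: if the function passed to Do panics, Do still treats it as having run, and later calls return without calling it again.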

3. sync.Map

Concurrent-safe Map, no need for additional locking.

func main() {
    var m sync.Map

    // Store
    m.Store("name", "Alice")
    m.Store("age", 30)

    // Load
    val, ok := m.Load("name")
    if ok {
        fmt.Println(val) // Alice
    }

    // Load or Store (store if key does not exist)
    actual, loaded := m.LoadOrStore("name", "Bob")
    fmt.Println(actual, loaded) // Alice true (already exists, not stored)

    actual2, loaded2 := m.LoadOrStore("city", "Beijing")
    fmt.Println(actual2, loaded2) // Beijing false (newly stored)

    // Delete
    m.Delete("age")

    // Iterate
    m.Range(func(key, value any) bool {
        fmt.Printf("%s: %v\n", key, value)
        return true // Return false to stop iteration
    })

    // LoadAndDelete: Load and delete (Go 1.15+)
    val3, loaded3 := m.LoadAndDelete("city")
    fmt.Println(val3, loaded3) // Beijing true
}

sync.Map vs map+Mutex:

Scenario                                             Recommendation
Keys are relatively fixed, many reads, few writes    sync.Map
Frequent addition/deletion of keys                   map + Mutex/RWMutex
Requires len() or iteration performance              map + Mutex/RWMutex
Different goroutines operate on different keys       sync.Map
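
For the map + RWMutex rows, a small wrapper type is usually enough. The sketch below assumes string keys and int values; SafeMap and its methods are hypothetical names, not part of the standard library. Unlike sync.Map, which has no Len method, the wrapper can report its size cheaply.

```go
package main

import (
    "fmt"
    "sync"
)

// SafeMap is a hypothetical wrapper that protects a plain map with an RWMutex.
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]int
}

func NewSafeMap() *SafeMap {
    return &SafeMap{m: make(map[string]int)}
}

// Set takes the write lock because it mutates the map.
func (s *SafeMap) Set(key string, val int) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.m[key] = val
}

// Get takes only the read lock, so many readers can run in parallel.
func (s *SafeMap) Get(key string) (int, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.m[key]
    return v, ok
}

func (s *SafeMap) Delete(key string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.m, key)
}

// Len is a plain len() call under the read lock.
func (s *SafeMap) Len() int {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return len(s.m)
}

func main() {
    sm := NewSafeMap()
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            sm.Set(fmt.Sprintf("key-%d", n), n)
        }(i)
    }
    wg.Wait()
    sm.Delete("key-0")
    fmt.Println(sm.Len()) // 99
}
```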

4. sync.Pool

Temporary object pool, reduces memory allocation and GC pressure. Objects may be reclaimed by GC at any time.

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // Create a new object when the pool is empty
    },
}

func processRequest(data string) string {
    // Get from pool
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset() // Reset state! Very important

    // Use
    buf.WriteString("Processing: ")
    buf.WriteString(data)
    result := buf.String()

    // Return to pool
    bufPool.Put(buf)

    return result
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("request %d", id))
            _ = result
        }(i)
    }
    wg.Wait()
}

Notes:
- Reset the object's state after Get: a reused object still holds whatever its previous user left in it
- Do not assume that an object Put back will be returned by a later Get (the GC may clear the Pool)
- Suitable for frequently created temporary objects (e.g., buffers, temporary slices)
- The standard library's fmt package uses sync.Pool to reuse its internal printer objects

5. sync.Cond

Condition variable, used for multiple goroutines to wait for a certain condition to be met.

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// Producer
func (q *Queue) Put(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // Wake up one waiter (Broadcast wakes all)
}

// Consumer
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // Release lock and wait, re-acquire lock when awakened
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // Consumer
    go func() {
        for {
            item := q.Get()
            fmt.Println("Consumed:", item)
        }
    }()

    // Producer
    for i := 0; i < 10; i++ {
        q.Put(i)
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(time.Second)
}

In real-world projects, channels are more commonly used than sync.Cond, but understanding Cond helps in understanding concurrency primitives.
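
For comparison, the same producer/consumer queue can be sketched with a buffered channel, which handles the locking and waiting internally. The names produce and consumeAll and the buffer size of 4 are assumptions made for this illustration.

```go
package main

import "fmt"

// produce sends 0..n-1 on a buffered channel and closes it when done.
// Closing the channel is how the consumer learns there is no more work.
func produce(n int) <-chan int {
    ch := make(chan int, 4)
    go func() {
        defer close(ch)
        for i := 0; i < n; i++ {
            ch <- i // blocks while the buffer is full
        }
    }()
    return ch
}

// consumeAll receives until the channel is closed and drained.
func consumeAll(ch <-chan int) []int {
    var got []int
    for item := range ch { // blocks while the channel is empty
        got = append(got, item)
    }
    return got
}

func main() {
    fmt.Println("consumed:", consumeAll(produce(10)))
}
```

Compared with the Cond version, the consumer here can terminate cleanly, because a closed channel ends the range loop.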

6. sync.WaitGroup Advanced

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results[idx] = -1
                return
            }
            defer resp.Body.Close()
            results[idx] = resp.StatusCode
        }(i, url)
    }

    wg.Wait()
    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

Common errors:

// Error 1: Add inside the goroutine
go func() {
    wg.Add(1) // Might execute after Wait!
    defer wg.Done()
}()
wg.Wait()

// Correct: Add before starting the goroutine
wg.Add(1)
go func() {
    defer wg.Done()
}()
wg.Wait()

// Error 2: Forgetting Done leads to eternal blocking
// Use defer wg.Done() to ensure execution

II. Context

1. What is Context?

Context is used to pass cancellation signals, timeout control, and request-scoped data between goroutines.

type Context interface {
    Deadline() (deadline time.Time, ok bool) // Deadline
    Done() <-chan struct{}                    // Cancellation signal channel
    Err() error                               // Reason for Done closing
    Value(key any) any                        // Request-scoped data
}

2. context.Background() and context.TODO()

// Background: Root context, never canceled, no values, no deadline
// Typically used in main functions, initialization, testing
ctx := context.Background()

// TODO: Placeholder when it's unclear which context to use
// If you see TODO during code review, it indicates a need for improvement
todoCtx := context.TODO()

3. context.WithCancel

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("worker received cancellation signal:", ctx.Err())
                return
            default:
                fmt.Println("Working...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }(ctx)

    time.Sleep(2 * time.Second)
    cancel() // Send cancellation signal
    time.Sleep(100 * time.Millisecond)
    // Output: worker received cancellation signal: context canceled
}

4. context.WithTimeout and WithDeadline

// WithTimeout: Specify timeout duration
func fetchWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // Even if not timed out, call cancel to release resources

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://httpbin.org/delay/5", nil)
    if err != nil {
        fmt.Println("Failed to build request:", err)
        return
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("Request failed:", err) // context deadline exceeded
        return
    }
    defer resp.Body.Close()
    fmt.Println("Status code:", resp.StatusCode)
}

// WithDeadline: Specify a deadline time
func fetchWithDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    // Usage is the same as WithTimeout
    _ = ctx
}
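
To make the deadline behavior concrete without a network call, the sketch below simulates the work with a timer. slowOperation and runWithDeadline are hypothetical helpers, and the durations are arbitrary.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// slowOperation simulates work that takes d, but gives up early
// as soon as ctx is done and reports why.
func slowOperation(ctx context.Context, d time.Duration) error {
    timer := time.NewTimer(d)
    defer timer.Stop()
    select {
    case <-timer.C:
        return nil // work finished in time
    case <-ctx.Done():
        return ctx.Err() // context.DeadlineExceeded or context.Canceled
    }
}

// runWithDeadline runs slowOperation under a deadline `limit` from now.
func runWithDeadline(work, limit time.Duration) error {
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(limit))
    defer cancel() // releases the timer even when the work finishes early
    return slowOperation(ctx, work)
}

func main() {
    err := runWithDeadline(200*time.Millisecond, 50*time.Millisecond)
    fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true

    err = runWithDeadline(10*time.Millisecond, 200*time.Millisecond)
    fmt.Println(err) // <nil>
}
```

Checking with errors.Is rather than comparing strings lets callers tell a timeout apart from a manual cancellation.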

5. context.WithValue

type contextKey string

const (
    keyUserID    contextKey = "user_id"
    keyRequestID contextKey = "request_id"
)

func middleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Extract information from the request, put into context
        ctx := r.Context()
        ctx = context.WithValue(ctx, keyRequestID, generateID())
        ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
        next(w, r.WithContext(ctx))
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    // Get values from context; the comma-ok form avoids a panic if the middleware did not run
    reqID, _ := r.Context().Value(keyRequestID).(string)
    userID, _ := r.Context().Value(keyUserID).(string)
    fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}

Important: Keys should use custom unexported types (e.g., contextKey) to avoid key collisions between different packages.
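
A minimal sketch of why the custom key type matters: a plain string with the same text does not match a contextKey, because context compares keys by type as well as value. The helpers WithRequestID, RequestIDFrom, and newRequestContext are hypothetical names. Wrapping access in such helpers is a common pattern that keeps the key private to one package.

```go
package main

import (
    "context"
    "fmt"
)

// contextKey is a distinct type, so values of other types never compare equal to it.
type contextKey string

const keyRequestID contextKey = "request_id"

// WithRequestID returns a child context carrying the request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
    return context.WithValue(ctx, keyRequestID, id)
}

// RequestIDFrom reads the request ID; ok is false when none was stored.
func RequestIDFrom(ctx context.Context) (id string, ok bool) {
    id, ok = ctx.Value(keyRequestID).(string)
    return id, ok
}

// newRequestContext builds a root context with an ID, for demonstration.
func newRequestContext(id string) context.Context {
    return WithRequestID(context.Background(), id)
}

func main() {
    ctx := newRequestContext("req-42")

    id, ok := RequestIDFrom(ctx)
    fmt.Println(id, ok) // req-42 true

    // Same text, different type: the lookup misses.
    fmt.Println(ctx.Value("request_id")) // <nil>

    _, ok = RequestIDFrom(context.Background())
    fmt.Println(ok) // false
}
```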

6. Context Applications in Real-World Projects

// Database query with timeout
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
    var user User
    if err := row.Scan(&user.ID, &user.Name); err != nil {
        return nil, err
    }
    return &user, nil
}

// gRPC service automatically passes context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    // ctx automatically carries timeout and cancellation signals
    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return toProto(user), nil
}

// Cascading cancellation: When parent context is canceled, all child contexts are automatically canceled
func processOrder(ctx context.Context, orderID string) error {
    // Child tasks inherit parent context
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return checkInventory(ctx, orderID) })
    g.Go(func() error { return chargePayment(ctx, orderID) })
    g.Go(func() error { return sendNotification(ctx, orderID) })

    return g.Wait() // The first error cancels ctx; the other tasks stop once they observe it
}

7. Context Best Practices

Rule                                                  Description
As the first parameter                                func DoSomething(ctx context.Context, ...)
Do not store in a struct                              Context should be passed between functions, not kept as a field
Do not pass nil                                       Use context.Background() or context.TODO()
WithValue only for request-scoped data                e.g., request ID, user info; do not pass business parameters
Always call cancel                                    Use defer cancel() to release resources, whether or not the timeout fires
Do not pass the same cancel to multiple goroutines    Whoever creates it, cancels it

Context Propagation Chain Diagram

HTTP Request enters
    │
    ▼
context.Background() + WithValue(requestID)
    │
    ├──► WithTimeout(5s) ──► Query database
    │
    ├──► WithTimeout(3s) ──► Call gRPC service
    │                            │
    │                            ├──► Sub-query 1
    │                            └──► Sub-query 2
    │
    └──► WithCancel() ──► Send notification (can be manually canceled)

// If any layer times out or is canceled, all downstream operations are automatically canceled
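
The propagation chain can be reproduced with the standard library alone. In this sketch, the worker names and the one-minute timeouts are arbitrary assumptions, and cancelTree and worker are hypothetical helpers. Canceling the root context is enough to stop every derived context, including one nested two levels down.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// worker blocks until its context is done, then reports the reason.
func worker(ctx context.Context, name string, wg *sync.WaitGroup, out chan<- string) {
    defer wg.Done()
    <-ctx.Done()
    out <- fmt.Sprintf("%s: %v", name, ctx.Err())
}

// cancelTree builds root -> {db, rpc -> sub-query}, cancels the root,
// and returns one message per worker.
func cancelTree() []string {
    root, cancelRoot := context.WithCancel(context.Background())
    defer cancelRoot()

    dbCtx, cancelDB := context.WithTimeout(root, time.Minute)
    defer cancelDB()
    rpcCtx, cancelRPC := context.WithTimeout(root, time.Minute)
    defer cancelRPC()
    subCtx, cancelSub := context.WithCancel(rpcCtx) // grandchild of root
    defer cancelSub()

    var wg sync.WaitGroup
    out := make(chan string, 3)
    wg.Add(3)
    go worker(dbCtx, "db", &wg, out)
    go worker(rpcCtx, "rpc", &wg, out)
    go worker(subCtx, "sub-query", &wg, out)

    cancelRoot() // one call at the top cancels every derived context
    wg.Wait()
    close(out)

    var msgs []string
    for m := range out {
        msgs = append(msgs, m)
    }
    return msgs
}

func main() {
    // Order varies between runs; each line ends with "context canceled".
    for _, m := range cancelTree() {
        fmt.Println(m)
    }
}
```

Cancellation flows only downward: canceling rpcCtx would stop the sub-query but leave the root and the database branch running.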

Test article for the site theme, for testing purposes only. Published by Walker. When reposting, please credit the source: https://walker-learn.xyz/archives/6720
