Advanced Concurrency: The sync Package and Context
I. Detailed Explanation of the sync Package
1. sync.Mutex and sync.RWMutex
// Mutex: Mutual exclusion lock, only one goroutine can hold it at a time
var mu sync.Mutex
var count int
func increment() {
mu.Lock()
defer mu.Unlock()
count++
}
// RWMutex: Read-write lock, allows multiple reads, but writes are exclusive
var rwmu sync.RWMutex
var data map[string]string
func read(key string) string {
rwmu.RLock() // Read lock, multiple goroutines can hold simultaneously
defer rwmu.RUnlock()
return data[key]
}
func write(key, val string) {
rwmu.Lock() // Write lock, exclusive
defer rwmu.Unlock()
data[key] = val
}
When to use RWMutex? In scenarios with many reads and few writes (e.g., caches, configurations). If reads and writes are roughly equal, Mutex is sufficient, as RWMutex has additional overhead.
2. sync.Once
Ensures that a certain operation is executed only once, commonly used for singleton initialization.
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
// No matter how many goroutines call simultaneously, it executes only once
instance = &Database{
conn: connectDB(),
}
fmt.Println("数据库初始化完成")
})
return instance
}
func main() {
// Concurrent calls, initializes only once
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDB()
_ = db
}()
}
wg.Wait()
}
3. sync.Map
Concurrent-safe Map, no need for additional locking.
func main() {
var m sync.Map
// Store
m.Store("name", "Alice")
m.Store("age", 30)
// Load
val, ok := m.Load("name")
if ok {
fmt.Println(val) // Alice
}
// Load or Store (store if key does not exist)
actual, loaded := m.LoadOrStore("name", "Bob")
fmt.Println(actual, loaded) // Alice true (already exists, not stored)
actual2, loaded2 := m.LoadOrStore("city", "Beijing")
fmt.Println(actual2, loaded2) // Beijing false (newly stored)
// Delete
m.Delete("age")
// Iterate
m.Range(func(key, value any) bool {
fmt.Printf("%s: %v\n", key, value)
return true // Return false to stop iteration
})
// LoadAndDelete: Load and delete (Go 1.15+)
val3, loaded3 := m.LoadAndDelete("city")
fmt.Println(val3, loaded3) // Beijing true
}
sync.Map vs map+Mutex:
| Scenario | Recommendation |
|---|---|
| Keys are relatively fixed, many reads, few writes | sync.Map |
| Frequent addition/deletion of keys | map + Mutex/RWMutex |
| Requires len() or iteration performance | map + Mutex/RWMutex |
| Different goroutines operate on different keys | sync.Map |
4. sync.Pool
Temporary object pool, reduces memory allocation and GC pressure. Objects may be reclaimed by GC at any time.
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer) // Create a new object when the pool is empty
},
}
func processRequest(data string) string {
// Get from pool
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset() // Reset state! Very important
// Use
buf.WriteString("处理: ")
buf.WriteString(data)
result := buf.String()
// Return to pool
bufPool.Put(buf)
return result
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processRequest(fmt.Sprintf("请求%d", id))
_ = result
}(i)
}
wg.Wait()
}
Notes:
- Must Reset object state after Get
- Do not assume that an object Put back will necessarily be Get-able next time (GC may clear the Pool)
- Suitable for frequently created temporary objects (e.g., buffers, temporary slices)
- The standard library fmt package heavily uses sync.Pool
5. sync.Cond
Condition variable, used for multiple goroutines to wait for a certain condition to be met.
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Producer
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // Wake up one waiter (Broadcast wakes all)
}
// Consumer
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // Release lock and wait, re-acquire lock when awakened
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
q := NewQueue()
// Consumer
go func() {
for {
item := q.Get()
fmt.Println("消费:", item)
}
}()
// Producer
for i := 0; i < 10; i++ {
q.Put(i)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(time.Second)
}
In real-world projects, channels are more commonly used than sync.Cond, but understanding Cond helps in understanding concurrency primitives.
6. sync.WaitGroup Advanced
func main() {
var wg sync.WaitGroup
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.baidu.com",
}
results := make([]int, len(urls))
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
results[idx] = -1
return
}
defer resp.Body.Close()
results[idx] = resp.StatusCode
}(i, url)
}
wg.Wait()
for i, url := range urls {
fmt.Printf("%s -> %d\n", url, results[i])
}
}
Common errors:
// Error 1: Add inside the goroutine
go func() {
wg.Add(1) // Might execute after Wait!
defer wg.Done()
}()
wg.Wait()
// Correct: Add before starting the goroutine
wg.Add(1)
go func() {
defer wg.Done()
}()
wg.Wait()
// Error 2: Forgetting Done leads to eternal blocking
// Use defer wg.Done() to ensure execution
II. Context
1. What is Context?
Context is used to pass cancellation signals, timeout control, and request-scoped data between goroutines.
type Context interface {
Deadline() (deadline time.Time, ok bool) // Deadline
Done() <-chan struct{} // Cancellation signal channel
Err() error // Reason for Done closing
Value(key any) any // Request-scoped data
}
2. context.Background() and context.TODO()
// Background: Root context, never canceled, no values, no deadline
// Typically used in main functions, initialization, testing
ctx := context.Background()
// TODO: Placeholder when it's unclear which context to use
// If you see TODO during code review, it indicates a need for improvement
ctx := context.TODO()
3. context.WithCancel
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("worker received cancellation signal:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}(ctx)
time.Sleep(2 * time.Second)
cancel() // Send cancellation signal
time.Sleep(100 * time.Millisecond)
// Output: worker received cancellation signal: context canceled
}
4. context.WithTimeout and WithDeadline
// WithTimeout: Specify timeout duration
func fetchWithTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // Even if not timed out, call cancel to release resources
req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("Request failed:", err) // context deadline exceeded
return
}
defer resp.Body.Close()
fmt.Println("Status code:", resp.StatusCode)
}
// WithDeadline: Specify a deadline time
func fetchWithDeadline() {
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// Usage is the same as WithTimeout
_ = ctx
}
5. context.WithValue
type contextKey string
const (
keyUserID contextKey = "user_id"
keyRequestID contextKey = "request_id"
)
func middleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Extract information from the request, put into context
ctx := r.Context()
ctx = context.WithValue(ctx, keyRequestID, generateID())
ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
next(w, r.WithContext(ctx))
}
}
func handler(w http.ResponseWriter, r *http.Request) {
// Get values from context
reqID := r.Context().Value(keyRequestID).(string)
userID := r.Context().Value(keyUserID).(string)
fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}
Important: Keys should use custom unexported types (e.g., contextKey) to avoid key collisions between different packages.
6. Context Applications in Real-World Projects
// Database query with timeout
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
var user User
if err := row.Scan(&user.ID, &user.Name); err != nil {
return nil, err
}
return &user, nil
}
// gRPC service automatically passes context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// ctx automatically carries timeout and cancellation signals
user, err := s.repo.FindByID(ctx, req.Id)
if err != nil {
return nil, err
}
return toProto(user), nil
}
// Cascading cancellation: When parent context is canceled, all child contexts are automatically canceled
func processOrder(ctx context.Context, orderID string) error {
// Child tasks inherit parent context
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return checkInventory(ctx, orderID) })
g.Go(func() error { return chargePayment(ctx, orderID) })
g.Go(func() error { return sendNotification(ctx, orderID) })
return g.Wait() // Any failure automatically cancels others
}
7. Context Best Practices
| Rule | Description |
|---|---|
| As the first parameter | func DoSomething(ctx context.Context, ...) |
| Do not store in a struct | Context should be passed between functions, not as a field |
| Do not pass nil | Use context.Background() or context.TODO() |
| WithValue only for request-scoped data | e.g., request ID, user info, do not pass business parameters |
| Always call cancel | Even if timed out, use defer cancel() to release resources |
| Do not pass the same cancel to multiple goroutines | Whoever creates it, cancels it |
Context Propagation Chain Diagram
HTTP Request enters
│
▼
context.Background() + WithValue(requestID)
│
├──► WithTimeout(5s) ──► Query database
│
├──► WithTimeout(3s) ──► Call gRPC service
│ │
│ ├──► Sub-query 1
│ └──► Sub-query 2
│
└──► WithCancel() ──► Send notification (can be manually canceled)
// If any layer times out or is canceled, all downstream operations are automatically canceled
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6720