← 返回
后端开发 2026.03.06

编程基础 0006_并发进阶_sync包与Context

后端开发

一、sync 包详解

1. sync.Mutex 与 sync.RWMutex

// Mutex: 互斥锁,同一时间只有一个 goroutine 能持有
var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

// RWMutex: 读写锁,允许多个读,但写是排他的
var rwmu sync.RWMutex
var data map[string]string

func read(key string) string {
    rwmu.RLock()         // 读锁,多个 goroutine 可同时持有
    defer rwmu.RUnlock()
    return data[key]
}

func write(key, val string) {
    rwmu.Lock()          // 写锁,排他
    defer rwmu.Unlock()
    data[key] = val
}

何时用 RWMutex? 读多写少的场景(如缓存、配置)。如果读写差不多,Mutex 就够了,RWMutex 有额外开销。

2. sync.Once

保证某个操作只执行一次,常用于单例初始化。

var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        // 无论多少 goroutine 同时调用,只执行一次
        instance = &Database{
            conn: connectDB(),
        }
        fmt.Println("数据库初始化完成")
    })
    return instance
}

func main() {
    // 并发调用,只初始化一次
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db
        }()
    }
    wg.Wait()
}

3. sync.Map

并发安全的 Map,无需额外加锁。

func main() {
    var m sync.Map

    // 存储
    m.Store("name", "Alice")
    m.Store("age", 30)

    // 读取
    val, ok := m.Load("name")
    if ok {
        fmt.Println(val) // Alice
    }

    // 读取或存储(key不存在时存储)
    actual, loaded := m.LoadOrStore("name", "Bob")
    fmt.Println(actual, loaded) // Alice true (已存在,未存储)

    actual2, loaded2 := m.LoadOrStore("city", "Beijing")
    fmt.Println(actual2, loaded2) // Beijing false (新存储的)

    // 删除
    m.Delete("age")

    // 遍历
    m.Range(func(key, value any) bool {
        fmt.Printf("%s: %v\n", key, value)
        return true // 返回 false 停止遍历
    })

    // LoadAndDelete: 读取并删除(Go 1.15+)
    val3, loaded3 := m.LoadAndDelete("city")
    fmt.Println(val3, loaded3) // Beijing true
}

sync.Map vs map+Mutex:

场景推荐
key 相对固定,读多写少sync.Map
频繁增删 keymap + Mutex/RWMutex
需要 len() 或遍历性能map + Mutex/RWMutex
不同 goroutine 操作不同的 keysync.Map

4. sync.Pool

临时对象池,减少内存分配和 GC 压力。对象可能在任何时候被 GC 回收。

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // 当池为空时创建新对象
    },
}

func processRequest(data string) string {
    // 从池中获取
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset() // 重置状态!非常重要

    // 使用
    buf.WriteString("处理: ")
    buf.WriteString(data)
    result := buf.String()

    // 归还到池中
    bufPool.Put(buf)

    return result
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("请求%d", id))
            _ = result
        }(i)
    }
    wg.Wait()
}

注意事项: - Get 后务必 Reset 对象状态 - 不要假设 Put 的对象下次一定能 Get 到(GC 会清空 Pool) - 适合频繁创建的临时对象(如 buffer、临时 slice) - 标准库 fmt 包就大量使用 sync.Pool

5. sync.Cond

条件变量,用于多个 goroutine 等待某个条件满足。

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// 生产者
func (q *Queue) Put(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // 唤醒一个等待者(Broadcast 唤醒所有)
}

// 消费者
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // 释放锁并等待,被唤醒时重新获取锁
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // 消费者
    go func() {
        for {
            item := q.Get()
            fmt.Println("消费:", item)
        }
    }()

    // 生产者
    for i := 0; i < 10; i++ {
        q.Put(i)
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(time.Second)
}

实际项目中 channel 比 sync.Cond 更常用,但理解 Cond 有助于理解并发原语。

6. sync.WaitGroup 进阶

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results[idx] = -1
                return
            }
            defer resp.Body.Close()
            results[idx] = resp.StatusCode
        }(i, url)
    }

    wg.Wait()
    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

常见错误:

// 错误1:在 goroutine 内部 Add
go func() {
    wg.Add(1) // 可能在 Wait 之后才执行!
    defer wg.Done()
}()
wg.Wait()

// 正确:在启动 goroutine 前 Add
wg.Add(1)
go func() {
    defer wg.Done()
}()
wg.Wait()

// 错误2:忘记 Done 导致永远阻塞
// 用 defer wg.Done() 确保一定执行

二、Context 上下文

1. Context 是什么?

Context 用于在 goroutine 之间传递取消信号超时控制请求级别数据

type Context interface {
    Deadline() (deadline time.Time, ok bool) // 截止时间
    Done() <-chan struct{}                    // 取消信号 channel
    Err() error                               // Done 关闭的原因
    Value(key any) any                        // 请求级别的数据
}

2. context.Background() 和 context.TODO()

// Background: 根 context,永不取消,没有值,没有截止时间
// 通常用于 main 函数、初始化、测试
ctx := context.Background()

// TODO: 当不确定该用什么 context 时的占位符
// 代码审查时如果看到 TODO,说明需要改进
ctx := context.TODO()

3. context.WithCancel

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("worker 收到取消信号:", ctx.Err())
                return
            default:
                fmt.Println("工作中...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }(ctx)

    time.Sleep(2 * time.Second)
    cancel() // 发送取消信号
    time.Sleep(100 * time.Millisecond)
    // 输出: worker 收到取消信号: context canceled
}

4. context.WithTimeout 和 WithDeadline

// WithTimeout: 指定超时时长
func fetchWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // 即使没超时也要调用 cancel 释放资源

    req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("请求失败:", err) // context deadline exceeded
        return
    }
    defer resp.Body.Close()
    fmt.Println("状态码:", resp.StatusCode)
}

// WithDeadline: 指定截止时间点
func fetchWithDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    // 用法与 WithTimeout 相同
    _ = ctx
}

5. context.WithValue

type contextKey string

const (
    keyUserID    contextKey = "user_id"
    keyRequestID contextKey = "request_id"
)

func middleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 从请求中提取信息,放入 context
        ctx := r.Context()
        ctx = context.WithValue(ctx, keyRequestID, generateID())
        ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
        next(w, r.WithContext(ctx))
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    // 从 context 中取值
    reqID := r.Context().Value(keyRequestID).(string)
    userID := r.Context().Value(keyUserID).(string)
    fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}

重要: key 应该用自定义的未导出类型(如 contextKey),避免不同包的 key 冲突。

6. Context 在实际项目中的应用

// 数据库查询带超时
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
    var user User
    if err := row.Scan(&user.ID, &user.Name); err != nil {
        return nil, err
    }
    return &user, nil
}

// gRPC 服务自动传递 context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    // ctx 自动携带了超时和取消信号
    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return toProto(user), nil
}

// 级联取消:父 context 取消时,所有子 context 自动取消
func processOrder(ctx context.Context, orderID string) error {
    // 子任务继承父 context
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return checkInventory(ctx, orderID) })
    g.Go(func() error { return chargePayment(ctx, orderID) })
    g.Go(func() error { return sendNotification(ctx, orderID) })

    return g.Wait() // 任一失败自动取消其他
}

7. Context 最佳实践

规则说明
作为第一个参数func DoSomething(ctx context.Context, ...)
不要存储在 struct 中Context 应该在函数间传递,不要作为字段
不要传 nilcontext.Background()context.TODO()
WithValue 只传请求级别数据如 request ID、用户信息,不要传业务参数
总是调用 cancel即使超时也要 defer cancel() 释放资源
不要在多个 goroutine 中传同一个 cancel谁创建谁取消

Context 传播链路示意

HTTP Request 进入


context.Background() + WithValue(requestID)

    ├──► WithTimeout(5s) ──► 查数据库

    ├──► WithTimeout(3s) ──► 调 gRPC 服务
    │                            │
    │                            ├──► 子查询1
    │                            └──► 子查询2

    └──► WithCancel() ──► 发通知(可手动取消)

// 任何一层超时或取消,下游全部自动取消