并发进阶:sync 包与 Context
一、sync 包详解
1. sync.Mutex 与 sync.RWMutex
// Mutex: 互斥锁,同一时间只有一个 goroutine 能持有
var mu sync.Mutex
var count int
func increment() {
mu.Lock()
defer mu.Unlock()
count++
}
// RWMutex: 读写锁,允许多个读,但写是排他的
var rwmu sync.RWMutex
var data map[string]string
func read(key string) string {
rwmu.RLock() // 读锁,多个 goroutine 可同时持有
defer rwmu.RUnlock()
return data[key]
}
func write(key, val string) {
rwmu.Lock() // 写锁,排他
defer rwmu.Unlock()
data[key] = val
}
何时用 RWMutex? 读多写少的场景(如缓存、配置)。如果读写差不多,Mutex 就够了,RWMutex 有额外开销。
2. sync.Once
保证某个操作只执行一次,常用于单例初始化。
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
// 无论多少 goroutine 同时调用,只执行一次
instance = &Database{
conn: connectDB(),
}
fmt.Println("数据库初始化完成")
})
return instance
}
func main() {
// 并发调用,只初始化一次
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDB()
_ = db
}()
}
wg.Wait()
}
3. sync.Map
并发安全的 Map,无需额外加锁。
func main() {
var m sync.Map
// 存储
m.Store("name", "Alice")
m.Store("age", 30)
// 读取
val, ok := m.Load("name")
if ok {
fmt.Println(val) // Alice
}
// 读取或存储(key不存在时存储)
actual, loaded := m.LoadOrStore("name", "Bob")
fmt.Println(actual, loaded) // Alice true (已存在,未存储)
actual2, loaded2 := m.LoadOrStore("city", "Beijing")
fmt.Println(actual2, loaded2) // Beijing false (新存储的)
// 删除
m.Delete("age")
// 遍历
m.Range(func(key, value any) bool {
fmt.Printf("%s: %v\n", key, value)
return true // 返回 false 停止遍历
})
// LoadAndDelete: 读取并删除(Go 1.15+)
val3, loaded3 := m.LoadAndDelete("city")
fmt.Println(val3, loaded3) // Beijing true
}
sync.Map vs map+Mutex:
| 场景 | 推荐 |
|---|---|
| key 相对固定,读多写少 | sync.Map |
| 频繁增删 key | map + Mutex/RWMutex |
| 需要 len() 或遍历性能 | map + Mutex/RWMutex |
| 不同 goroutine 操作不同的 key | sync.Map |
4. sync.Pool
临时对象池,减少内存分配和 GC 压力。对象可能在任何时候被 GC 回收。
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer) // 当池为空时创建新对象
},
}
func processRequest(data string) string {
// 从池中获取
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset() // 重置状态!非常重要
// 使用
buf.WriteString("处理: ")
buf.WriteString(data)
result := buf.String()
// 归还到池中
bufPool.Put(buf)
return result
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processRequest(fmt.Sprintf("请求%d", id))
_ = result
}(i)
}
wg.Wait()
}
注意事项:
- Get 后务必 Reset 对象状态
- 不要假设 Put 的对象下次一定能 Get 到(GC 会清空 Pool)
- 适合频繁创建的临时对象(如 buffer、临时 slice)
- 标准库 fmt 包就大量使用 sync.Pool
5. sync.Cond
条件变量,用于多个 goroutine 等待某个条件满足。
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// 生产者
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 唤醒一个等待者(Broadcast 唤醒所有)
}
// 消费者
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 释放锁并等待,被唤醒时重新获取锁
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
q := NewQueue()
// 消费者
go func() {
for {
item := q.Get()
fmt.Println("消费:", item)
}
}()
// 生产者
for i := 0; i < 10; i++ {
q.Put(i)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(time.Second)
}
实际项目中 channel 比 sync.Cond 更常用,但理解 Cond 有助于理解并发原语。
6. sync.WaitGroup 进阶
func main() {
var wg sync.WaitGroup
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.baidu.com",
}
results := make([]int, len(urls))
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
results[idx] = -1
return
}
defer resp.Body.Close()
results[idx] = resp.StatusCode
}(i, url)
}
wg.Wait()
for i, url := range urls {
fmt.Printf("%s -> %d\n", url, results[i])
}
}
常见错误:
// 错误1:在 goroutine 内部 Add
go func() {
wg.Add(1) // 可能在 Wait 之后才执行!
defer wg.Done()
}()
wg.Wait()
// 正确:在启动 goroutine 前 Add
wg.Add(1)
go func() {
defer wg.Done()
}()
wg.Wait()
// 错误2:忘记 Done 导致永远阻塞
// 用 defer wg.Done() 确保一定执行
二、Context 上下文
1. Context 是什么?
Context 用于在 goroutine 之间传递取消信号、超时控制和请求级别数据。
type Context interface {
Deadline() (deadline time.Time, ok bool) // 截止时间
Done() <-chan struct{} // 取消信号 channel
Err() error // Done 关闭的原因
Value(key any) any // 请求级别的数据
}
2. context.Background() 和 context.TODO()
// Background: 根 context,永不取消,没有值,没有截止时间
// 通常用于 main 函数、初始化、测试
ctx := context.Background()
// TODO: 当不确定该用什么 context 时的占位符
// 代码审查时如果看到 TODO,说明需要改进
ctx := context.TODO()
3. context.WithCancel
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("worker 收到取消信号:", ctx.Err())
return
default:
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}(ctx)
time.Sleep(2 * time.Second)
cancel() // 发送取消信号
time.Sleep(100 * time.Millisecond)
// 输出: worker 收到取消信号: context canceled
}
4. context.WithTimeout 和 WithDeadline
// WithTimeout: 指定超时时长
func fetchWithTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // 即使没超时也要调用 cancel 释放资源
req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("请求失败:", err) // context deadline exceeded
return
}
defer resp.Body.Close()
fmt.Println("状态码:", resp.StatusCode)
}
// WithDeadline: 指定截止时间点
func fetchWithDeadline() {
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// 用法与 WithTimeout 相同
_ = ctx
}
5. context.WithValue
type contextKey string
const (
keyUserID contextKey = "user_id"
keyRequestID contextKey = "request_id"
)
func middleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 从请求中提取信息,放入 context
ctx := r.Context()
ctx = context.WithValue(ctx, keyRequestID, generateID())
ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
next(w, r.WithContext(ctx))
}
}
func handler(w http.ResponseWriter, r *http.Request) {
// 从 context 中取值
reqID := r.Context().Value(keyRequestID).(string)
userID := r.Context().Value(keyUserID).(string)
fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}
重要: key 应该用自定义的未导出类型(如 contextKey),避免不同包的 key 冲突。
6. Context 在实际项目中的应用
// 数据库查询带超时
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
var user User
if err := row.Scan(&user.ID, &user.Name); err != nil {
return nil, err
}
return &user, nil
}
// gRPC 服务自动传递 context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// ctx 自动携带了超时和取消信号
user, err := s.repo.FindByID(ctx, req.Id)
if err != nil {
return nil, err
}
return toProto(user), nil
}
// 级联取消:父 context 取消时,所有子 context 自动取消
func processOrder(ctx context.Context, orderID string) error {
// 子任务继承父 context
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return checkInventory(ctx, orderID) })
g.Go(func() error { return chargePayment(ctx, orderID) })
g.Go(func() error { return sendNotification(ctx, orderID) })
return g.Wait() // 任一失败自动取消其他
}
7. Context 最佳实践
| 规则 | 说明 |
|---|---|
| 作为第一个参数 | func DoSomething(ctx context.Context, ...) |
| 不要存储在 struct 中 | Context 应该在函数间传递,不要作为字段 |
| 不要传 nil | 用 context.Background() 或 context.TODO() |
| WithValue 只传请求级别数据 | 如 request ID、用户信息,不要传业务参数 |
| 总是调用 cancel | 即使超时也要 defer cancel() 释放资源 |
| 不要在多个 goroutine 中传同一个 cancel | 谁创建谁取消 |
Context 传播链路示意
HTTP Request 进入
│
▼
context.Background() + WithValue(requestID)
│
├──► WithTimeout(5s) ──► 查数据库
│
├──► WithTimeout(3s) ──► 调 gRPC 服务
│ │
│ ├──► 子查询1
│ └──► 子查询2
│
└──► WithCancel() ──► 发通知(可手动取消)
// 任何一层超时或取消,下游全部自动取消
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6720