併發進階:sync 包與 Context
一、sync 包詳解
1. sync.Mutex 與 sync.RWMutex
// Mutex: 互斥鎖,同一時間只有一個 goroutine 能持有
var mu sync.Mutex
var count int
func increment() {
mu.Lock()
defer mu.Unlock()
count++
}
// RWMutex: 讀寫鎖,允許多個讀,但寫是排他的
var rwmu sync.RWMutex
var data map[string]string
func read(key string) string {
rwmu.RLock() // 讀鎖,多個 goroutine 可同時持有
defer rwmu.RUnlock()
return data[key]
}
func write(key, val string) {
rwmu.Lock() // 寫鎖,排他
defer rwmu.Unlock()
data[key] = val
}
何時用 RWMutex? 讀多寫少的場景(如緩存、配置)。如果讀寫差不多,Mutex 就夠了,RWMutex 有額外開銷。
2. sync.Once
保證某個操作只執行一次,常用於單例初始化。
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
// 無論多少 goroutine 同時調用,只執行一次
instance = &Database{
conn: connectDB(),
}
fmt.Println("數據庫初始化完成")
})
return instance
}
func main() {
// 併發調用,只初始化一次
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDB()
_ = db
}()
}
wg.Wait()
}
3. sync.Map
併發安全的 Map,無需額外加鎖。
func main() {
var m sync.Map
// 存儲
m.Store("name", "Alice")
m.Store("age", 30)
// 讀取
val, ok := m.Load("name")
if ok {
fmt.Println(val) // Alice
}
// 讀取或存儲(key不存在時存儲)
actual, loaded := m.LoadOrStore("name", "Bob")
fmt.Println(actual, loaded) // Alice true (已存在,未存儲)
actual2, loaded2 := m.LoadOrStore("city", "Beijing")
fmt.Println(actual2, loaded2) // Beijing false (新存儲的)
// 刪除
m.Delete("age")
// 遍歷
m.Range(func(key, value any) bool {
fmt.Printf("%s: %v\n", key, value)
return true // 返回 false 停止遍歷
})
// LoadAndDelete: 讀取並刪除(Go 1.15+)
val3, loaded3 := m.LoadAndDelete("city")
fmt.Println(val3, loaded3) // Beijing true
}
sync.Map vs map+Mutex:
| 場景 | 推薦 |
|---|---|
| key 相對固定,讀多寫少 | sync.Map |
| 頻繁增刪 key | map + Mutex/RWMutex |
| 需要 len() 或遍歷性能 | map + Mutex/RWMutex |
| 不同 goroutine 操作不同的 key | sync.Map |
4. sync.Pool
臨時對象池,減少內存分配和 GC 壓力。對象可能在任何時候被 GC 回收。
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer) // 當池為空時創建新對象
},
}
func processRequest(data string) string {
// 從池中獲取
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset() // 重置狀態!非常重要
// 使用
buf.WriteString("處理: ")
buf.WriteString(data)
result := buf.String()
// 歸還到池中
bufPool.Put(buf)
return result
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processRequest(fmt.Sprintf("請求%d", id))
_ = result
}(i)
}
wg.Wait()
}
注意事項:
- Get 後務必 Reset 對象狀態
- 不要假設 Put 的對象下次一定能 Get 到(GC 會清空 Pool)
- 適合頻繁創建的臨時對象(如 buffer、臨時 slice)
- 標準庫 fmt 包就大量使用 sync.Pool
5. sync.Cond
條件變量,用於多個 goroutine 等待某個條件滿足。
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
// 生產者
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 喚醒一個等待者(Broadcast 喚醒所有)
}
// 消費者
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 釋放鎖並等待,被喚醒時重新獲取鎖
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
q := NewQueue()
// 消費者
go func() {
for {
item := q.Get()
fmt.Println("消費:", item)
}
}()
// 生產者
for i := 0; i < 10; i++ {
q.Put(i)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(time.Second)
}
實際項目中 channel 比 sync.Cond 更常用,但理解 Cond 有助於理解併發原語。
6. sync.WaitGroup 進階
func main() {
var wg sync.WaitGroup
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.baidu.com",
}
results := make([]int, len(urls))
for i, url := range urls {
wg.Add(1)
go func(idx int, u string) {
defer wg.Done()
resp, err := http.Get(u)
if err != nil {
results[idx] = -1
return
}
defer resp.Body.Close()
results[idx] = resp.StatusCode
}(i, url)
}
wg.Wait()
for i, url := range urls {
fmt.Printf("%s -> %d\n", url, results[i])
}
}
常見錯誤:
// 錯誤1:在 goroutine 內部 Add
go func() {
wg.Add(1) // 可能在 Wait 之後才執行!
defer wg.Done()
}()
wg.Wait()
// 正確:在啓動 goroutine 前 Add
wg.Add(1)
go func() {
defer wg.Done()
}()
wg.Wait()
// 錯誤2:忘記 Done 導致永遠阻塞
// 用 defer wg.Done() 確保一定執行
二、Context 上下文
1. Context 是甚麼?
Context 用於在 goroutine 之間傳遞取消信號、超時控制和請求級別數據。
type Context interface {
Deadline() (deadline time.Time, ok bool) // 截止時間
Done() <-chan struct{} // 取消信號 channel
Err() error // Done 關閉的原因
Value(key any) any // 請求級別的數據
}
2. context.Background() 和 context.TODO()
// Background: 根 context,永不取消,沒有值,沒有截止時間
// 通常用於 main 函數、初始化、測試
ctx := context.Background()
// TODO: 當不確定該用甚麼 context 時的佔位符
// 代碼審查時如果看到 TODO,說明需要改進
ctx := context.TODO()
3. context.WithCancel
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("worker 收到取消信號:", ctx.Err())
return
default:
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}(ctx)
time.Sleep(2 * time.Second)
cancel() // 發送取消信號
time.Sleep(100 * time.Millisecond)
// 輸出: worker 收到取消信號: context canceled
}
4. context.WithTimeout 和 WithDeadline
// WithTimeout: 指定超時時長
func fetchWithTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // 即使沒超時也要調用 cancel 釋放資源
req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("請求失敗:", err) // context deadline exceeded
return
}
defer resp.Body.Close()
fmt.Println("狀態碼:", resp.StatusCode)
}
// WithDeadline: 指定截止時間點
func fetchWithDeadline() {
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// 用法與 WithTimeout 相同
_ = ctx
}
5. context.WithValue
type contextKey string
const (
keyUserID contextKey = "user_id"
keyRequestID contextKey = "request_id"
)
func middleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 從請求中提取信息,放入 context
ctx := r.Context()
ctx = context.WithValue(ctx, keyRequestID, generateID())
ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
next(w, r.WithContext(ctx))
}
}
func handler(w http.ResponseWriter, r *http.Request) {
// 從 context 中取值
reqID := r.Context().Value(keyRequestID).(string)
userID := r.Context().Value(keyUserID).(string)
fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}
重要: key 應該用自定義的未導出類型(如 contextKey),避免不同包的 key 衝突。
6. Context 在實際項目中的應用
// 數據庫查詢帶超時
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
var user User
if err := row.Scan(&user.ID, &user.Name); err != nil {
return nil, err
}
return &user, nil
}
// gRPC 服務自動傳遞 context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// ctx 自動攜帶了超時和取消信號
user, err := s.repo.FindByID(ctx, req.Id)
if err != nil {
return nil, err
}
return toProto(user), nil
}
// 級聯取消:父 context 取消時,所有子 context 自動取消
func processOrder(ctx context.Context, orderID string) error {
// 子任務繼承父 context
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return checkInventory(ctx, orderID) })
g.Go(func() error { return chargePayment(ctx, orderID) })
g.Go(func() error { return sendNotification(ctx, orderID) })
return g.Wait() // 任一失敗自動取消其他
}
7. Context 最佳實踐
| 規則 | 說明 |
|---|---|
| 作為第一個參數 | func DoSomething(ctx context.Context, ...) |
| 不要存儲在 struct 中 | Context 應該在函數間傳遞,不要作為字段 |
| 不要傳 nil | 用 context.Background() 或 context.TODO() |
| WithValue 只傳請求級別數據 | 如 request ID、用戶信息,不要傳業務參數 |
| 總是調用 cancel | 即使超時也要 defer cancel() 釋放資源 |
| 不要在多個 goroutine 中傳同一個 cancel | 誰創建誰取消 |
Context 傳播鏈路示意
HTTP Request 進入
│
▼
context.Background() + WithValue(requestID)
│
├──► WithTimeout(5s) ──► 查數據庫
│
├──► WithTimeout(3s) ──► 調 gRPC 服務
│ │
│ ├──► 子查詢1
│ └──► 子查詢2
│
└──► WithCancel() ──► 發通知(可手動取消)
// 任何一層超時或取消,下游全部自動取消
主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6720