Go 併發模式
常見的 Go 併發設計模式,每個模式都有完整可運行示例和適用場景說明
1. Worker Pool 模式
固定數量的 worker goroutine 從共享的任務隊列中取任務執行,控制併發度。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d 開始處理任務 %d\n", id, j)
time.Sleep(time.Second) // 模擬耗時操作
results <- j * 2
fmt.Printf("Worker %d 完成任務 %d\n", id, j)
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 啓動 worker
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 發送任務
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 關閉任務通道,worker 的 range 會結束
// 等待所有 worker 完成後關閉 results
go func() {
wg.Wait()
close(results)
}()
// 收集結果
for r := range results {
fmt.Println("結果:", r)
}
}
適用場景: HTTP 請求處理、批量數據處理、限制對外部服務的併發調用數
2. Pipeline 模式
多個階段串聯,每個階段是一組 goroutine,通過 channel 傳遞數據。
package main
import "fmt"
// 階段1: 生成數據
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 階段2: 平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 階段3: 加倍
func double(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
func main() {
// 串聯管道: generate -> square -> double
ch := double(square(generate(2, 3, 4)))
for v := range ch {
fmt.Println(v) // 8, 18, 32
}
}
適用場景: 數據處理流水線、ETL、日誌處理管道
3. Fan-out / Fan-in 模式
- Fan-out: 多個 goroutine 從同一個 channel 讀取(分攤工作)
- Fan-in: 多個 channel 的輸出合併到一個 channel
package main
import (
"fmt"
"sync"
"time"
)
// 生產者
func producer(id int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(50 * time.Millisecond)
out <- id*100 + i
}
close(out)
}()
return out
}
// Fan-in: 合併多個 channel 到一個
func fanIn(channels ...<-chan int) <-chan int {
merged := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// Fan-out: 3 個生產者並行工作
c1 := producer(1)
c2 := producer(2)
c3 := producer(3)
// Fan-in: 合併結果
for v := range fanIn(c1, c2, c3) {
fmt.Println(v)
}
}
適用場景: 多數據源聚合、並行 API 調用結果合併
4. errgroup 包
golang.org/x/sync/errgroup — 併發執行多個任務,任何一個出錯則取消全部。
package main
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.baidu.com",
}
results := make([]int, len(urls))
for i, url := range urls {
i, url := i, url // 捕獲循環變量
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
results[i] = resp.StatusCode
return nil
})
}
// 等待所有任務完成或第一個錯誤
if err := g.Wait(); err != nil {
fmt.Println("錯誤:", err)
return
}
for i, url := range urls {
fmt.Printf("%s -> %d\n", url, results[i])
}
}
errgroup 限制併發數(SetLimit)
g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 個併發
for _, task := range tasks {
task := task
g.Go(func() error {
return process(task)
})
}
if err := g.Wait(); err != nil {
log.Fatal(err)
}
適用場景: 併發 API 調用、批量操作需要收集錯誤、需要自動取消
5. 限流器模式
5.1 time.Ticker 簡單限流
package main
import (
"fmt"
"time"
)
func main() {
// 每 200ms 處理一個請求(5 QPS)
limiter := time.NewTicker(200 * time.Millisecond)
defer limiter.Stop()
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-limiter.C // 等待下一個 tick
fmt.Printf("[%s] 處理請求 %d\n", time.Now().Format("04:05.000"), req)
}
}
5.2 令牌桶限流
package main
import (
"fmt"
"sync"
"time"
)
type TokenBucket struct {
tokens chan struct{}
interval time.Duration
stop chan struct{}
}
func NewTokenBucket(rate int, burst int) *TokenBucket {
tb := &TokenBucket{
tokens: make(chan struct{}, burst),
interval: time.Second / time.Duration(rate),
stop: make(chan struct{}),
}
// 初始填滿
for i := 0; i < burst; i++ {
tb.tokens <- struct{}{}
}
// 持續補充令牌
go func() {
ticker := time.NewTicker(tb.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case tb.tokens <- struct{}{}:
default: // 桶滿了,丟棄
}
case <-tb.stop:
return
}
}
}()
return tb
}
func (tb *TokenBucket) Allow() bool {
select {
case <-tb.tokens:
return true
default:
return false
}
}
func (tb *TokenBucket) Wait() {
<-tb.tokens
}
func (tb *TokenBucket) Close() {
close(tb.stop)
}
func main() {
bucket := NewTokenBucket(5, 3) // 5 QPS, 突發 3
defer bucket.Close()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
bucket.Wait() // 阻塞等待令牌
fmt.Printf("[%s] 請求 %d 獲得令牌\n", time.Now().Format("04:05.000"), id)
}(i)
}
wg.Wait()
}
適用場景: API 限流、防止下游過載、控制資源消耗
6. 超時與取消模式
6.1 context.WithTimeout
package main
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(3 * time.Second): // 模擬慢操作
return "操作完成", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// 設置 1 秒超時
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := slowOperation(ctx)
if err != nil {
fmt.Println("超時或取消:", err) // context deadline exceeded
return
}
fmt.Println(result)
}
6.2 手動取消 + 多個 goroutine
func main() {
ctx, cancel := context.WithCancel(context.Background())
// 啓動多個 worker
for i := 0; i < 5; i++ {
go func(id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信號\n", id)
return
default:
// 工作...
time.Sleep(500 * time.Millisecond)
fmt.Printf("Worker %d 工作中...\n", id)
}
}
}(i)
}
time.Sleep(2 * time.Second)
cancel() // 取消所有 worker
time.Sleep(100 * time.Millisecond)
fmt.Println("所有 worker 已停止")
}
7. 生產者-消費者模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Order struct {
ID int
Price float64
}
func producer(orders chan<- Order, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 20; i++ {
order := Order{
ID: i,
Price: float64(rand.Intn(1000)) / 10.0,
}
orders <- order
fmt.Printf("生產訂單 #%d (%.1f元)\n", order.ID, order.Price)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}
func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
defer wg.Done()
for order := range orders {
fmt.Printf(" 消費者%d 處理訂單 #%d (%.1f元)\n", id, order.ID, order.Price)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}
func main() {
orders := make(chan Order, 5) // 緩衝 5 個訂單
var producerWg, consumerWg sync.WaitGroup
// 2 個生產者
for i := 0; i < 2; i++ {
producerWg.Add(1)
go producer(orders, &producerWg)
}
// 3 個消費者
for i := 1; i <= 3; i++ {
consumerWg.Add(1)
go consumer(i, orders, &consumerWg)
}
// 等生產者完成後關閉 channel
producerWg.Wait()
close(orders)
// 等消費者處理完
consumerWg.Wait()
fmt.Println("所有訂單處理完畢")
}
8. Or-Done Channel 模式
從一個 channel 讀取,但需要響應取消信號:
// orDone 包裝一個 channel,使其可以被 done 信號中斷
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-done:
return
}
}
}
}()
return out
}
func main() {
done := make(chan struct{})
data := make(chan int)
// 生產數據
go func() {
for i := 0; ; i++ {
data <- i
time.Sleep(100 * time.Millisecond)
}
}()
// 2 秒後取消
go func() {
time.Sleep(2 * time.Second)
close(done)
}()
for v := range orDone(done, data) {
fmt.Println(v)
}
fmt.Println("已取消")
}
9. Semaphore(信號量)模式
用帶緩衝的 channel 模擬信號量,控制最大併發數:
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore chan struct{}
func NewSemaphore(max int) Semaphore {
return make(Semaphore, max)
}
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
func main() {
sem := NewSemaphore(3) // 最多 3 個併發
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
fmt.Printf("[%s] 任務 %d 開始\n", time.Now().Format("05.000"), id)
time.Sleep(time.Second)
fmt.Printf("[%s] 任務 %d 完成\n", time.Now().Format("05.000"), id)
}(i)
}
wg.Wait()
}
模式速查表
| 模式 | 核心思想 | 典型場景 |
|---|---|---|
| Worker Pool | 固定 goroutine 數消費任務隊列 | 批量處理、限併發 |
| Pipeline | 多階段 channel 串聯 | 數據處理流水線 |
| Fan-in/Fan-out | 拆分並行+合併結果 | 多數據源聚合 |
| errgroup | 併發+錯誤收集+自動取消 | 並行 API 調用 |
| 令牌桶 | 控制請求速率 | API 限流 |
| Context超時/取消 | 傳播取消信號 | RPC/HTTP 超時控制 |
| 生產者-消費者 | 解耦生產和消費速率 | 消息隊列、任務調度 |
| Or-Done | 可中斷的 channel 讀取 | 需要優雅退出的管道 |
| Semaphore | 信號量控制併發數 | 連接池、資源限制 |
主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6721