Go 并发模式
常见的 Go 并发设计模式,每个模式都有完整可运行示例和适用场景说明
1. Worker Pool 模式
固定数量的 worker goroutine 从共享的任务队列中取任务执行,控制并发度。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, j)
time.Sleep(time.Second) // 模拟耗时操作
results <- j * 2
fmt.Printf("Worker %d 完成任务 %d\n", id, j)
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动 worker
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 关闭任务通道,worker 的 range 会结束
// 等待所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for r := range results {
fmt.Println("结果:", r)
}
}
适用场景: HTTP 请求处理、批量数据处理、限制对外部服务的并发调用数
2. Pipeline 模式
多个阶段串联,每个阶段是一组 goroutine,通过 channel 传递数据。
package main
import "fmt"
// 阶段1: 生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2: 平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3: 加倍
func double(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
func main() {
// 串联管道: generate -> square -> double
ch := double(square(generate(2, 3, 4)))
for v := range ch {
fmt.Println(v) // 8, 18, 32
}
}
适用场景: 数据处理流水线、ETL、日志处理管道
3. Fan-out / Fan-in 模式
- Fan-out: 多个 goroutine 从同一个 channel 读取(分摊工作)
- Fan-in: 多个 channel 的输出合并到一个 channel
package main
import (
"fmt"
"sync"
"time"
)
// 生产者
func producer(id int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(50 * time.Millisecond)
out <- id*100 + i
}
close(out)
}()
return out
}
// Fan-in: 合并多个 channel 到一个
func fanIn(channels ...<-chan int) <-chan int {
merged := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// Fan-out: 3 个生产者并行工作
c1 := producer(1)
c2 := producer(2)
c3 := producer(3)
// Fan-in: 合并结果
for v := range fanIn(c1, c2, c3) {
fmt.Println(v)
}
}
适用场景: 多数据源聚合、并行 API 调用结果合并
4. errgroup 包
golang.org/x/sync/errgroup — 并发执行多个任务,任何一个出错则取消全部。
package main
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.baidu.com",
}
results := make([]int, len(urls))
for i, url := range urls {
i, url := i, url // 捕获循环变量
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
results[i] = resp.StatusCode
return nil
})
}
// 等待所有任务完成或第一个错误
if err := g.Wait(); err != nil {
fmt.Println("错误:", err)
return
}
for i, url := range urls {
fmt.Printf("%s -> %d\n", url, results[i])
}
}
errgroup 限制并发数(SetLimit)
g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 个并发
for _, task := range tasks {
task := task
g.Go(func() error {
return process(task)
})
}
if err := g.Wait(); err != nil {
log.Fatal(err)
}
适用场景: 并发 API 调用、批量操作需要收集错误、需要自动取消
5. 限流器模式
5.1 time.Ticker 简单限流
package main
import (
"fmt"
"time"
)
func main() {
// 每 200ms 处理一个请求(5 QPS)
limiter := time.NewTicker(200 * time.Millisecond)
defer limiter.Stop()
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-limiter.C // 等待下一个 tick
fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
}
}
5.2 令牌桶限流
package main
import (
"fmt"
"sync"
"time"
)
type TokenBucket struct {
tokens chan struct{}
interval time.Duration
stop chan struct{}
}
func NewTokenBucket(rate int, burst int) *TokenBucket {
tb := &TokenBucket{
tokens: make(chan struct{}, burst),
interval: time.Second / time.Duration(rate),
stop: make(chan struct{}),
}
// 初始填满
for i := 0; i < burst; i++ {
tb.tokens <- struct{}{}
}
// 持续补充令牌
go func() {
ticker := time.NewTicker(tb.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case tb.tokens <- struct{}{}:
default: // 桶满了,丢弃
}
case <-tb.stop:
return
}
}
}()
return tb
}
func (tb *TokenBucket) Allow() bool {
select {
case <-tb.tokens:
return true
default:
return false
}
}
func (tb *TokenBucket) Wait() {
<-tb.tokens
}
func (tb *TokenBucket) Close() {
close(tb.stop)
}
func main() {
bucket := NewTokenBucket(5, 3) // 5 QPS, 突发 3
defer bucket.Close()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
bucket.Wait() // 阻塞等待令牌
fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
}(i)
}
wg.Wait()
}
适用场景: API 限流、防止下游过载、控制资源消耗
6. 超时与取消模式
6.1 context.WithTimeout
package main
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(3 * time.Second): // 模拟慢操作
return "操作完成", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// 设置 1 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := slowOperation(ctx)
if err != nil {
fmt.Println("超时或取消:", err) // context deadline exceeded
return
}
fmt.Println(result)
}
6.2 手动取消 + 多个 goroutine
func main() {
ctx, cancel := context.WithCancel(context.Background())
// 启动多个 worker
for i := 0; i < 5; i++ {
go func(id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 收到取消信号\n", id)
return
default:
// 工作...
time.Sleep(500 * time.Millisecond)
fmt.Printf("Worker %d 工作中...\n", id)
}
}
}(i)
}
time.Sleep(2 * time.Second)
cancel() // 取消所有 worker
time.Sleep(100 * time.Millisecond)
fmt.Println("所有 worker 已停止")
}
7. 生产者-消费者模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Order struct {
ID int
Price float64
}
func producer(orders chan<- Order, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 20; i++ {
order := Order{
ID: i,
Price: float64(rand.Intn(1000)) / 10.0,
}
orders <- order
fmt.Printf("生产订单 #%d (%.1f元)\n", order.ID, order.Price)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}
func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
defer wg.Done()
for order := range orders {
fmt.Printf(" 消费者%d 处理订单 #%d (%.1f元)\n", id, order.ID, order.Price)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}
func main() {
orders := make(chan Order, 5) // 缓冲 5 个订单
var producerWg, consumerWg sync.WaitGroup
// 2 个生产者
for i := 0; i < 2; i++ {
producerWg.Add(1)
go producer(orders, &producerWg)
}
// 3 个消费者
for i := 1; i <= 3; i++ {
consumerWg.Add(1)
go consumer(i, orders, &consumerWg)
}
// 等生产者完成后关闭 channel
producerWg.Wait()
close(orders)
// 等消费者处理完
consumerWg.Wait()
fmt.Println("所有订单处理完毕")
}
8. Or-Done Channel 模式
从一个 channel 读取,但需要响应取消信号:
// orDone 包装一个 channel,使其可以被 done 信号中断
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-done:
return
}
}
}
}()
return out
}
func main() {
done := make(chan struct{})
data := make(chan int)
// 生产数据
go func() {
for i := 0; ; i++ {
data <- i
time.Sleep(100 * time.Millisecond)
}
}()
// 2 秒后取消
go func() {
time.Sleep(2 * time.Second)
close(done)
}()
for v := range orDone(done, data) {
fmt.Println(v)
}
fmt.Println("已取消")
}
9. Semaphore(信号量)模式
用带缓冲的 channel 模拟信号量,控制最大并发数:
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore chan struct{}
func NewSemaphore(max int) Semaphore {
return make(Semaphore, max)
}
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
func main() {
sem := NewSemaphore(3) // 最多 3 个并发
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
fmt.Printf("[%s] 任务 %d 开始\n", time.Now().Format("05.000"), id)
time.Sleep(time.Second)
fmt.Printf("[%s] 任务 %d 完成\n", time.Now().Format("05.000"), id)
}(i)
}
wg.Wait()
}
模式速查表
| 模式 | 核心思想 | 典型场景 |
|---|---|---|
| Worker Pool | 固定 goroutine 数消费任务队列 | 批量处理、限并发 |
| Pipeline | 多阶段 channel 串联 | 数据处理流水线 |
| Fan-in/Fan-out | 拆分并行+合并结果 | 多数据源聚合 |
| errgroup | 并发+错误收集+自动取消 | 并行 API 调用 |
| 令牌桶 | 控制请求速率 | API 限流 |
| Context超时/取消 | 传播取消信号 | RPC/HTTP 超时控制 |
| 生产者-消费者 | 解耦生产和消费速率 | 消息队列、任务调度 |
| Or-Done | 可中断的 channel 读取 | 需要优雅退出的管道 |
| Semaphore | 信号量控制并发数 | 连接池、资源限制 |
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6721