Go Concurrency Patterns
Common Go concurrency design patterns, each with complete runnable examples and descriptions of applicable scenarios
1. Worker Pool Pattern
A fixed number of worker goroutines fetch and execute tasks from a shared task queue, controlling the degree of concurrency.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, j)
time.Sleep(time.Second) // 模拟耗时操作
results <- j * 2
fmt.Printf("Worker %d 完成任务 %d\n", id, j)
}
}
// main runs a pool of 3 workers over a 10-job queue and prints each
// doubled result as it arrives.
func main() {
	const numJobs = 10
	const numWorkers = 3

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Launch the worker pool.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results, &wg)
	}

	// Enqueue every job, then close so each worker's receive loop ends.
	for job := 1; job <= numJobs; job++ {
		jobs <- job
	}
	close(jobs)

	// Close results only after every worker has finished, so the
	// collection loop below can terminate.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect the results.
	for r := range results {
		fmt.Println("结果:", r)
	}
}
Applicable Scenarios: HTTP request processing, batch data processing, limiting concurrent calls to external services
2. Pipeline Pattern
Multiple stages are chained together, with each stage being a group of goroutines passing data via channels.
package main
import "fmt"
// 阶段1: 生成数据
// generate is pipeline stage 1: it emits each of nums on the returned
// channel in order, then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < len(nums); i++ {
			out <- nums[i]
		}
	}()
	return out
}
// 阶段2: 平方
// square is pipeline stage 2: it reads ints from in and emits each
// value squared, closing the output once in is exhausted.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}
// 阶段3: 加倍
// double is pipeline stage 3: it reads ints from in and emits each
// value multiplied by two, closing the output once in is exhausted.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}
// main chains the three stages: generate -> square -> double, then
// prints each value as it flows out of the end of the pipeline.
func main() {
	pipeline := double(square(generate(2, 3, 4)))
	for v := range pipeline {
		fmt.Println(v) // 8, 18, 32
	}
}
Applicable Scenarios: Data processing pipelines, ETL, log processing pipelines
3. Fan-out / Fan-in Pattern
- Fan-out: Multiple goroutines read from the same channel (distributing work)
- Fan-in: The outputs of multiple channels are merged into one channel
package main
import (
"fmt"
"sync"
"time"
)
// 生产者
// producer returns a channel carrying five values (id*100+0 through
// id*100+4), pausing 50ms before each send. The channel is closed once
// all five values have been delivered.
func producer(id int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < 5; i++ {
			time.Sleep(50 * time.Millisecond)
			out <- id*100 + i
		}
	}()
	return out
}
// Fan-in: 合并多个 channel 到一个
// fanIn merges any number of input channels into a single output
// channel. One forwarding goroutine runs per input; the merged channel
// is closed only after every input has been fully drained.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(src <-chan int) {
			defer wg.Done()
			for v := range src {
				merged <- v
			}
		}(ch)
	}
	// Closer goroutine: waits for all forwarders, then closes merged so
	// consumers ranging over it can stop.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
// main fans work out to three parallel producers and fans their
// outputs back into one stream, printing values as they arrive
// (interleaving order is nondeterministic).
func main() {
	// Fan-out: three producers working in parallel.
	sources := []<-chan int{producer(1), producer(2), producer(3)}
	// Fan-in: merge the results into one channel.
	for v := range fanIn(sources...) {
		fmt.Println(v)
	}
}
Applicable Scenarios: Aggregation of multiple data sources, merging results of parallel API calls
4. errgroup Package
golang.org/x/sync/errgroup — Executes multiple tasks concurrently; if any task fails, the shared context is canceled so the remaining tasks can stop early.
package main
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
// main fetches several URLs concurrently. errgroup.WithContext cancels
// the shared context on the first error, which aborts the in-flight
// requests built with NewRequestWithContext.
func main() {
	g, ctx := errgroup.WithContext(context.Background())

	urls := []string{
		"https://www.google.com",
		"https://www.github.com",
		"https://www.baidu.com",
	}
	results := make([]int, len(urls))

	// fetch performs one GET and returns the HTTP status code.
	fetch := func(url string) (int, error) {
		req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
		if err != nil {
			return 0, err
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return 0, fmt.Errorf("fetch %s: %w", url, err)
		}
		defer resp.Body.Close()
		return resp.StatusCode, nil
	}

	for i, url := range urls {
		i, url := i, url // capture loop variables (needed before Go 1.22)
		g.Go(func() error {
			code, err := fetch(url)
			if err != nil {
				return err
			}
			results[i] = code
			return nil
		})
	}

	// Wait blocks until every goroutine returns; it yields the first
	// non-nil error, if any.
	if err := g.Wait(); err != nil {
		fmt.Println("错误:", err)
		return
	}
	for i, url := range urls {
		fmt.Printf("%s -> %d\n", url, results[i])
	}
}
errgroup Limiting Concurrency (SetLimit)
// errgroup with a concurrency cap: SetLimit(3) allows at most three
// goroutines to run at once; additional g.Go calls block until a slot
// frees up. (`tasks` and `process` are placeholders — this is a fragment.)
g := new(errgroup.Group)
g.SetLimit(3) // at most 3 concurrent goroutines
for _, task := range tasks {
task := task // capture loop variable (needed before Go 1.22)
g.Go(func() error {
return process(task)
})
}
if err := g.Wait(); err != nil {
log.Fatal(err)
}
Applicable Scenarios: Concurrent API calls, batch operations requiring error collection, automatic cancellation
5. Rate Limiter Pattern
5.1 time.Ticker Simple Rate Limiting
package main
import (
"fmt"
"time"
)
// main throttles a queue of 10 requests to one every 200ms (5 QPS)
// using a time.Ticker as a simple rate limiter.
func main() {
	// One request allowed per 200ms tick.
	limiter := time.NewTicker(200 * time.Millisecond)
	defer limiter.Stop()

	// Pre-fill the queue and close it so the range loop terminates.
	requests := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		requests <- i
	}
	close(requests)

	for req := range requests {
		<-limiter.C // block until the next tick
		fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
	}
}
5.2 Token Bucket Rate Limiting
package main
import (
"fmt"
"sync"
"time"
)
// TokenBucket is a channel-based token-bucket rate limiter: the
// buffered tokens channel holds the available tokens, and a background
// goroutine adds one token per interval until Close is called.
type TokenBucket struct {
	tokens   chan struct{} // buffered to `burst`; one element = one token
	interval time.Duration // time between refill ticks
	stop     chan struct{} // closed by Close to stop the refill goroutine
}

// NewTokenBucket returns a bucket that refills at `rate` tokens per
// second and holds at most `burst` tokens. The bucket starts full, so
// up to `burst` callers can proceed immediately.
func NewTokenBucket(rate int, burst int) *TokenBucket {
	tb := &TokenBucket{
		tokens:   make(chan struct{}, burst),
		interval: time.Second / time.Duration(rate),
		stop:     make(chan struct{}),
	}
	// Start full to allow an initial burst.
	for i := 0; i < burst; i++ {
		tb.tokens <- struct{}{}
	}
	go tb.refill()
	return tb
}

// refill adds one token per interval, discarding tokens while the
// bucket is full, until the stop channel is closed.
func (tb *TokenBucket) refill() {
	ticker := time.NewTicker(tb.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			select {
			case tb.tokens <- struct{}{}:
			default: // bucket full: drop the token
			}
		case <-tb.stop:
			return
		}
	}
}

// Allow reports whether a token was available, consuming one if so.
// It never blocks.
func (tb *TokenBucket) Allow() bool {
	select {
	case <-tb.tokens:
		return true
	default:
		return false
	}
}

// Wait blocks until a token can be taken.
func (tb *TokenBucket) Wait() {
	<-tb.tokens
}

// Close stops the refill goroutine. Tokens already in the bucket
// remain usable.
func (tb *TokenBucket) Close() {
	close(tb.stop)
}
// main fires 10 goroutines at a 5 QPS bucket with burst 3: the first
// three acquire tokens immediately, the rest roughly every 200ms.
func main() {
	bucket := NewTokenBucket(5, 3) // 5 QPS, burst of 3
	defer bucket.Close()

	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(id int) {
			defer wg.Done()
			bucket.Wait() // block until a token is available
			fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
		}(i)
	}
	wg.Wait()
}
Applicable Scenarios: API rate limiting, preventing downstream overload, controlling resource consumption
6. Timeout and Cancellation Pattern
6.1 context.WithTimeout
package main
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(3 * time.Second): // 模拟慢操作
return "操作完成", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// main demonstrates context.WithTimeout: the 1-second deadline fires
// before slowOperation's 3-second work completes.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel() // always release the context's timer resources

	result, err := slowOperation(ctx)
	if err != nil {
		fmt.Println("超时或取消:", err) // context deadline exceeded
		return
	}
	fmt.Println(result)
}
6.2 Manual Cancellation + Multiple Goroutines
// main starts five polling workers and stops them all at once by
// cancelling a shared context.
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// work polls until the shared context is cancelled.
	work := func(id int) {
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("Worker %d 收到取消信号\n", id)
				return
			default:
				// simulated work...
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("Worker %d 工作中...\n", id)
			}
		}
	}

	for i := 0; i < 5; i++ {
		go work(i)
	}

	time.Sleep(2 * time.Second)
	cancel() // broadcast cancellation to every worker
	time.Sleep(100 * time.Millisecond) // give workers a moment to print
	fmt.Println("所有 worker 已停止")
}
7. Producer-Consumer Pattern
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Order is the unit of work exchanged between producers and consumers.
type Order struct {
ID int // sequential order number, assigned by the producer
Price float64 // order amount in yuan; this example generates 0.0–99.9
}
// producer sends 20 orders with random prices into the orders channel,
// sleeping a random 0–100ms between sends, and signals wg when done.
func producer(orders chan<- Order, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 20; i++ {
		o := Order{ID: i, Price: float64(rand.Intn(1000)) / 10.0}
		orders <- o
		fmt.Printf("生产订单 #%d (%.1f元)\n", o.ID, o.Price)
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	}
}
// consumer drains the orders channel until it is closed, simulating
// 0–300ms of processing per order, and signals wg when done.
func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		order, ok := <-orders
		if !ok {
			return // channel closed and drained
		}
		fmt.Printf(" 消费者%d 处理订单 #%d (%.1f元)\n", id, order.ID, order.Price)
		time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
	}
}
// main runs 2 producers against 3 consumers over a small buffered
// channel. The producers' WaitGroup gates the channel close, which in
// turn lets every consumer's receive loop finish.
func main() {
	orders := make(chan Order, 5) // buffer up to 5 in-flight orders

	var producerWg, consumerWg sync.WaitGroup

	// Two producers.
	producerWg.Add(2)
	for i := 0; i < 2; i++ {
		go producer(orders, &producerWg)
	}

	// Three consumers.
	consumerWg.Add(3)
	for id := 1; id <= 3; id++ {
		go consumer(id, orders, &consumerWg)
	}

	// Only after every producer is done is it safe to close the channel.
	producerWg.Wait()
	close(orders)

	// Wait for the consumers to drain the remaining orders.
	consumerWg.Wait()
	fmt.Println("所有订单处理完毕")
}
8. Or-Done Channel Pattern
Wraps a channel read so it can be interrupted at any time by a cancellation (done) signal:
// orDone 包装一个 channel,使其可以被 done 信号中断
// orDone wraps channel c so that reads can be abandoned when done is
// closed. The returned channel closes when either c is exhausted or
// done fires — whichever happens first.
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case val, open := <-c:
				if !open {
					return // upstream closed: we are done
				}
				// Forward val, staying interruptible while blocked on the send.
				select {
				case out <- val:
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}()
	return out
}
// main feeds an endless producer through orDone and stops consuming
// two seconds later by closing done.
func main() {
	done := make(chan struct{})
	data := make(chan int)

	// Endless producer: one value every 100ms.
	// NOTE(review): once done fires nothing reads data, so this goroutine
	// blocks on its send for the rest of the program — acceptable in a
	// demo that exits immediately, but a leak in a long-lived app.
	go func() {
		for i := 0; ; i++ {
			data <- i
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Cancel after 2 seconds.
	go func() {
		time.Sleep(2 * time.Second)
		close(done)
	}()

	for v := range orDone(done, data) {
		fmt.Println(v)
	}
	fmt.Println("已取消")
}
9. Semaphore Pattern
Uses a buffered channel to simulate a semaphore, controlling the maximum degree of concurrency:
package main
import (
"fmt"
"sync"
"time"
)
// Semaphore bounds concurrency with a buffered channel: each held slot
// is one buffered element, so at most cap(s) holders exist at a time.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore permitting up to max concurrent holders.
func NewSemaphore(max int) Semaphore {
	return make(Semaphore, max)
}

// Acquire takes a slot, blocking while the semaphore is full.
func (s Semaphore) Acquire() {
	s <- struct{}{}
}

// Release frees a previously acquired slot.
func (s Semaphore) Release() {
	<-s
}
// main pushes 10 one-second tasks through a semaphore of width 3, so
// they complete in waves of at most three at a time.
func main() {
	sem := NewSemaphore(3) // at most 3 tasks in flight
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(id int) {
			defer wg.Done()
			sem.Acquire()
			defer sem.Release()
			fmt.Printf("[%s] 任务 %d 开始\n", time.Now().Format("05.000"), id)
			time.Sleep(time.Second)
			fmt.Printf("[%s] 任务 %d 完成\n", time.Now().Format("05.000"), id)
		}(i)
	}
	wg.Wait()
}
Pattern Quick Reference Table
| Pattern | Core Idea | Typical Scenario |
|---|---|---|
| Worker Pool | Fixed number of goroutines consuming task queue | Batch processing, limiting concurrency |
| Pipeline | Multiple stages chained via channels | Data processing pipelines |
| Fan-in/Fan-out | Split for parallelism + merge results | Multi-data source aggregation |
| errgroup | Concurrency + error collection + automatic cancellation | Parallel API calls |
| Token Bucket | Control request rate | API rate limiting |
| Context Timeout/Cancellation | Propagate cancellation signal | RPC/HTTP timeout control |
| Producer-Consumer | Decouple production and consumption rates | Message queues, task scheduling |
| Or-Done | Interruptible channel read | Pipelines requiring graceful shutdown |
| Semaphore | Semaphore controlling concurrency | Connection pools, resource limits |
主题测试文章,只做测试使用。发布者:Walker,转载请注明出处:https://walker-learn.xyz/archives/6721