Go Engineer System Course 007

Product Microservice

Entity Structure Description

This module includes the following core entities:

  • Goods
  • Category
  • Brands
  • Banner
  • GoodsCategoryBrand

1. Goods

Describes the goods information actually displayed and sold on the platform.

Field Description

Field Name Type Description
name String Goods name, required
brand Pointer -> Brands Brand to which the goods belong, associated with the Brands entity
category Pointer -> Category Category to which the goods belong, associated with the Category entity
GoodsSn String Goods number
ShopPrice Int Selling price of the goods

2. Category

Used to manage the hierarchical classification structure of goods.

Field Description

Field Name Type Description
name String Category name, required
level Int Category level, e.g., 1 for a primary category
parent Pointer -> Category Parent category, used to build a tree structure

3. Brands

Used to represent the brand information of goods.

Field Description

Field Name Type Description
name String Brand name, required
logo String Brand Logo URL

4. Banner

Promotional images displayed on the homepage or channel pages.

Field Description

Field Name Type Description
image String Image URL
url String Jump link (product page)

5. Brand Category Relationship (GoodsCategoryBrand)

Used to define the binding relationship between brands and categories.

Field Description

⚠️ Note: The field details for this entity are not listed in the diagram. It is speculated that it may contain the following:

Field Name Type Description
category Pointer -> Category Associated goods category
brand Pointer -> Brands Associated brand

Data Relationship Summary

  • A goods item belongs to one brand and one category (many-to-one).
  • A category can have multiple subcategories (tree structure).
  • There is a many-to-many relationship between brands and categories, which needs to be managed through the intermediate table GoodsCategoryBrand.
  • Banners are associated with goods via URL. It is recommended to associate with the actual goods ID to improve jump accuracy.

OSS

Alibaba Cloud Documentation

Go SDK V2 Quick Start

/*
 * @Author: error: error: git config user.name & please set dead value or install git && error: git config user.email & please set dead value or install git & please set dead value or install git
 * @Date: 2025-06-01 11:50:08
 * @LastEditors: error: error: git config user.name & please set dead value or install git && error: git config user.email & please set dead value or install git & please set dead value or install git
 * @LastEditTime: 2025-06-01 12:11:13
 * @FilePath: /RpcLearn/aliyun_oss_test/main.go
 * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
 */
package main

import (
 "fmt"
 "os"

 "github.com/aliyun/aliyun-oss-go-sdk/oss"
)

func main() {
 // 从环境变量获取配置
 accessKeyID := os.Getenv("OSS_ACCESS_KEY_ID")
 accessKeySecret := os.Getenv("OSS_ACCESS_KEY_SECRET")

 // 添加调试信息
 fmt.Printf("环境变量值:\nOSS_ACCESS_KEY_ID: %s\nOSS_ACCESS_KEY_SECRET: %s\n",
  accessKeyID, accessKeySecret)

 bucketName := "blog4me"
 endpoint := "oss-cn-beijing.aliyuncs.com"

 // 检查必要的环境变量
 if accessKeyID == "" || accessKeySecret == "" {
  fmt.Println("请设置环境变量 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET")
  return
 }

 // 创建OSSClient实例
 client, err := oss.New(endpoint, accessKeyID, accessKeySecret)
 if err != nil {
  fmt.Println("创建OSS客户端失败:", err)
  return
 }

 // 获取存储空间
 bucket, err := client.Bucket(bucketName)
 if err != nil {
  fmt.Println("获取Bucket失败:", err)
  return
 }

 // 上传文件
 objectKey := "go_learn/upload_test.jpg"
 localFile := "upload_test.jpg"

 // 检查文件是否存在
 if _, err := os.Stat(localFile); os.IsNotExist(err) {
  fmt.Printf("文件 %s 不存在\n", localFile)
  return
 }

 err = bucket.PutObjectFromFile(objectKey, localFile)
 if err != nil {
  fmt.Println("上传文件失败:", err)
  return
 }

 fmt.Printf("文件 %s 已成功上传到 %s\n", localFile, objectKey)
}

Server-side signed direct upload and upload callback settings

Use persistent connections to achieve intranet penetration.

S3 Upload Process

用户浏览器
|
| (1) 上传文件到 S3(通过预签名 URL)
|
v
Amazon S3
|
| (2) 上传完成,前端调用你的 API Gateway
|
v
API Gateway --> Lambda 函数
|
| (3) Lambda 收到上传完成事件
|
v
数据库、通知服务、后端业务逻辑...

This example demonstrates how to upload local files to Amazon S3 using Go and AWS SDK v2.

🧾 Prerequisites

  • An AWS account is owned;
  • An S3 Bucket has been created;
  • AWS credentials have been configured (via aws configure or by setting environment variables);
  • A local file (e.g., test.jpg) has been prepared;

📦 Install Dependencies

go mod init s3uploadtest
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/s3

🧑‍💻 Example Code

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "path/filepath"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    bucket := "your-bucket-name"         // 替换为你的 S3 桶名
    region := "ap-southeast-1"           // 替换为你的区域
    key := "uploads/test.jpg"            // 上传后在 S3 中的路径
    filePath := "./test.jpg"             // 本地文件路径

    // 加载 AWS 配置
    cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
    if err != nil {
        log.Fatalf("无法加载 AWS 配置: %v", err)
    }

    // 创建 S3 客户端
    client := s3.NewFromConfig(cfg)

    // 打开文件
    file, err := os.Open(filePath)
    if err != nil {
        log.Fatalf("无法打开文件: %v", err)
    }
    defer file.Close()

    // 获取文件大小与内容类型
    fileInfo, _ := file.Stat()
    size := fileInfo.Size()
    contentType := detectContentType(filePath)

    // 执行上传
    _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
        Bucket:        &bucket,
        Key:           &key,
        Body:          file,
        ContentLength: size,
        ContentType:   &contentType,
    })

    if err != nil {
        log.Fatalf("上传失败: %v", err)
    }

    fmt.Println("上传成功 ✅")
    fmt.Printf("访问地址: https://%s.s3.amazonaws.com/%s\n", bucket, key)
}

func detectContentType(path string) string {
    ext := filepath.Ext(path)
    switch ext {
    case ".jpg", ".jpeg":
        return "image/jpeg"
    case ".png":
        return "image/png"
    case ".gif":
        return "image/gif"
    case ".txt":
        return "text/plain"
    default:
        return "application/octet-stream"
    }
}

✅ Upload Verification

After the upload is complete, access the URL:

https://your-bucket-name.s3.amazonaws.com/uploads/test.jpg

📚 References

Inventory

Under concurrent conditions, inventory cannot be deducted normally.

Before the update is complete, you cannot even query. Before querying, you need to acquire a lock. Whoever has the lock operates, and then releases it after the update.

Goods Service: Set inventory
Order: Pre-deduct inventory, order timeout mechanism (timeout mechanism) [return inventory]
Payment: Deduct inventory after payment is complete

package handler

import (
 "context"
 "sync"
 "time"

 "inventory_srv/global"
 "inventory_srv/model"
 "inventory_srv/proto"

 "go.uber.org/zap"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
 "google.golang.org/protobuf/types/known/emptypb"
 "gorm.io/gorm"
 "gorm.io/gorm/clause"
)

type InventoryServer struct {
 proto.UnimplementedInventoryServiceServer
 mu sync.Mutex
}

// SetInventory 设置库存
func (s *InventoryServer) SetInventory(ctx context.Context, req *proto.GoodsInvInfo) (*emptypb.Empty, error) {
 s.mu.Lock()
 defer s.mu.Unlock()

 var inv model.Inventory
 result := global.DB.Where("goods_id = ?", req.GoodsId).First(&inv)
 if result.Error != nil {
  if result.Error == gorm.ErrRecordNotFound {
   // 如果记录不存在,创建新记录
   inv = model.Inventory{
    GoodsID: req.GoodsId,
    Stock:   req.Num,
    Version: 0,
   }
   if err := global.DB.Create(&inv).Error; err != nil {
    zap.S().Errorf("创建库存记录失败: %v", err)
    return nil, status.Error(codes.Internal, "创建库存记录失败")
   }
  } else {
   zap.S().Errorf("查询库存记录失败: %v", result.Error)
   return nil, status.Error(codes.Internal, "查询库存记录失败")
  }
 } else {
  // 更新现有记录
  inv.Stock = req.Num
  if err := global.DB.Save(&inv).Error; err != nil {
   zap.S().Errorf("更新库存记录失败: %v", err)
   return nil, status.Error(codes.Internal, "更新库存记录失败")
  }
 }

 return &emptypb.Empty{}, nil
}

// GetInventory 获取库存
func (s *InventoryServer) GetInventory(ctx context.Context, req *proto.GoodsInvInfo) (*proto.GoodsInvInfo, error) {
 var inv model.Inventory
 result := global.DB.Where("goods_id = ?", req.GoodsId).First(&inv)
 if result.Error != nil {
  if result.Error == gorm.ErrRecordNotFound {
   return &proto.GoodsInvInfo{
    GoodsId: req.GoodsId,
    Num:     0,
   }, nil
  }
  zap.S().Errorf("查询库存记录失败: %v", result.Error)
  return nil, status.Error(codes.Internal, "查询库存记录失败")
 }

 return &proto.GoodsInvInfo{
  GoodsId: inv.GoodsID,
  Num:     inv.Stock,
 }, nil
}

// Sell 库存扣减
func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
 const maxRetries = 3
 const retryDelay = 100 * time.Millisecond

 for retry := 0; retry < maxRetries; retry++ {
  // 开启事务
  tx := global.DB.Begin()
  if tx.Error != nil {
   return nil, status.Error(codes.Internal, "开启事务失败")
  }

  success := true
  // 遍历所有商品
  for _, goodsInfo := range req.GoodsInvInfo {
   var inv model.Inventory
   // 使用行锁查询
   result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
    Where("goods_id = ?", goodsInfo.GoodsId).
    First(&inv)

   if result.Error != nil {
    tx.Rollback()
    if result.Error == gorm.ErrRecordNotFound {
     return nil, status.Error(codes.NotFound, "商品库存不存在")
    }
    return nil, status.Error(codes.Internal, "查询库存失败")
   }

   // 检查库存是否充足
   if inv.Stock < goodsInfo.Num {
    tx.Rollback()
    return nil, status.Error(codes.ResourceExhausted, "库存不足")
   }

   // 使用乐观锁更新库存
   updateResult := tx.Model(&model.Inventory{}).
    Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
    Updates(map[string]interface{}{
     "stock":   inv.Stock - goodsInfo.Num,
     "version": inv.Version + 1,
    })

   if updateResult.Error != nil {
    tx.Rollback()
    success = false
    break
   }

   if updateResult.RowsAffected == 0 {
    tx.Rollback()
    success = false
    break
   }
  }

  if success {
   // 提交事务
   if err := tx.Commit().Error; err != nil {
    return nil, status.Error(codes.Internal, "提交事务失败")
   }
   return &emptypb.Empty{}, nil
  }

  // 如果失败且还有重试次数,等待后重试
  if retry < maxRetries-1 {
   time.Sleep(retryDelay)
   continue
  }
 }

 return nil, status.Error(codes.Internal, "库存更新失败,请重试")
}

// Reback 库存归还
func (s *InventoryServer) Reback(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
 const maxRetries = 3
 const retryDelay = 100 * time.Millisecond

 for retry := 0; retry < maxRetries; retry++ {
  // 开启事务
  tx := global.DB.Begin()
  if tx.Error != nil {
   return nil, status.Error(codes.Internal, "开启事务失败")
  }

  success := true
  // 遍历所有商品
  for _, goodsInfo := range req.GoodsInvInfo {
   var inv model.Inventory
   // 使用行锁查询
   result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
    Where("goods_id = ?", goodsInfo.GoodsId).
    First(&inv)

   if result.Error != nil {
    tx.Rollback()
    if result.Error == gorm.ErrRecordNotFound {
     return nil, status.Error(codes.NotFound, "商品库存不存在")
    }
    return nil, status.Error(codes.Internal, "查询库存失败")
   }

   // 使用乐观锁更新库存
   updateResult := tx.Model(&model.Inventory{}).
    Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
    Updates(map[string]interface{}{
     "stock":   inv.Stock + goodsInfo.Num,
     "version": inv.Version + 1,
    })

   if updateResult.Error != nil {
    tx.Rollback()
    success = false
    break
   }

   if updateResult.RowsAffected == 0 {
    tx.Rollback()
    success = false
    break
   }
  }

  if success {
   // 提交事务
   if err := tx.Commit().Error; err != nil {
    return nil, status.Error(codes.Internal, "提交事务失败")
   }
   return &emptypb.Empty{}, nil
  }

  // 如果失败且还有重试次数,等待后重试
  if retry < maxRetries-1 {
   time.Sleep(retryDelay)
   continue
  }
 }

 return nil, status.Error(codes.Internal, "库存更新失败,请重试")
}

Lock Mechanism Description

Pessimistic Lock

Pessimistic Lock is a locking mechanism that assumes concurrent conflicts occur frequently. Before operating on data, a lock is applied to the data, and other transactions can only proceed after the lock is released. It is common in database row locks, table locks, etc.

Examples:

  • In MySQL, SELECT ... FOR UPDATE applies an exclusive lock to the read rows, preventing other transactions from modifying these rows until the current transaction commits or rolls back.
  • In GORM, row-level pessimistic locks can be achieved using db.Clauses(clause.Locking{Strength: "UPDATE"}).
  • for update row locks are only effective for primary keys and indexes. If there is no index, the row lock will escalate to a table lock, and the lock is only effective for updates (read-write locks in Go language).
// 在事务中使用 -  db.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&users)
tx.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&users)
// SELECT * FROM `users` FOR UPDATE
// db.Clauses(clause.Locking{
tx.Clauses(clause.Locking{
  Strength: "SHARE",
  Table: clause.Table{Name: clause.CurrentTable},
}).Find(&users)
// SELECT * FROM `users` FOR SHARE OF `users`

Optimistic Lock

Optimistic Lock assumes that concurrent conflicts rarely occur. It does not apply a lock before operations but checks whether the data has been modified by other transactions during an update. It is commonly implemented using version numbers or timestamps.

Examples:

  • Add a version field to the table. When updating, include the old version. The update only succeeds if the version in the database has not changed; otherwise, it indicates a concurrent conflict and requires a retry.
  • Code example:

```go
tx.Model(&Inventory{}).
Where("goods_id = ? AND version = ?", goodsId, oldVersion).
Updates(map[string]interface{}{
"stock": newStock,
"version": oldVersion + 1,
})

// 升级写法:使用 Select 避免默认值和0值不更新的问题
tx.Model(&Inventory{}).
Where("goods_id = ? AND version = ?", goodsId, oldVersion).
Select("stock", "version").
Updates(Inventory{
Stock: newStock,
Version: oldVersion + 1,
})
```

Note: Using the Select method allows explicitly specifying the fields to be updated. Even if the field value is 0 or a default value, it will be updated. This avoids the problem of GORM ignoring zero-value fields by default.


Introduction to Distributed Locks

What is a Distributed Lock

A distributed lock is a mechanism used in distributed systems for mutual exclusion access to shared resources among multiple processes/nodes. It ensures that at any given moment, only one node can acquire the lock and operate on critical resources, preventing data races and inconsistencies.

Reasons for Introduction

  • In a single-machine environment, local locks (e.g., mutexes) can be used to ensure mutual exclusion between processes.
  • In a distributed environment, multiple service instances, nodes, or containers may operate on the same resource simultaneously, and local locks cannot be effective across processes/hosts.
  • Distributed locks achieve global mutual exclusion through middleware such as Redis, ZooKeeper, and Etcd, and are often used in scenarios requiring strong consistency, such as inventory deduction and order generation.

Common Implementation Methods:

  • Redis's SETNX + expiration time
  • ZooKeeper's ephemeral sequential nodes
  • Etcd's lease mechanism

If MySQL data does not have an index, its row lock will escalate to a table lock.

// Sell 库存扣减
func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
 const maxRetries = 3
 const retryDelay = 100 * time.Millisecond

 for retry := 0; retry < maxRetries; retry++ {
  // 开启事务
  tx := global.DB.Begin()
  if tx.Error != nil {
   return nil, status.Error(codes.Internal, "开启事务失败")
  }

  success := true
  // 遍历所有商品
  for _, goodsInfo := range req.GoodsInvInfo {
   var inv model.Inventory
   // 使用行锁查询
   result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
    Where("goods_id = ?", goodsInfo.GoodsId).
    First(&inv)

   if result.Error != nil {
    tx.Rollback()
    if result.Error == gorm.ErrRecordNotFound {
     return nil, status.Error(codes.NotFound, "商品库存不存在")
    }
    return nil, status.Error(codes.Internal, "查询库存失败")
   }

   // 检查库存是否充足
   if inv.Stock < goodsInfo.Num {
    tx.Rollback()
    return nil, status.Error(codes.ResourceExhausted, "库存不足")
   }

   // 使用乐观锁更新库存
   updateResult := tx.Model(&model.Inventory{}).
    Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
    Updates(map[string]interface{}{
     "stock":   inv.Stock - goodsInfo.Num,
     "version": inv.Version + 1,
    })

   if updateResult.Error != nil {
    tx.Rollback()
    success = false
    break
   }

   if updateResult.RowsAffected == 0 {
    tx.Rollback()
    success = false
    break
   }
  }

  if success {
   // 提交事务
   if err := tx.Commit().Error; err != nil {
    return nil, status.Error(codes.Internal, "提交事务失败")
   }
   return &emptypb.Empty{}, nil
  }

  // 如果失败且还有重试次数,等待后重试
  if retry < maxRetries-1 {
   time.Sleep(retryDelay)
   continue
  }
 }

 return nil, status.Error(codes.Internal, "库存更新失败,请重试")
}

Implementing Distributed Locks based on Redis

Analysis

Redsync Source Code Analysis

  1. The role of SETNX

  2. Makes getting and setting values atomic operations

  3. What if my service crashes - deadlock
    a. Set an expiration time

  4. b. If you set an expiration time, what if my business logic hasn't finished executing when the expiration time arrives?
    i. Refresh it before it expires
    ii. You need to use a background goroutine to handle the delay. 1. The delayed interface might have negative impacts - if one of the services hangs, it might complete in 2 seconds, but if it hangs, you will keep requesting to extend the lock, causing others to never acquire the lock, which is critical.

  5. Problems to be solved in distributed systems
    a. Mutual exclusion - SETNX
    b. Deadlock
    c. Safety
    i. The lock can only be deleted by the user who holds it, not by other users. 1. Only the current goroutine knows the value set for the lock. 2. When deleting, the value in Redis must be compared with the value saved by the current goroutine.

Redis is one of the most commonly used middleware for implementing distributed locks. Its core idea is to leverage Redis's atomic operations (such as SETNX) to ensure that only one client can acquire the lock at any given time. Key implementation points are as follows:

  • Utilize SET key value NX PX expiration_time to ensure atomicity and automatic expiration, preventing deadlocks.
  • When a client releases a lock, it needs to verify the value to prevent accidentally deleting another client's lock (e.g., using a UUID identifier).
  • Consider issues such as automatic lock expiration, renewal, and fault tolerance in exceptional cases.
  • For production environments, mature solutions like Redisson and RedLock are recommended.

Go Code Example

Taking go-redis as an example, implement simple distributed lock acquire and release:

import (
    "context"
    "github.com/redis/go-redis/v9"
    "time"
    "github.com/google/uuid"
)

type RedisLock struct {
    client *redis.Client
    key    string
    value  string
    ttl    time.Duration
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
    return &RedisLock{
        client: client,
        key:    key,
        value:  uuid.NewString(), // 唯一标识
        ttl:    ttl,
    }
}

// 加锁
func (l *RedisLock) Acquire(ctx context.Context) (bool, error) {
    ok, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
    return ok, err
}

// 释放锁(仅删除自己加的锁)
func (l *RedisLock) Release(ctx context.Context) (bool, error) {
    // Lua 脚本保证原子性
    script := `
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end`
    res, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    return res == int64(1), err
}

Explanation

  • Use SetNX when acquiring a lock to ensure only one client succeeds.
  • Use a Lua script to verify the value when releasing the lock, ensuring that another client's lock is not accidentally deleted.
  • For production environments, it is recommended to use distributed lock libraries with automatic renewal and fault tolerance (e.g., redsync, redisson).
  • Distributed locks are suitable for short-term, critical resource mutual exclusion, not for long-term holding.
  • RedLock Explained

    Background

    A distributed lock implemented with a single Redis instance has a single point of failure. When the Redis instance goes down, the entire lock service becomes unavailable. To improve availability, a Redis cluster needs to be set up.

    RedLock Algorithm Principle

    RedLock is a distributed lock algorithm proposed by Redis official, which implements distributed locks on multiple independent Redis instances to ensure normal operation even if some instances fail.

    Core Idea

    1. Multiple Independent Redis Instances: Typically uses 5 independent Redis instances (an odd number, for easier voting)
    2. Majority Principle: A lock is considered successfully acquired only if it is successfully acquired on a majority of instances (more than half).
    3. Time Window Control: Set a time limit for acquiring the lock to avoid long waits.

    Algorithm Steps

    1. Acquire Lock:

    2. Get current timestamp (milliseconds)

    3. Sequentially attempt to acquire the lock on all Redis instances, using the same key and a random value.
    4. Each instance's acquisition operation sets a timeout (much shorter than the lock's expiration time).
    5. If the lock is successfully acquired on a majority of instances, and the total time elapsed is less than the lock's valid time, then the lock acquisition is considered successful.
    6. Lock's valid time = original valid time - time consumed to acquire the lock

    7. Release Lock:

    8. Send a lock release request to all Redis instances.
    9. Regardless of whether the lock was successfully acquired on that instance before.

    Cluster Configuration Example

    # redis-cluster.yml
    version: '3'
    services:
      redis1:
        image: redis:6-alpine
        ports:
          - '6379:6379'
        command: redis-server --appendonly yes
    
      redis2:
        image: redis:6-alpine
        ports:
          - '6380:6379'
        command: redis-server --appendonly yes
    
      redis3:
        image: redis:6-alpine
        ports:
          - '6381:6379'
        command: redis-server --appendonly yes
    
      redis4:
        image: redis:6-alpine
        ports:
          - '6382:6379'
        command: redis-server --appendonly yes
    
      redis5:
        image: redis:6-alpine
        ports:
          - '6383:6379'
        command: redis-server --appendonly yes
    

    Go Code Implementation

    package main
    
    import (
        "context"
        "crypto/rand"
        "encoding/hex"
        "fmt"
        "sync"
        "time"
    
        "github.com/go-redis/redis/v8"
    )
    
    type RedLock struct {
        clients []*redis.Client
        quorum  int
    }
    
    type Lock struct {
        key        string
        value      string
        expiration time.Duration
        clients    []*redis.Client
    }
    
    func NewRedLock(addrs []string) *RedLock {
        clients := make([]*redis.Client, len(addrs))
        for i, addr := range addrs {
            clients[i] = redis.NewClient(&redis.Options{
                Addr: addr,
            })
        }
    
        return &RedLock{
            clients: clients,
            quorum:  len(addrs)/2 + 1, // 大多数
        }
    }
    
    func (r *RedLock) Lock(key string, expiration time.Duration) (*Lock, error) {
        value := generateRandomValue()
    
        start := time.Now()
        successCount := 0
    
        // 并发向所有实例获取锁
        var wg sync.WaitGroup
        var mu sync.Mutex
    
        for _, client := range r.clients {
            wg.Add(1)
            go func(c *redis.Client) {
                defer wg.Done()
    
                ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
                defer cancel()
    
                result := c.SetNX(ctx, key, value, expiration)
                if result.Err() == nil && result.Val() {
                    mu.Lock()
                    successCount++
                    mu.Unlock()
                }
            }(client)
        }
    
        wg.Wait()
    
        elapsed := time.Since(start)
        validTime := expiration - elapsed
    
        // 检查是否获取锁成功
        if successCount >= r.quorum && validTime > 0 {
            return &Lock{
                key:        key,
                value:      value,
                expiration: validTime,
                clients:    r.clients,
            }, nil
        }
    
        // 获取失败,释放已获取的锁
        r.unlock(key, value)
        return nil, fmt.Errorf("failed to acquire lock")
    }
    
    func (r *RedLock) unlock(key, value string) {
        script := `
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        `
    
        var wg sync.WaitGroup
        for _, client := range r.clients {
            wg.Add(1)
            go func(c *redis.Client) {
                defer wg.Done()
                ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
                defer cancel()
                c.Eval(ctx, script, []string{key}, value)
            }(client)
        }
        wg.Wait()
    }
    
    func (l *Lock) Unlock() {
        script := `
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        `
    
        var wg sync.WaitGroup
        for _, client := range l.clients {
            wg.Add(1)
            go func(c *redis.Client) {
                defer wg.Done()
                ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
                defer cancel()
                c.Eval(ctx, script, []string{l.key}, l.value)
            }(client)
        }
        wg.Wait()
    }
    
    func generateRandomValue() string {
        bytes := make([]byte, 16)
        rand.Read(bytes)
        return hex.EncodeToString(bytes)
    }
    
    // 使用示例
    func main() {
        addrs := []string{
            "localhost:6379",
            "localhost:6380",
            "localhost:6381",
            "localhost:6382",
            "localhost:6383",
        }
    
        redlock := NewRedLock(addrs)
    
        lock, err := redlock.Lock("resource_key", 10*time.Second)
        if err != nil {
            fmt.Printf("获取锁失败: %v\n", err)
            return
        }
    
        fmt.Println("成功获取锁,执行业务逻辑...")
        time.Sleep(5 * time.Second)
    
        lock.Unlock()
        fmt.Println("释放锁完成")
    }
    

    Advantages and Disadvantages of RedLock

    Advantages

    1. High Availability: Continues to function normally even if some Redis instances fail.
    2. Fault Tolerance: Can tolerate network partitions or failures of a minority of instances.
    3. Consistency: Ensures lock consistency through the majority principle.

    Disadvantages

    1. Complexity: More complex to implement and operate than a single instance.
    2. Performance Overhead: Requires communication with multiple instances, increasing latency.
    3. Clock Dependency: Relies on clock synchronization across nodes.
    4. Resource Consumption: Requires maintaining multiple Redis instances.

    Production Environment Recommendations

    1. Use Mature Libraries: Recommended to use proven libraries such as redsync.
    2. Monitoring and Alerting: Monitor the status of each Redis instance and the lock acquisition success rate.
    3. Reasonable Configuration: Set appropriate timeout durations and retry strategies based on business requirements.
    4. Alternative Solutions: Consider using other distributed coordination services such as Etcd or ZooKeeper.

    Redis cluster and Redis-based sync have not been implemented (to be added later).

    主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6773

(0)
Walker的头像Walker
上一篇 13 hours ago
下一篇 Nov 25, 2025 15:00

Related Posts

EN
简体中文 繁體中文 English