Go工程師體系課 013

11次閱讀

訂單事務

  • 先扣庫存 後扣庫存 都會對庫存和訂單都會有影響, 所以要使用分佈式事務
  • 業務(下單不對付)業務問題
  • 支付成功再扣減(下單了,支付時沒庫存了)
  • 訂單扣減,不支付(訂單超時歸還)【常用方式】

事務和分佈式事務

1. 什麼是事務?

事務(Transaction)是數據庫管理系統中的一個重要概念,它是一組數據庫操作的集合,這些操作要麼全部成功執行,要麼全部失敗回滾。

1.1 事務的 ACID 特性

  • 原子性(Atomicity):事務中的所有操作要麼全部成功,要麼全部失敗,不存在部分成功的情況
  • 一致性(Consistency):事務執行前後,數據庫從一個一致狀態轉換到另一個一致狀態
  • 隔離性(Isolation):併發執行的事務之間相互隔離,一個事務的執行不應影響其他事務
  • 持久性(Durability):事務一旦提交,其結果就永久保存在數據庫中

1.2 事務的隔離級別

  1. 讀未提交(Read Uncommitted):最低級別,可能讀到髒數據
  2. 讀已提交(Read Committed):只能讀到已提交的數據
  3. 可重複讀(Repeatable Read):同一事務中多次讀取結果一致
  4. 串行化(Serializable):最高級別,完全串行執行

2. 什麼是分佈式事務?

分佈式事務是指涉及多個數據庫或服務的事務操作,需要保證跨多個節點的數據一致性。

2.1 分佈式事務的挑戰

  • 網絡分區:網絡故障導致節點間通信中斷
  • 節點故障:某個節點宕機或重啓
  • 時鐘不同步:各節點時間不一致
  • 數據一致性:如何保證跨節點的數據一致性

2.2 CAP 理論

  • 一致性(Consistency):所有節點在同一時間看到相同的數據(更新返回客戶端後)
  • 可用性(Availability):系統持續可用,不會出現操作失敗
  • 分區容錯性(Partition Tolerance):系統能夠容忍網絡分區故障

CAP 定理:在分佈式系統中,最多隻能同時滿足 CAP 中的兩個特性。

2.3 BASE 理論(與 CAP 的工程化取捨)

  • Basically Available(基本可用):在發生故障時,允許系統降級提供有限功能(如響應變慢、部分功能不可用)
  • Soft state(軟狀態):系統狀態允許在一段時間內存在中間態(未強一致)
  • Eventual consistency(最終一致):經過一段時間(或重試 / 補償)後,數據達到一致

工程實踐中:多數互聯網業務選擇 AP → 以 BASE 理論爲指導,犧牲強一致,換取高可用與可擴展,通過“補償、重試、去重、對賬”實現最終一致。

3. 分佈式事務解決方案

3.1 兩階段提交(2PC)

原理

  1. 準備階段:協調者詢問所有參與者是否可以提交
  2. 提交階段:根據參與者響應決定提交或回滾

優點:強一致性
缺點:性能差、單點故障、阻塞問題

流程細化(示意)

  1. 協調者向參與者發送 prepare 請求,各參與者預留資源、寫預提交日誌,並返回 yes/no
  2. 協調者彙總:全部 yes → 下發 commit;任一 no/ 超時 → 下發 rollback
  3. 參與者根據指令提交或回滾,並回執協調者

常見問題:協調者單點、參與者阻塞(長時間持鎖),網絡分區時恢復複雜。

sequenceDiagram
  participant C as 協調者(Coordinator)
  participant P1 as 參與者 1
  participant P2 as 參與者 2

  C->>P1: prepare
  C->>P2: prepare
  P1-->>C: yes/ 預提交成功
  P2-->>C: yes/ 預提交成功
  alt 全部 yes
    C->>P1: commit
    C->>P2: commit
    P1-->>C: ack
    P2-->>C: ack
  else 任一 no/ 超時
    C->>P1: rollback
    C->>P2: rollback
  end

3.2 三階段提交(3PC)

在 2PC 基礎上增加預提交階段,減少阻塞時間,但仍存在單點故障問題。

3.3 TCC(Try-Confirm-Cancel)

原理

  • Try:嘗試執行業務,預留資源
  • Confirm:確認執行業務,提交資源
  • Cancel:取消執行業務,釋放資源

優點:性能好、無阻塞
缺點:實現複雜、需要業務補償

落地要點(以訂單 - 庫存 - 支付爲例)

  • Try:創建訂單預狀態、預佔庫存(扣減可用庫存、增加預佔庫存)、預下單支付
  • Confirm(支付成功回調或異步確認):訂單變爲已支付、庫存從預佔轉正式扣減
  • Cancel(支付失敗 / 超時):訂單取消、釋放預佔庫存

實現細節:接口冪等(去重表 / 唯一業務鍵)、空回滾 / 懸掛處理、事務日誌記錄與重試任務。

sequenceDiagram
  participant Order as 訂單服務
  participant Inv as 庫存服務
  participant Pay as 支付服務

  rect rgb(230,250,230)
  Note over Order,Inv: Try 階段(預留資源)Order->>Inv: Try 預佔庫存
  Order->>Pay: Try 預下單 / 凍結
  end

  alt 支付成功
    rect rgb(230,230,255)
    Note over Order,Inv: Confirm 階段
    Pay-->>Order: 支付成功回調
    Order->>Inv: Confirm 扣減庫存
    Order->>Pay: Confirm 確認扣款
    end
  else 失敗 / 超時
    rect rgb(255,230,230)
    Note over Order,Inv: Cancel 階段
    Order->>Inv: Cancel 釋放預佔
    Order->>Pay: Cancel 解凍 / 撤銷
    end
  end

3.4 基於消息的最終一致性

原理

  1. 本地事務執行
  2. 發送消息到消息隊列
  3. 消費者處理消息,保證最終一致性

優點:性能好、實現相對簡單
缺點:只能保證最終一致性

3.4.1 基於本地消息表(Outbox Pattern)

流程:同庫同事務內寫業務數據與 outbox 消息表 → 後臺轉發器輪詢投遞到 MQ → 消費者處理並落庫 → 發送確認 / 對賬。

要點:

  • 生產端強一致(業務 + 消息同庫同事務)
  • 轉發冪等(按消息 ID 投遞、消費去重)
  • 失敗重試與死信隊列、人工對賬修復
flowchart LR
  A[應用 / 業務服務] -->| 同庫同事務 | B[(業務表 + Outbox 表)]
  B -->| 後臺轉發器掃描 / 拉取 | MQ[消息隊列]
  MQ --> C[下游服務]
  C --> D[(消費落庫 / 去重表)]

  subgraph 重試與對賬
    E[失敗重投 / 死信隊列]
    F[對賬 / 人工修復]
  end
  MQ --> E
  E --> F
3.4.2 基於可靠消息的最終一致性(常用)

流程:

  1. 業務方向 MQ 申請“預消息 / 半消息”(prepare)
  2. 業務本地提交成功後調用 MQ 確認(commit),否則回滾(rollback)
  3. MQ 掛起未確認的半消息並回查(check)業務方最終狀態,決定提交或丟棄

要點:

  • 依賴 MQ 的事務消息 / 回查能力(RocketMQ 等)
  • 生產與消費兩端均需冪等處理
sequenceDiagram
  participant Biz as 業務服務
  participant MQ as MQ(事務消息)
  participant D as 下游服務

  Biz->>MQ: 發送半消息(Prepare)
  Biz->>Biz: 執行業務本地事務
  alt 成功
    Biz->>MQ: Commit 確認
  else 失敗
    Biz->>MQ: Rollback 撤銷
  end
  MQ->>D: 投遞正式消息
  D-->>MQ: Ack/ 重試

  MQ->>Biz: 事務回查(Check) 未確認半消息
  Biz-->>MQ: 返回最終狀態(提交 / 回滾)
3.4.3 最大努力通知

流程:事件發生後向下游發起通知(HTTP/MQ),失敗則按策略重試若干次,超過閾值進入人工處理。

適用:對一致性要求相對寬鬆的場景(如短信、站內信、積分發放)。

flowchart LR
  A[事件源] --> B{通知}
  B -->|HTTP/MQ| C[下游]
  B --> R1[重試 1]
  R1 --> R2[重試 2]
  R2 --> R3[重試 N]
  R3 --> DLQ[死信 / 人工補償]
  C --> Idem[去重 / 冪等處理]

4. 訂單系統中的事務處理

4.1 庫存扣減問題

在訂單系統中,庫存扣減是關鍵操作:

// 庫存扣減示例
func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    // 開啓事務
    tx := global.DB.Begin()
    if tx.Error != nil {return nil, status.Error(codes.Internal, " 開啓事務失敗 ")
    }

    // 遍歷所有商品
    for _, goodsInfo := range req.GoodsInvInfo {
        var inv model.Inventory
        // 使用行鎖查詢
        result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("goods_id = ?", goodsInfo.GoodsId).
            First(&inv)

        // 檢查庫存是否充足
        if inv.Stock < goodsInfo.Num {tx.Rollback()
            return nil, status.Error(codes.ResourceExhausted, " 庫存不足 ")
        }

        // 使用樂觀鎖更新庫存
        updateResult := tx.Model(&model.Inventory{}).
            Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
            Updates(map[string]interface{}{
                "stock":   inv.Stock - goodsInfo.Num,
                "version": inv.Version + 1,
            })
    }

    // 提交事務
    if err := tx.Commit().Error; err != nil {return nil, status.Error(codes.Internal, " 提交事務失敗 ")
    }
    return &emptypb.Empty{}, nil}

4.2 分佈式鎖解決併發問題

// 基於 Redis 的分佈式鎖
func (s *InventoryServer) SellWithDistributedLock(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    // 獲取分佈式鎖
    lockKey := fmt.Sprintf("inventory_lock_%d", req.GoodsInvInfo[0].GoodsId)
    lock := s.redisClient.NewMutex(lockKey, time.Second*10)

    if err := lock.Lock(); err != nil {return nil, status.Error(codes.Internal, " 獲取鎖失敗 ")
    }
    defer lock.Unlock()

    // 執行庫存扣減邏輯
    return s.Sell(ctx, req)
}

5. 業務場景分析

5.1 下單不支付問題

問題:用戶下單後不支付,導致庫存被佔用

解決方案

  1. 訂單超時機制:設置訂單超時時間,超時後自動取消
  2. 庫存預佔:下單時預佔庫存,支付成功後確認扣減
  3. 定時任務:定期清理超時訂單,釋放庫存

5.2 支付時庫存不足問題

問題:下單時庫存充足,支付時庫存不足

解決方案

  1. 庫存預佔:下單時預佔庫存,避免超賣
  2. 支付時再次檢查:支付前再次驗證庫存
  3. 補償機制:庫存不足時提供替代方案

6. 最佳實踐

  1. 合理使用事務:避免長事務,減少鎖競爭
  2. 選擇合適的隔離級別:根據業務需求選擇
  3. 使用樂觀鎖:減少鎖競爭,提高併發性能
  4. 實現重試機制:處理臨時性失敗
  5. 監控和告警:及時發現和處理問題

7. 總結

事務和分佈式事務是保證數據一致性的重要機制。在微服務架構中,需要根據業務場景選擇合適的分佈式事務解決方案,平衡一致性、可用性和性能。訂單系統作爲典型的分佈式事務場景,需要特別注意庫存扣減、訂單狀態管理等關鍵操作的數據一致性。

總結與思考

TCC 分佈式事務總結

總結一下,你要玩兒 TCC 分佈式事務的話:

  1. 首先需要選擇某種 TCC 分佈式事務框架,各個服務裏就會有這個 TCC 分佈式事務框架在運行。

  2. 然後你原本的一個接口,要改造爲 3 個邏輯:Try-Confirm-Cancel。

TCC 流程:

  • 先是服務調用鏈路依次執行 Try 邏輯
  • 如果都正常的話,TCC 分佈式事務框架推進執行 Confirm 邏輯,完成整個事務
  • 如果某個服務的 Try 邏輯有問題,TCC 分佈式事務框架感知到之後就會推進執行各個服務的 Cancel 邏輯,撤銷之前執行的各種操作
  • 這就是所謂的 TCC 分佈式事務

TCC 分佈式事務的核心思想,說白了,就是當遇到下面這些情況時:

  1. 某個服務的數據庫宕機了
  2. 某個服務自己掛了
  3. 那個服務的 redis、elasticsearch、MQ 等基礎設施故障了
  4. 某些資源不足了,比如說庫存不夠這些

進一步解釋:

  • 先來 Try 一下,不到 3:20 業務邏輯完成,先試試看,看各個服務能可能基本正常運轉,能不能先凍結我需要的資源
  • 如果 Try 都 ok,也就是說,底層的數據庫、redis、elasticsearch、MQ 都是可以寫入數據的,並且你保留好了需要使用的一些資源(比如凍結了一部分庫存)

基於本地消息表的最終一致性

本地消息表方案詳解

方案來源:本地消息表方案最初由 eBay 提出,是一種經典的分佈式事務最終一致性解決方案。

核心原理

  • 通過本地事務來保證數據業務操作和消息的一致性
  • 使用定時任務將消息發送到消息中間件(如 MQ)
  • 消息只有在成功投遞給消費者後才被刪除

實現流程

1. 基本流程

  1. 業務操作:在本地數據庫中執行業務操作
  2. 消息記錄:在同一個本地事務中,將消息記錄到本地消息表
  3. 消息發送:定時任務掃描本地消息表,將未發送的消息發送到 MQ
  4. 消息確認:消費者處理完消息後,發送確認回執
  5. 消息清理:收到確認後,從本地消息表中刪除該消息

2. 關鍵設計要點

本地消息表結構

CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64) NOT NULL,  -- 業務 ID
    message_content TEXT NOT NULL,     -- 消息內容
    message_status TINYINT DEFAULT 0,  -- 0: 待發送 1: 已發送 2: 已確認
    retry_count INT DEFAULT 0,         -- 重試次數
    created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (message_status, created_time)
);

冪等性保證

  • 消費者端需要實現冪等性處理
  • 使用業務唯一標識避免重複處理
  • 記錄已處理消息的 ID 或業務鍵

重試機制

  • 消息發送失敗時進行重試
  • 設置最大重試次數,超過後進入死信隊列
  • 採用指數退避策略避免頻繁重試

實際應用示例:註冊送積分

業務場景:用戶註冊成功後,自動贈送積分

涉及服務

  • 用戶服務:負責用戶註冊
  • 積分服務:負責積分管理

實現步驟

  1. 用戶註冊:用戶服務接收註冊請求
  2. 新增用戶:在用戶表中插入新用戶記錄
  3. 記錄消息:在同一事務中,向本地消息表插入 ” 贈送積分 ” 消息
  4. 發送消息:定時任務將消息發送到 MQ
  5. 處理積分:積分服務消費消息,爲用戶增加積分
  6. 確認處理:積分服務處理完成後發送確認
  7. 清理消息:用戶服務收到確認後刪除本地消息

流程圖解

sequenceDiagram
    participant User as 用戶
    participant UserService as 用戶服務
    participant LocalDB as 本地數據庫
    participant MessageTable as 本地消息表
    participant Scheduler as 定時任務
    participant MQ as 消息隊列
    participant PointsService as 積分服務

    User->>UserService: 1. 用戶註冊請求
    UserService->>LocalDB: 2. 開啓本地事務
    UserService->>LocalDB: 3. 新增用戶記錄
    UserService->>MessageTable: 4. 插入消息記錄
    UserService->>LocalDB: 5. 提交事務

    Note over Scheduler: 定時掃描未發送消息
    Scheduler->>MessageTable: 6. 查詢待發送消息
    Scheduler->>MQ: 7. 發送消息到 MQ
    Scheduler->>MessageTable: 8. 更新消息狀態爲已發送

    MQ->>PointsService: 9. 投遞消息
    PointsService->>PointsService: 10. 處理積分邏輯
    PointsService->>MQ: 11. 發送確認 ACK

    Note over Scheduler: 收到確認後清理
    Scheduler->>MessageTable: 12. 刪除已確認消息

架構圖

graph TB
    subgraph " 用戶服務 "
        A[用戶註冊接口] --> B[本地數據庫]
        B --> C[本地消息表]
    end

    subgraph " 消息處理 "
        D[定時任務掃描器] --> C
        D --> E[消息隊列 MQ]
    end

    subgraph " 積分服務 "
        E --> F[積分服務消費者]
        F --> G[積分數據庫]
    end

    subgraph " 監控與重試 "
        H[重試機制] --> D
        I[死信隊列] --> D
        J[監控告警] --> D
    end

    A --> D
    F --> E

優缺點分析

優點

  1. 強一致性:本地事務保證業務操作和消息記錄的一致性
  2. 可靠性高:消息持久化存儲,不會丟失
  3. 實現簡單:不需要複雜的事務協調器
  4. 性能較好:異步處理,不阻塞主業務流程

缺點

  1. 最終一致性:只能保證最終一致性,不能保證強一致性
  2. 消息延遲:定時任務掃描存在延遲
  3. 資源消耗:需要額外的存儲空間和定時任務
  4. 複雜度增加:需要處理消息重試、死信等場景

適用場景

  1. 對一致性要求不是特別嚴格 的業務場景
  2. 可以接受最終一致性 的分佈式系統
  3. 消息處理可以異步進行 的場景
  4. 需要保證消息不丟失 的重要業務

最佳實踐

  1. 消息表設計:合理設計消息表結構,添加必要的索引
  2. 批量處理:定時任務批量處理消息,提高效率
  3. 監控告警:監控消息積壓、處理失敗等情況
  4. 冪等設計:消費者端必須實現冪等性處理
  5. 異常處理:完善的異常處理和重試機制

基於可靠消息的最終一致性(事務消息)

RocketMQ 事務消息方案詳解

方案特點:基於 RocketMQ 的事務消息機制,實現分佈式事務的最終一致性。

核心原理

  • 通過 RocketMQ 的事務消息功能,保證消息發送和業務操作的一致性
  • 利用消息回查機制,確保事務的最終一致性
  • 支持半消息(Half Message)和事務回查(Transaction Check)

RocketMQ 事務消息流程

1. 基本流程

  1. 發送半消息:業務方發送半消息到 RocketMQ
  2. 執行業務:執行業務邏輯(本地事務)
  3. 提交 / 回滾:根據業務執行結果,提交或回滾事務消息
  4. 消息投遞:RocketMQ 將已提交的消息投遞給消費者
  5. 事務回查:對於未確認的半消息,RocketMQ 會回查業務方狀態

2. 關鍵概念

半消息(Half Message)

  • 對消費者不可見的消息
  • 用於確保消息發送和業務操作的一致性
  • 只有事務提交後纔會投遞給消費者

事務回查(Transaction Check)

  • RocketMQ 主動回查業務方的事務狀態
  • 用於處理網絡異常、服務重啓等場景
  • 確保事務消息的最終一致性

RocketMQ 事務消息流程圖

sequenceDiagram
    participant Producer as 業務生產者
    participant RMQ as RocketMQ
    participant Consumer as 業務消費者
    participant LocalDB as 本地數據庫

    Note over Producer,LocalDB: 第一階段:發送半消息
    Producer->>RMQ: 1. 發送半消息(事務消息)
    RMQ-->>Producer: 2. 返回半消息發送結果

    Note over Producer,LocalDB: 第二階段:執行業務邏輯
    Producer->>LocalDB: 3. 開啓本地事務
    Producer->>LocalDB: 4. 執行業務操作
    Producer->>LocalDB: 5. 記錄事務狀態

    alt 業務執行成功
        Note over Producer,LocalDB: 第三階段:提交事務
        Producer->>RMQ: 6a. 提交事務(commit)
        RMQ->>RMQ: 7a. 將半消息轉爲正式消息
        RMQ->>Consumer: 8a. 投遞消息給消費者
        Consumer->>Consumer: 9a. 處理業務邏輯
        Consumer-->>RMQ: 10a. 返回消費確認
    else 業務執行失敗
        Note over Producer,LocalDB: 第三階段:回滾事務
        Producer->>RMQ: 6b. 回滾事務(rollback)
        RMQ->>RMQ: 7b. 丟棄半消息
    end

    Note over Producer,RMQ: 事務回查機制
    RMQ->>Producer: 11. 事務回查(未收到 commit/rollback)
    Producer->>LocalDB: 12. 查詢本地事務狀態
    Producer-->>RMQ: 13. 返回事務狀態(commit/rollback)

    alt 回查結果爲 commit
        RMQ->>RMQ: 14a. 將半消息轉爲正式消息
        RMQ->>Consumer: 15a. 投遞消息
    else 回查結果爲 rollback
        RMQ->>RMQ: 14b. 丟棄半消息
    end

詳細架構圖

graph TB
    subgraph " 業務生產者 "
        A[業務應用] --> B[事務消息發送器]
        B --> C[本地事務執行器]
        C --> D[本地數據庫]
        C --> E[事務狀態記錄]
    end

    subgraph "RocketMQ 集羣 "
        F[NameServer] --> G[Broker Master]
        G --> H[Broker Slave]
        I[事務消息存儲] --> G
        J[消息隊列] --> G
    end

    subgraph " 業務消費者 "
        K[消息消費者] --> L[業務處理邏輯]
        L --> M[消費者數據庫]
    end

    subgraph " 事務回查 "
        N[事務回查服務] --> E
        N --> G
    end

    B --> F
    G --> K
    G --> N

    style I fill:#e1f5fe
    style N fill:#fff3e0

實現示例代碼

生產者端實現

// RocketMQ 事務消息生產者
type TransactionProducer struct {producer rocketmq.TransactionProducer}

// 發送事務消息
func (tp *TransactionProducer) SendTransactionMessage(topic, message string) error {
    msg := &rocketmq.Message{
        Topic: topic,
        Body:  []byte(message),
    }

    // 發送事務消息
    result, err := tp.producer.SendMessageInTransaction(msg, func(msg *rocketmq.Message) rocketmq.LocalTransactionState {
        // 執行本地事務
        return tp.executeLocalTransaction(msg)
    })

    if err != nil {return err}

    log.Printf(" 事務消息發送結果: %s", result.String())
    return nil
}

// 執行本地事務
func (tp *TransactionProducer) executeLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
    // 開啓本地事務
    tx := global.DB.Begin()
    if tx.Error != nil {return rocketmq.RollbackMessage}

    // 執行業務邏輯
    if err := tp.doBusinessLogic(msg); err != nil {tx.Rollback()
        return rocketmq.RollbackMessage
    }

    // 記錄事務狀態
    if err := tp.recordTransactionState(msg, "COMMIT"); err != nil {tx.Rollback()
        return rocketmq.RollbackMessage
    }

    tx.Commit()
    return rocketmq.CommitMessage
}

// 事務回查
func (tp *TransactionProducer) checkLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
    // 查詢本地事務狀態
    state, err := tp.getTransactionState(msg)
    if err != nil {return rocketmq.RollbackMessage}

    switch state {
    case "COMMIT":
        return rocketmq.CommitMessage
    case "ROLLBACK":
        return rocketmq.RollbackMessage
    default:
        return rocketmq.Unknow
    }
}

消費者端實現

// RocketMQ 消息消費者
type MessageConsumer struct {consumer rocketmq.PushConsumer}

// 消費消息
func (mc *MessageConsumer) ConsumeMessage(msgs []*rocketmq.MessageExt) rocketmq.ConsumeResult {
    for _, msg := range msgs {
        // 冪等性檢查
        if mc.isProcessed(msg) {continue}

        // 處理業務邏輯
        if err := mc.processBusinessLogic(msg); err != nil {
            // 記錄處理失敗,稍後重試
            mc.recordProcessFailure(msg, err)
            return rocketmq.ConsumeRetryLater
        }

        // 記錄處理成功
        mc.recordProcessSuccess(msg)
    }

    return rocketmq.ConsumeSuccess
}

// 冪等性檢查
func (mc *MessageConsumer) isProcessed(msg *rocketmq.MessageExt) bool {
    // 檢查消息是否已處理
    var count int64
    global.DB.Model(&ProcessedMessage{}).
        Where("message_id = ?", msg.MsgId).
        Count(&count)

    return count > 0
}

關鍵配置

RocketMQ 配置

# RocketMQ 配置
rocketmq:
  name-server: '127.0.0.1:9876'
  producer:
    group: 'transaction-producer-group'
    send-message-timeout: 3000
    retry-times: 2
  consumer:
    group: 'business-consumer-group'
    consume-timeout: 30000
    max-reconsume-times: 3

優缺點分析

優點

  1. 強一致性保證:通過事務消息機制保證消息和業務的一致性
  2. 自動回查:RocketMQ 自動處理事務回查,減少業務複雜度
  3. 高性能:異步處理,不阻塞主業務流程
  4. 可靠性高:消息持久化,支持集羣部署

缺點

  1. 依賴特定 MQ:需要支持事務消息的消息隊列
  2. 實現複雜:需要處理事務回查邏輯
  3. 最終一致性:只能保證最終一致性,不能保證強一致性
  4. 資源消耗:需要額外的存儲和處理資源

適用場景

  1. 對一致性要求較高 的分佈式事務場景
  2. 可以接受最終一致性 的業務系統
  3. 使用 RocketMQ 作爲消息中間件的系統
  4. 需要保證消息不丟失 的重要業務

最佳實踐

  1. 事務回查實現:必須實現可靠的事務回查邏輯
  2. 冪等性處理:消費者端必須實現冪等性處理
  3. 異常處理:完善的異常處理和重試機制
  4. 監控告警:監控事務消息的發送和消費情況
  5. 性能優化:合理設置消息隊列參數,優化性能

最大努力通知

最大努力通知方案詳解

方案特點:最大努力通知是一種相對寬鬆的分佈式事務解決方案,適用於對一致性要求不是特別嚴格的場景。

核心原理

  • 業務方在完成本地事務後,盡力通知其他業務方
  • 如果通知失敗,則按照策略進行重試
  • 超過最大重試次數後,進入人工處理流程

支付通知場景分析

業務場景

以電商支付爲例:

  1. 用戶支付:用戶完成支付操作
  2. 支付成功:支付系統確認支付成功
  3. 通知訂單:支付系統通知訂單系統更新訂單狀態
  4. 通知庫存:支付系統通知庫存系統扣減庫存
  5. 通知積分:支付系統通知積分系統增加用戶積分

實現流程

sequenceDiagram
    participant User as 用戶
    participant Payment as 支付系統
    participant Order as 訂單系統
    participant Inventory as 庫存系統
    participant Points as 積分系統
    participant Notification as 通知服務
    participant Retry as 重試服務
    participant Manual as 人工處理

    User->>Payment: 1. 發起支付
    Payment->>Payment: 2. 處理支付邏輯
    Payment->>Payment: 3. 更新支付狀態

    Note over Payment,Notification: 異步通知階段
    Payment->>Notification: 4. 發送通知任務

    par 並行通知多個系統
        Notification->>Order: 5a. 通知訂單系統
        Order-->>Notification: 6a. 返回處理結果
    and
        Notification->>Inventory: 5b. 通知庫存系統
        Inventory-->>Notification: 6b. 返回處理結果
    and
        Notification->>Points: 5c. 通知積分系統
        Points-->>Notification: 6c. 返回處理結果
    end

    alt 通知成功
        Notification->>Notification: 7a. 記錄通知成功
    else 通知失敗
        Notification->>Retry: 7b. 進入重試隊列
        Retry->>Retry: 8. 按策略重試

        alt 重試成功
            Retry->>Notification: 9a. 標記通知成功
        else 超過最大重試次數
            Retry->>Manual: 9b. 進入人工處理
            Manual->>Manual: 10. 人工干預處理
        end
    end

詳細架構設計

graph TB
    subgraph " 支付系統 "
        A[支付處理] --> B[支付狀態更新]
        B --> C[通知任務創建]
    end

    subgraph " 通知服務 "
        D[通知調度器] --> E[通知執行器]
        E --> F[通知記錄表]
        G[重試調度器] --> H[重試執行器]
        H --> I[重試記錄表]
    end

    subgraph " 下游系統 "
        J[訂單系統] --> K[訂單狀態更新]
        L[庫存系統] --> M[庫存扣減]
        N[積分系統] --> O[積分增加]
    end

    subgraph " 監控告警 "
        P[失敗監控] --> Q[告警通知]
        R[人工處理] --> S[補償操作]
    end

    C --> D
    E --> J
    E --> L
    E --> N
    F --> G
    I --> P
    P --> R

    style F fill:#e8f5e8
    style I fill:#fff3e0
    style R fill:#ffebee

核心實現代碼

1. 通知任務模型

// 通知任務模型
type NotificationTask struct {
    ID           int64     `json:"id" gorm:"primaryKey"`
    BusinessID   string    `json:"business_id" gorm:"index"`   // 業務 ID
    BusinessType string    `json:"business_type"`              // 業務類型
    TargetURL    string    `json:"target_url"`                 // 通知目標 URL
    Payload      string    `json:"payload"`                    // 通知內容
    Status       int       `json:"status"`                     // 0: 待通知 1: 通知成功 2: 通知失敗
    RetryCount   int       `json:"retry_count"`                // 重試次數
    MaxRetry     int       `json:"max_retry"`                  // 最大重試次數
    NextRetryAt  time.Time `json:"next_retry_at"`              // 下次重試時間
    CreatedAt    time.Time `json:"created_at"`
    UpdatedAt    time.Time `json:"updated_at"`
}

// 通知記錄模型
type NotificationLog struct {
    ID         int64     `json:"id" gorm:"primaryKey"`
    TaskID     int64     `json:"task_id" gorm:"index"`
    Status     int       `json:"status"`     // 0: 成功 1: 失敗
    Response   string    `json:"response"`   // 響應內容
    ErrorMsg   string    `json:"error_msg"`  // 錯誤信息
    CreatedAt  time.Time `json:"created_at"`
}

2. 通知服務實現

// 通知服務
type NotificationService struct {
    db     *gorm.DB
    client *http.Client
    logger *log.Logger
}

// 創建通知任務
func (ns *NotificationService) CreateNotificationTask(businessID, businessType, targetURL, payload string) error {
    task := &NotificationTask{
        BusinessID:   businessID,
        BusinessType: businessType,
        TargetURL:    targetURL,
        Payload:      payload,
        Status:       0, // 待通知
        RetryCount:   0,
        MaxRetry:     5, // 默認最大重試 5 次
        NextRetryAt:  time.Now(),}

    return ns.db.Create(task).Error
}

// 執行通知
func (ns *NotificationService) ExecuteNotification(task *NotificationTask) error {
    // 構建 HTTP 請求
    req, err := http.NewRequest("POST", task.TargetURL, strings.NewReader(task.Payload))
    if err != nil {return err}

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Business-ID", task.BusinessID)
    req.Header.Set("X-Business-Type", task.BusinessType)

    // 發送請求
    resp, err := ns.client.Do(req)
    if err != nil {ns.recordNotificationLog(task.ID, 1, "", err.Error())
        return ns.handleNotificationFailure(task)
    }
    defer resp.Body.Close()

    // 讀取響應
    body, _ := io.ReadAll(resp.Body)

    if resp.StatusCode == 200 {
        // 通知成功
        ns.recordNotificationLog(task.ID, 0, string(body), "")
        return ns.handleNotificationSuccess(task)
    } else {
        // 通知失敗
        ns.recordNotificationLog(task.ID, 1, string(body), fmt.Sprintf("HTTP %d", resp.StatusCode))
        return ns.handleNotificationFailure(task)
    }
}

// 處理通知成功
func (ns *NotificationService) handleNotificationSuccess(task *NotificationTask) error {return ns.db.Model(task).Updates(map[string]interface{}{"status": 1, // 通知成功}).Error
}

// 處理通知失敗
func (ns *NotificationService) handleNotificationFailure(task *NotificationTask) error {
    task.RetryCount++

    if task.RetryCount >= task.MaxRetry {
        // 超過最大重試次數,標記爲失敗
        return ns.db.Model(task).Updates(map[string]interface{}{"status": 2, // 通知失敗}).Error
    } else {
        // 計算下次重試時間(指數退避)nextRetryAt := time.Now().Add(time.Duration(math.Pow(2, float64(task.RetryCount))) * time.Minute)
        return ns.db.Model(task).Updates(map[string]interface{}{
            "retry_count":   task.RetryCount,
            "next_retry_at": nextRetryAt,
        }).Error
    }
}

// 記錄通知日誌
func (ns *NotificationService) recordNotificationLog(taskID int64, status int, response, errorMsg string) {
    log := &NotificationLog{
        TaskID:    taskID,
        Status:    status,
        Response:  response,
        ErrorMsg:  errorMsg,
        CreatedAt: time.Now(),}

    ns.db.Create(log)
}

3. 重試服務實現

// 重試服務
type RetryService struct {
    db                *gorm.DB
    notificationSvc   *NotificationService
    logger           *log.Logger
}

// 啓動重試服務
func (rs *RetryService) Start() {ticker := time.NewTicker(30 * time.Second) // 每 30 秒檢查一次
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            rs.processRetryTasks()}
    }
}

// 處理重試任務
func (rs *RetryService) processRetryTasks() {var tasks []NotificationTask

    // 查詢需要重試的任務
    err := rs.db.Where("status = ? AND retry_count < max_retry AND next_retry_at <= ?",
        0, time.Now()).Find(&tasks).Error

    if err != nil {rs.logger.Printf(" 查詢重試任務失敗: %v", err)
        return
    }

    for _, task := range tasks {go rs.retryNotification(&task)
    }
}

// 重試通知
func (rs *RetryService) retryNotification(task *NotificationTask) {rs.logger.Printf(" 開始重試通知任務: %d, 重試次數: %d", task.ID, task.RetryCount+1)

    if err := rs.notificationSvc.ExecuteNotification(task); err != nil {rs.logger.Printf(" 重試通知失敗: %v", err)
    }
}

4. 支付系統集成

// 支付服務
type PaymentService struct {
    db                *gorm.DB
    notificationSvc   *NotificationService
}

// 處理支付成功
func (ps *PaymentService) HandlePaymentSuccess(paymentID string, amount float64, userID int64) error {
    // 開啓事務
    tx := ps.db.Begin()
    if tx.Error != nil {return tx.Error}

    // 更新支付狀態
    if err := tx.Model(&Payment{}).Where("id = ?", paymentID).
        Updates(map[string]interface{}{
            "status": "SUCCESS",
            "paid_at": time.Now(),}).Error; err != nil {tx.Rollback()
        return err
    }

    // 提交事務
    if err := tx.Commit().Error; err != nil {return err}

    // 異步創建通知任務
    go ps.createNotificationTasks(paymentID, amount, userID)

    return nil
}

// 創建通知任務
func (ps *PaymentService) createNotificationTasks(paymentID string, amount float64, userID int64) {
    // 通知訂單系統
    orderPayload := map[string]interface{}{
        "payment_id": paymentID,
        "status": "PAID",
        "amount": amount,
    }
    payload, _ := json.Marshal(orderPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://order-service/api/payment/notify",
        string(payload),
    )

    // 通知庫存系統
    inventoryPayload := map[string]interface{}{
        "payment_id": paymentID,
        "user_id": userID,
        "action": "DEDUCT",
    }
    payload, _ = json.Marshal(inventoryPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://inventory-service/api/payment/notify",
        string(payload),
    )

    // 通知積分系統
    pointsPayload := map[string]interface{}{
        "user_id": userID,
        "points": int(amount), // 1 元 = 1 積分
        "source": "PAYMENT",
    }
    payload, _ = json.Marshal(pointsPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://points-service/api/points/add",
        string(payload),
    )
}

重試策略配置

# 通知配置
notification:
  max_retry: 5 # 最大重試次數
  retry_interval: 30s # 重試檢查間隔
  exponential_backoff: true # 是否使用指數退避
  base_delay: 1m # 基礎延遲時間
  max_delay: 30m # 最大延遲時間

  # 不同業務類型的重試策略
  business_strategies:
    PAYMENT_SUCCESS:
      max_retry: 5
      base_delay: 1m
    ORDER_UPDATE:
      max_retry: 3
      base_delay: 30s
    INVENTORY_DEDUCT:
      max_retry: 5
      base_delay: 2m

監控和告警

// 監控服務
type MonitorService struct {db *gorm.DB}

// 檢查失敗通知
func (ms *MonitorService) CheckFailedNotifications() {
    var failedCount int64

    // 統計 24 小時內失敗的通知
    ms.db.Model(&NotificationTask{}).
        Where("status = ? AND updated_at >= ?", 2, time.Now().Add(-24*time.Hour)).
        Count(&failedCount)

    if failedCount > 10 { // 失敗數量超過閾值
        ms.sendAlert(fmt.Sprintf(" 通知失敗數量過多: %d", failedCount))
    }
}

// 發送告警
func (ms *MonitorService) sendAlert(message string) {
    // 發送郵件、短信、釘釘等告警
    log.Printf(" 告警: %s", message)
}

優缺點分析

優點

  1. 實現簡單:邏輯清晰,易於理解和實現
  2. 性能較好:異步處理,不阻塞主業務流程
  3. 靈活性高:可以針對不同業務設置不同的重試策略
  4. 容錯性強:通過重試機制提高成功率

缺點

  1. 最終一致性:只能保證最終一致性,不能保證強一致性
  2. 數據可能不一致:在重試期間,數據可能處於不一致狀態
  3. 需要人工干預:超過重試次數後需要人工處理
  4. 監控複雜:需要完善的監控和告警機制

適用場景

  1. 對一致性要求相對寬鬆 的業務場景
  2. 可以接受最終一致性 的分佈式系統
  3. 通知類業務:如支付通知、狀態變更通知等
  4. 非核心業務:如積分發放、消息推送等

最佳實踐

  1. 合理設置重試策略:根據業務重要性設置不同的重試次數和間隔
  2. 實現冪等性:接收方必須實現冪等性處理
  3. 完善監控:監控通知成功率、失敗率等關鍵指標
  4. 人工處理流程:建立完善的人工處理流程和工具
  5. 數據對賬:定期進行數據對賬,發現和處理不一致問題
正文完
 0