Order Transactions
- Whether inventory is deducted at order time or after payment, both the inventory and order services are involved, so a distributed transaction is needed.
- Business problem: orders that are placed but never paid.
- Deduct after successful payment: the order is placed, but the stock may already be gone by the time payment completes.
- Deduct at order time; if the order is never paid, return the stock when the order times out. [Common approach]
Transactions and Distributed Transactions
1. What is a Transaction?
A transaction is a core concept in database management systems: a collection of database operations that either all succeed or all fail and roll back.
1.1 ACID Properties of Transactions
- Atomicity: All operations within a transaction either succeed completely or fail completely; there is no partial success.
- Consistency: Before and after a transaction executes, the database transitions from one consistent state to another consistent state.
- Isolation: Concurrently executing transactions are isolated from each other; the execution of one transaction should not affect other transactions.
- Durability: Once a transaction is committed, its results are permanently saved in the database.
1.2 Transaction Isolation Levels
- Read Uncommitted: Lowest level, may read dirty data.
- Read Committed: Can only read committed data.
- Repeatable Read: Multiple reads within the same transaction yield consistent results.
- Serializable: Highest level, completely serial execution.
2. What is a Distributed Transaction?
A distributed transaction refers to transaction operations involving multiple databases or services, requiring data consistency across multiple nodes.
2.1 Challenges of Distributed Transactions
- Network Partition: Network failures lead to communication interruption between nodes.
- Node Failure: A node crashes or restarts.
- Clock Skew: Inconsistent time across different nodes.
- Data Consistency: How to ensure data consistency across nodes.
2.2 CAP Theorem
- Consistency: All nodes see the same data at the same time (reads reflect the latest completed write).
- Availability: Every request receives a response; the system does not refuse to serve.
- Partition Tolerance: The system keeps operating despite network partitions.
CAP Theorem: a distributed system can guarantee at most two of the three properties simultaneously; since network partitions cannot be ruled out, the practical choice is between consistency and availability.
2.3 BASE Theory (Engineering Trade-offs with CAP)
- Basically Available: In the event of a failure, the system is allowed to degrade and provide limited functionality (e.g., slower response, partial unavailability).
- Soft state: The system state is allowed to exist in an intermediate state for a period (not strongly consistent).
- Eventual consistency: Data eventually becomes consistent after a period (or retries/compensation).
In engineering practice: most internet businesses choose AP → guided by BASE theory, sacrificing strong consistency for high availability and scalability, achieving eventual consistency through "compensation, retries, deduplication, and reconciliation".
3. Distributed Transaction Solutions
3.1 Two-Phase Commit (2PC)
Principle:
- Prepare Phase: The coordinator asks all participants if they can commit.
- Commit Phase: Decides to commit or roll back based on participant responses.
Advantages: Strong consistency
Disadvantages: Poor performance, single point of failure, blocking issues
Detailed Process (Illustration):
- The coordinator sends a prepare request to participants, who reserve resources, write pre-commit logs, and return yes/no.
- The coordinator aggregates the votes: all yes → send commit; any no or timeout → send rollback.
- Participants commit or roll back as instructed and acknowledge the coordinator.
Common issues: coordinator single point of failure, participant blocking (locks held for a long time), complex recovery under network partitions.
sequenceDiagram
participant C as Coordinator
participant P1 as Participant 1
participant P2 as Participant 2
C->>P1: prepare
C->>P2: prepare
P1-->>C: yes / prepared
P2-->>C: yes / prepared
alt all yes
C->>P1: commit
C->>P2: commit
P1-->>C: ack
P2-->>C: ack
else any no / timeout
C->>P1: rollback
C->>P2: rollback
end
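The prepare/commit flow above can be sketched with in-memory participants. This is a toy model, not a real resource manager: the `participant` interface and `account` type are invented for illustration, and a real 2PC implementation must also persist decisions for crash recovery.

```go
package main

import "fmt"

// participant is anything that can vote in the prepare phase and then
// commit or roll back. Here it is an in-memory stand-in.
type participant interface {
	Prepare() bool
	Commit()
	Rollback()
}

// twoPhaseCommit returns true if the transaction committed.
func twoPhaseCommit(ps []participant) bool {
	// Phase 1: collect votes; any "no" aborts the whole transaction.
	for _, p := range ps {
		if !p.Prepare() {
			for _, q := range ps {
				q.Rollback()
			}
			return false
		}
	}
	// Phase 2: everyone voted yes, so instruct all to commit.
	for _, p := range ps {
		p.Commit()
	}
	return true
}

type account struct {
	balance, reserved int
}

func (a *account) Prepare() bool { return a.balance >= a.reserved }
func (a *account) Commit()       { a.balance -= a.reserved }
func (a *account) Rollback()     { a.reserved = 0 }

func main() {
	a := &account{balance: 100, reserved: 30}
	b := &account{balance: 10, reserved: 30}
	// b cannot cover the reserved amount, so the transaction aborts.
	fmt.Println(twoPhaseCommit([]participant{a, b}))
}
```

Note how the blocking problem shows up even in this sketch: between `Prepare` and the coordinator's decision, every participant must hold its reservation.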
3.2 Three-Phase Commit (3PC)
Adds a pre-commit phase on top of 2PC to reduce blocking time, but still suffers from single point of failure issues.
3.3 TCC (Try-Confirm-Cancel)
Principle:
- Try: Attempt to execute business logic, reserve resources.
- Confirm: Confirm business execution, commit resources.
- Cancel: Cancel business execution, release resources.
Advantages: Good performance, non-blocking
Disadvantages: Complex implementation, requires business compensation
Key Implementation Points (Taking Order-Inventory-Payment as an Example):
- Try: Create order pre-status, pre-occupy inventory (deduct available inventory, increase pre-occupied inventory), pre-order payment.
- Confirm (payment success callback or asynchronous confirmation): Order changes to paid, inventory transitions from pre-occupied to formally deducted.
- Cancel (payment failure/timeout): Order cancellation, release pre-occupied inventory.
Implementation details: interface idempotency (deduplication table/unique business key), null rollback/dangling transaction handling, transaction log recording and retry tasks.
sequenceDiagram
participant Order as Order Service
participant Inv as Inventory Service
participant Pay as Payment Service
rect rgb(230,250,230)
Note over Order,Inv: Try phase (reserve resources)
Order->>Inv: Try: reserve stock
Order->>Pay: Try: pre-order / freeze funds
end
alt payment succeeds
rect rgb(230,230,255)
Note over Order,Inv: Confirm phase
Pay-->>Order: payment success callback
Order->>Inv: Confirm: deduct stock
Order->>Pay: Confirm: capture payment
end
else failure / timeout
rect rgb(255,230,230)
Note over Order,Inv: Cancel phase
Order->>Inv: Cancel: release reservation
Order->>Pay: Cancel: unfreeze / void
end
end
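A minimal in-memory sketch of the Try-Confirm-Cancel contract on the inventory side. The `Inventory` type is hypothetical; a real implementation would persist the reserved quantity and make all three operations idempotent per transaction ID (to handle the null-rollback and dangling-transaction cases mentioned above).

```go
package main

import (
	"errors"
	"fmt"
)

// Inventory with TCC semantics: Try reserves stock, Confirm turns the
// reservation into a real deduction, Cancel releases it.
type Inventory struct {
	Available int
	Reserved  int
}

// Try moves n units from available into reserved, failing fast if
// there is not enough stock.
func (inv *Inventory) Try(n int) error {
	if inv.Available < n {
		return errors.New("insufficient stock")
	}
	inv.Available -= n
	inv.Reserved += n
	return nil
}

// Confirm finalizes the deduction; the stock already left Available in Try.
func (inv *Inventory) Confirm(n int) { inv.Reserved -= n }

// Cancel releases the reservation back to available stock.
func (inv *Inventory) Cancel(n int) {
	inv.Reserved -= n
	inv.Available += n
}

func main() {
	inv := &Inventory{Available: 10}
	if err := inv.Try(3); err == nil {
		inv.Confirm(3) // payment succeeded
	}
	fmt.Println(inv.Available, inv.Reserved) // 7 0
}
```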
3.4 Message-Based Eventual Consistency
Principle:
- Local transaction execution
- Send message to message queue
- Consumer processes messages, ensuring eventual consistency
Advantages: Good performance, relatively simple implementation
Disadvantages: Only guarantees eventual consistency
3.4.1 Local Message Table (Outbox Pattern)
Process: Write business data and outbox message table within the same database and transaction → Background forwarder polls and delivers to MQ → Consumer processes and persists to database → Send acknowledgment/reconciliation.
Key Points:
- Strong consistency on the producer side (business + message in the same database and transaction)
- Forwarding idempotency (delivery by message ID, consumer deduplication)
- Failed retries and dead-letter queue, manual reconciliation and repair
flowchart LR
A[Application / business service] -->|same DB, same transaction| B[(Business tables + Outbox table)]
B -->|background forwarder scans / pulls| MQ[Message queue]
MQ --> C[Downstream service]
C --> D[(Consumer persistence / dedup table)]
subgraph "Retry and reconciliation"
E[Redelivery / dead-letter queue]
F[Reconciliation / manual repair]
end
MQ --> E
E --> F
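One polling pass of the outbox forwarder can be simulated in a few lines. These are in-memory stand-ins for the outbox table and MQ; the key property is that a row is marked sent only after a successful publish, which yields at-least-once delivery and is why the consumer must deduplicate.

```go
package main

import "fmt"

type outboxMsg struct {
	ID   int
	Body string
	Sent bool
}

// forwardPending simulates one polling pass of the outbox forwarder:
// deliver every unsent row, and mark it sent only after publish succeeds.
func forwardPending(outbox []outboxMsg, publish func(outboxMsg) error) []outboxMsg {
	for i := range outbox {
		if outbox[i].Sent {
			continue
		}
		if err := publish(outbox[i]); err != nil {
			continue // leave unsent; the next pass retries
		}
		outbox[i].Sent = true
	}
	return outbox
}

func main() {
	var delivered []int
	outbox := []outboxMsg{{ID: 1, Body: "award points"}, {ID: 2, Body: "send email"}}
	outbox = forwardPending(outbox, func(m outboxMsg) error {
		delivered = append(delivered, m.ID)
		return nil
	})
	fmt.Println(delivered, outbox[0].Sent, outbox[1].Sent) // [1 2] true true
}
```

If the process crashes between `publish` and marking the row sent, the row is delivered again on the next pass — exactly the duplicate the dedup table on the consumer side exists to absorb.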
3.4.2 Reliable Message-Based Eventual Consistency (Commonly Used)
Process:
- Business requests "prepare message/half message" from MQ.
- After successful local business commit, call MQ to confirm (commit), otherwise roll back (rollback).
- MQ suspends unconfirmed half-messages and checks the final status with the business party (check), deciding whether to commit or discard.
Key Points:
- Relies on MQ's transaction message/callback capability (e.g., RocketMQ)
- Both producer and consumer sides require idempotent processing
sequenceDiagram
participant Biz as Business Service
participant MQ as MQ (transactional messages)
participant D as Downstream Service
Biz->>MQ: Send half message (Prepare)
Biz->>Biz: Execute local business transaction
alt success
Biz->>MQ: Commit
else failure
Biz->>MQ: Rollback
end
MQ->>D: Deliver the committed message
D-->>MQ: Ack / retry
MQ->>Biz: Transaction check for unconfirmed half messages
Biz-->>MQ: Return final state (commit / rollback)
3.4.3 Best-Effort Notification
Process: After an event occurs, send a notification to downstream systems (HTTP/MQ). If it fails, retry several times according to a strategy. If the threshold is exceeded, it enters manual processing.
Applicable to scenarios with relatively relaxed consistency requirements (e.g., SMS, in-app messages, points issuance).
flowchart LR
A[Event source] --> B{Notify}
B -->|HTTP/MQ| C[Downstream]
B --> R1[Retry 1]
R1 --> R2[Retry 2]
R2 --> R3[Retry N]
R3 --> DLQ[Dead letter / manual compensation]
C --> Idem[Dedup / idempotent handling]
4. Transaction Handling in Order Systems
4.1 Inventory Deduction Issues
In order systems, inventory deduction is a critical operation:
// Inventory deduction example
func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
	// Start a transaction
	tx := global.DB.Begin()
	if tx.Error != nil {
		return nil, status.Error(codes.Internal, "failed to begin transaction")
	}
	// Iterate over all goods in the order
	for _, goodsInfo := range req.GoodsInvInfo {
		var inv model.Inventory
		// Query with a row lock (SELECT ... FOR UPDATE)
		result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
			Where("goods_id = ?", goodsInfo.GoodsId).
			First(&inv)
		if result.Error != nil {
			tx.Rollback()
			return nil, status.Error(codes.NotFound, "inventory record not found")
		}
		// Check that stock is sufficient
		if inv.Stock < goodsInfo.Num {
			tx.Rollback()
			return nil, status.Error(codes.ResourceExhausted, "insufficient stock")
		}
		// Update stock with an optimistic-lock version check
		updateResult := tx.Model(&model.Inventory{}).
			Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
			Updates(map[string]interface{}{
				"stock":   inv.Stock - goodsInfo.Num,
				"version": inv.Version + 1,
			})
		if updateResult.Error != nil || updateResult.RowsAffected == 0 {
			// Version conflict: another transaction changed this row
			tx.Rollback()
			return nil, status.Error(codes.Aborted, "concurrent update conflict, please retry")
		}
	}
	// Commit the transaction
	if err := tx.Commit().Error; err != nil {
		return nil, status.Error(codes.Internal, "failed to commit transaction")
	}
	return &emptypb.Empty{}, nil
}
4.2 Distributed Locks for Concurrency Issues
// Redis-based distributed lock
func (s *InventoryServer) SellWithDistributedLock(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
	// Acquire the distributed lock.
	// Note: this locks only the first goods ID; with multiple goods per
	// order, a real implementation should lock each goods (in a sorted
	// order to avoid deadlock) or use a coarser-grained key.
	lockKey := fmt.Sprintf("inventory_lock_%d", req.GoodsInvInfo[0].GoodsId)
	lock := s.redisClient.NewMutex(lockKey, time.Second*10)
	if err := lock.Lock(); err != nil {
		return nil, status.Error(codes.Internal, "failed to acquire lock")
	}
	defer lock.Unlock()
	// Perform the inventory deduction
	return s.Sell(ctx, req)
}
5. Business Scenario Analysis
5.1 Unpaid Order Issues
Problem: Users place orders but do not pay, leading to inventory being occupied.
Solutions:
- Order Timeout Mechanism: Set an order timeout period, after which the order is automatically canceled.
- Inventory Pre-occupation: Pre-occupy inventory when an order is placed, confirm deduction after successful payment.
- Scheduled Task: Regularly clean up timed-out orders and release inventory.
5.2 Insufficient Inventory During Payment Issues
Problem: Inventory is sufficient when the order is placed, but insufficient during payment.
Solutions:
- Inventory Pre-occupation: Pre-occupy inventory when an order is placed to prevent overselling.
- Re-check during payment: Verify inventory again before payment.
- Compensation Mechanism: Provide alternative solutions if inventory is insufficient.
6. Best Practices
- Judicious use of transactions: Avoid long transactions, reduce lock contention.
- Choose appropriate isolation levels: Select based on business requirements.
- Use optimistic locking: Reduce lock contention, improve concurrency performance.
- Implement retry mechanisms: Handle transient failures.
- Monitoring and alerting: Timely detection and resolution of issues.
7. Summary
Transactions and distributed transactions are crucial mechanisms for ensuring data consistency. In a microservices architecture, it is necessary to choose appropriate distributed transaction solutions based on business scenarios, balancing consistency, availability, and performance. As a typical distributed transaction scenario, order systems require special attention to data consistency in critical operations such as inventory deduction and order status management.
Summary and Reflection
TCC Distributed Transaction Summary
To summarize, if you're going to implement TCC distributed transactions:
- First, choose a TCC distributed transaction framework, which then runs inside each service.
- Then, refactor each original interface into three pieces of logic: Try, Confirm, and Cancel.
TCC Process:
- First, the service call chain sequentially executes the Try logic.
- If all are normal, the TCC distributed transaction framework proceeds to execute the Confirm logic, completing the entire transaction.
- If there's an issue with the Try logic of a certain service, the TCC distributed transaction framework will detect it and then proceed to execute the Cancel logic of each service, revoking all previously performed operations.
- This is what is known as TCC distributed transactions.
The core idea of TCC distributed transactions, simply put, is to detect problems during Try, before anything is committed, for example when:
- The database of a certain service is down.
- A certain service itself crashed.
- Infrastructure that service depends on (Redis, Elasticsearch, MQ) failed.
- Certain resources are insufficient, such as inventory.
In any of these cases the Try phase fails, and the framework runs every service's Cancel logic instead of Confirm.
Further explanation:
- First, just Try it: before running the full business logic, check that each service can basically operate normally and that the resources it needs can be frozen up front.
- If every Try succeeds — the underlying databases, Redis, Elasticsearch, and MQ can all write data, and the required resources are reserved (e.g., a portion of inventory is frozen) — the framework then runs Confirm on every service to complete the transaction.
Local Message Table-Based Eventual Consistency
Detailed Explanation of Local Message Table Solution
Solution Origin: The local message table solution was initially proposed by eBay and is a classic distributed transaction eventual consistency solution.
Core Principle:
- Ensures consistency between business operations and messages through local transactions.
- Uses scheduled tasks to send messages to a message middleware (e.g., MQ).
- Messages are only deleted after being successfully delivered to the consumer.
Implementation Process
1. Basic Process
- Business Operation: Execute business operations in the local database.
- Message Recording: Record the message in the local message table within the same local transaction.
- Message Sending: A scheduled task scans the local message table and sends unsent messages to MQ.
- Message Acknowledgment: The consumer sends an acknowledgment after processing the message.
- Message Cleanup: Upon receiving the acknowledgment, delete the message from the local message table.
2. Key Design Points
Local Message Table Structure:
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64) NOT NULL,   -- business ID
    message_content TEXT NOT NULL,      -- message payload
    message_status TINYINT DEFAULT 0,   -- 0: pending 1: sent 2: acknowledged
    retry_count INT DEFAULT 0,          -- retry count
    created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (message_status, created_time)
);
Idempotency Guarantee:
- The consumer side needs to implement idempotency handling.
- Use a unique business identifier to avoid duplicate processing.
- Record the ID or business key of processed messages.
Retry Mechanism:
- Retry when message sending fails.
- Set a maximum number of retries, after which the message enters a dead-letter queue.
- Adopt an exponential backoff strategy to avoid frequent retries.
Practical Application Example: Register to Get Points
Business Scenario: After a user successfully registers, points are automatically awarded.
Involved Services:
- User Service: Responsible for user registration.
- Points Service: Responsible for points management.
Implementation Steps:
- User Registration: The user service receives the registration request.
- Add New User: Insert a new user record into the user table.
- Record Message: In the same transaction, insert an "award points" message into the local message table.
- Send Message: A scheduled task sends the message to MQ.
- Process Points: The points service consumes the message and adds points to the user.
- Confirm Processing: The points service sends an acknowledgment after processing is complete.
- Clean Up Message: The user service deletes the local message upon receiving the acknowledgment.
Flowchart Explanation
sequenceDiagram
participant User as User
participant UserService as User Service
participant LocalDB as Local Database
participant MessageTable as Local Message Table
participant Scheduler as Scheduled Task
participant MQ as Message Queue
participant PointsService as Points Service
User->>UserService: 1. Registration request
UserService->>LocalDB: 2. Begin local transaction
UserService->>LocalDB: 3. Insert user record
UserService->>MessageTable: 4. Insert message record
UserService->>LocalDB: 5. Commit transaction
Note over Scheduler: Periodically scan for unsent messages
Scheduler->>MessageTable: 6. Query pending messages
Scheduler->>MQ: 7. Send message to MQ
Scheduler->>MessageTable: 8. Mark message as sent
MQ->>PointsService: 9. Deliver message
PointsService->>PointsService: 10. Process points logic
PointsService->>MQ: 11. Send ACK
Note over Scheduler: Clean up after acknowledgment
Scheduler->>MessageTable: 12. Delete acknowledged message
Architecture Diagram
graph TB
subgraph "User Service"
A[User registration API] --> B[Local database]
B --> C[Local message table]
end
subgraph "Message Processing"
D[Scheduled task scanner] --> C
D --> E[Message queue MQ]
end
subgraph "Points Service"
E --> F[Points service consumer]
F --> G[Points database]
end
subgraph "Monitoring and Retry"
H[Retry mechanism] --> D
I[Dead-letter queue] --> D
J[Monitoring and alerting] --> D
end
A --> D
F --> E
Pros and Cons Analysis
Advantages:
- Producer-Side Atomicity: The local transaction guarantees that the business write and the message record succeed or fail together.
- High Reliability: Messages are persistently stored and will not be lost.
- Simple Implementation: Does not require complex transaction coordinators.
- Good Performance: Asynchronous processing, does not block the main business flow.
Disadvantages:
- Eventual Consistency: Only guarantees eventual consistency, not strong consistency.
- Message Delay: Scheduled task scanning introduces latency.
- Resource Consumption: Requires additional storage space and scheduled tasks.
- Increased Complexity: Needs to handle message retries, dead letters, and other scenarios.
Applicable Scenarios
- Business scenarios where consistency requirements are not particularly strict.
- Distributed systems that can accept eventual consistency.
- Scenarios where message processing can be asynchronous.
- Important businesses that need to ensure messages are not lost.
Best Practices
- Message Table Design: Reasonably design the message table structure and add necessary indexes.
- Batch Processing: Scheduled tasks process messages in batches to improve efficiency.
- Monitoring and Alerting: Monitor message backlog, processing failures, and other situations.
- Idempotent Design: The consumer side must implement idempotent processing.
- Exception Handling: Comprehensive exception handling and retry mechanisms.
Reliable Message-Based Eventual Consistency (Transactional Messages)
Detailed Explanation of RocketMQ Transactional Message Solution
Solution Features: Based on RocketMQ's transactional message mechanism, achieving eventual consistency for distributed transactions.
Core Principle:
- Ensures consistency between message sending and business operations through RocketMQ's transactional message feature.
- Utilizes the message check mechanism to ensure the eventual consistency of transactions.
- Supports Half Message and Transaction Check.
RocketMQ Transactional Message Flow
1. Basic Process
- Send Half Message: The business party sends a half message to RocketMQ.
- Execute Business Logic: Execute business logic (local transaction).
- Commit/Rollback: Commit or roll back the transactional message based on the business execution result.
- Message Delivery: RocketMQ delivers the committed message to consumers.
- Transaction Check: For unconfirmed half messages, RocketMQ will check the business party's status.
2. Key Concepts
Half Message:
- Messages that are invisible to consumers.
- Used to ensure consistency between message sending and business operations.
- Only delivered to consumers after the transaction is committed.
Transaction Check:
- RocketMQ actively checks the business party's transaction status.
- Used to handle scenarios such as network anomalies and service restarts.
- Ensures the eventual consistency of transactional messages.
RocketMQ Transactional Message Flowchart
sequenceDiagram
participant Producer as Business Producer
participant RMQ as RocketMQ
participant Consumer as Business Consumer
participant LocalDB as Local Database
Note over Producer,LocalDB: Phase 1: send the half message
Producer->>RMQ: 1. Send half message (transactional message)
RMQ-->>Producer: 2. Return half-message send result
Note over Producer,LocalDB: Phase 2: execute business logic
Producer->>LocalDB: 3. Begin local transaction
Producer->>LocalDB: 4. Execute business operations
Producer->>LocalDB: 5. Record transaction state
alt business succeeds
Note over Producer,LocalDB: Phase 3: commit
Producer->>RMQ: 6a. Commit the transaction
RMQ->>RMQ: 7a. Convert half message to a deliverable message
RMQ->>Consumer: 8a. Deliver message to consumer
Consumer->>Consumer: 9a. Process business logic
Consumer-->>RMQ: 10a. Return consumption ACK
else business fails
Note over Producer,LocalDB: Phase 3: rollback
Producer->>RMQ: 6b. Roll back the transaction
RMQ->>RMQ: 7b. Discard half message
end
Note over Producer,RMQ: Transaction check mechanism
RMQ->>Producer: 11. Transaction check (no commit/rollback received)
Producer->>LocalDB: 12. Query local transaction state
Producer-->>RMQ: 13. Return transaction state (commit / rollback)
alt check result is commit
RMQ->>RMQ: 14a. Convert half message to a deliverable message
RMQ->>Consumer: 15a. Deliver message
else check result is rollback
RMQ->>RMQ: 14b. Discard half message
end
Detailed Architecture Diagram
graph TB
subgraph "Business Producer"
A[Business application] --> B[Transactional message sender]
B --> C[Local transaction executor]
C --> D[Local database]
C --> E[Transaction state record]
end
subgraph "RocketMQ Cluster"
F[NameServer] --> G[Broker Master]
G --> H[Broker Slave]
I[Transactional message store] --> G
J[Message queues] --> G
end
subgraph "Business Consumer"
K[Message consumer] --> L[Business processing logic]
L --> M[Consumer database]
end
subgraph "Transaction Check"
N[Transaction check service] --> E
N --> G
end
B --> F
G --> K
G --> N
style I fill:#e1f5fe
style N fill:#fff3e0
Implementation Example Code
Producer-Side Implementation
// RocketMQ transactional message producer
type TransactionProducer struct {
	producer rocketmq.TransactionProducer
}
// Send a transactional message
func (tp *TransactionProducer) SendTransactionMessage(topic, message string) error {
	msg := &rocketmq.Message{
		Topic: topic,
		Body:  []byte(message),
	}
	// Send the transactional message; the callback runs the local transaction
	result, err := tp.producer.SendMessageInTransaction(msg, func(msg *rocketmq.Message) rocketmq.LocalTransactionState {
		// Execute the local transaction
		return tp.executeLocalTransaction(msg)
	})
	if err != nil {
		return err
	}
	log.Printf("transactional message send result: %s", result.String())
	return nil
}
// Execute the local transaction
func (tp *TransactionProducer) executeLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
	// Begin the local transaction
	tx := global.DB.Begin()
	if tx.Error != nil {
		return rocketmq.RollbackMessage
	}
	// Run the business logic
	if err := tp.doBusinessLogic(msg); err != nil {
		tx.Rollback()
		return rocketmq.RollbackMessage
	}
	// Record the transaction state (read back later by the check callback)
	if err := tp.recordTransactionState(msg, "COMMIT"); err != nil {
		tx.Rollback()
		return rocketmq.RollbackMessage
	}
	tx.Commit()
	return rocketmq.CommitMessage
}
// Transaction check callback
func (tp *TransactionProducer) checkLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
	// Look up the recorded local transaction state
	state, err := tp.getTransactionState(msg)
	if err != nil {
		// Lookup failed: answer Unknow so MQ re-checks later, instead of
		// discarding a message whose business may have committed
		return rocketmq.Unknow
	}
	switch state {
	case "COMMIT":
		return rocketmq.CommitMessage
	case "ROLLBACK":
		return rocketmq.RollbackMessage
	default:
		return rocketmq.Unknow
	}
}
Consumer-Side Implementation
// RocketMQ message consumer
type MessageConsumer struct {
	consumer rocketmq.PushConsumer
}
// Consume messages
func (mc *MessageConsumer) ConsumeMessage(msgs []*rocketmq.MessageExt) rocketmq.ConsumeResult {
	for _, msg := range msgs {
		// Idempotency check: skip messages already processed
		if mc.isProcessed(msg) {
			continue
		}
		// Process the business logic
		if err := mc.processBusinessLogic(msg); err != nil {
			// Record the failure and ask MQ to retry later
			mc.recordProcessFailure(msg, err)
			return rocketmq.ConsumeRetryLater
		}
		// Record successful processing
		mc.recordProcessSuccess(msg)
	}
	return rocketmq.ConsumeSuccess
}
// Idempotency check
func (mc *MessageConsumer) isProcessed(msg *rocketmq.MessageExt) bool {
	// Check whether this message ID has already been handled
	var count int64
	global.DB.Model(&ProcessedMessage{}).
		Where("message_id = ?", msg.MsgId).
		Count(&count)
	return count > 0
}
Key Configurations
RocketMQ Configuration
# RocketMQ configuration
rocketmq:
  name-server: '127.0.0.1:9876'
  producer:
    group: 'transaction-producer-group'
    send-message-timeout: 3000
    retry-times: 2
  consumer:
    group: 'business-consumer-group'
    consume-timeout: 30000
    max-reconsume-times: 3
Pros and Cons Analysis
Advantages:
- Atomicity Between Message and Business: the transactional message mechanism ensures the message is delivered if and only if the local transaction commits.
- Automatic Check: RocketMQ automatically handles transaction checks, reducing business complexity.
- High Performance: Asynchronous processing, does not block the main business flow.
- High Reliability: Message persistence, supports cluster deployment.
Disadvantages:
- Dependency on Specific MQ: Requires a message queue that supports transactional messages.
- Complex Implementation: Requires handling transaction check logic.
- Eventual Consistency: Only guarantees eventual consistency, not strong consistency.
- Resource Consumption: Requires additional storage and processing resources.
Applicable Scenarios
- Distributed transaction scenarios with high consistency requirements.
- Business systems that can accept eventual consistency.
- Systems that use RocketMQ as their message middleware.
- Important businesses that need to ensure messages are not lost.
Best Practices
- Transaction Check Implementation: Must implement reliable transaction check logic.
- Idempotent Processing: The consumer side must implement idempotent processing.
- Exception Handling: Comprehensive exception handling and retry mechanisms.
- Monitoring and Alerting: Monitor the sending and consumption of transactional messages.
- Performance Optimization: Reasonably set message queue parameters to optimize performance.
Best-Effort Notification
Detailed Explanation of Best-Effort Notification Solution
Solution Features: Best-effort notification is a relatively relaxed distributed transaction solution, suitable for scenarios where consistency requirements are not particularly strict.
Core Principle:
- After completing its local transaction, the business party makes its best effort to notify other business parties.
- If the notification fails, it retries according to a strategy.
- After exceeding the maximum number of retries, it enters a manual processing workflow.
Payment Notification Scenario Analysis
Business Scenario
Taking e-commerce payment as an example:
- User Payment: The user completes the payment operation.
- Payment Success: The payment system confirms successful payment.
- Notify Order: The payment system notifies the order system to update the order status.
- Notify Inventory: The payment system notifies the inventory system to deduct inventory.
- Notify Points: The payment system notifies the points system to add points to the user.
Implementation Process
sequenceDiagram
participant User as User
participant Payment as Payment System
participant Order as Order System
participant Inventory as Inventory System
participant Points as Points System
participant Notification as Notification Service
participant Retry as Retry Service
participant Manual as Manual Handling
User->>Payment: 1. Initiate payment
Payment->>Payment: 2. Process payment logic
Payment->>Payment: 3. Update payment status
Note over Payment,Notification: Asynchronous notification phase
Payment->>Notification: 4. Create notification tasks
par Notify multiple systems in parallel
Notification->>Order: 5a. Notify order system
Order-->>Notification: 6a. Return result
and
Notification->>Inventory: 5b. Notify inventory system
Inventory-->>Notification: 6b. Return result
and
Notification->>Points: 5c. Notify points system
Points-->>Notification: 6c. Return result
end
alt notification succeeds
Notification->>Notification: 7a. Record success
else notification fails
Notification->>Retry: 7b. Enter retry queue
Retry->>Retry: 8. Retry per strategy
alt retry succeeds
Retry->>Notification: 9a. Mark as succeeded
else max retries exceeded
Retry->>Manual: 9b. Escalate to manual handling
Manual->>Manual: 10. Manual intervention
end
end
Detailed Architecture Design
graph TB
subgraph "Payment System"
A[Payment processing] --> B[Payment status update]
B --> C[Notification task creation]
end
subgraph "Notification Service"
D[Notification scheduler] --> E[Notification executor]
E --> F[Notification record table]
G[Retry scheduler] --> H[Retry executor]
H --> I[Retry record table]
end
subgraph "Downstream Systems"
J[Order system] --> K[Order status update]
L[Inventory system] --> M[Inventory deduction]
N[Points system] --> O[Points increase]
end
subgraph "Monitoring and Alerting"
P[Failure monitoring] --> Q[Alert notification]
R[Manual handling] --> S[Compensation operations]
end
C --> D
E --> J
E --> L
E --> N
F --> G
I --> P
P --> R
style F fill:#e8f5e8
style I fill:#fff3e0
style R fill:#ffebee
Core Implementation Code
1. Notification Task Model
// Notification task model
type NotificationTask struct {
	ID           int64     `json:"id" gorm:"primaryKey"`
	BusinessID   string    `json:"business_id" gorm:"index"` // business ID
	BusinessType string    `json:"business_type"`            // business type
	TargetURL    string    `json:"target_url"`               // notification target URL
	Payload      string    `json:"payload"`                  // notification payload
	Status       int       `json:"status"`                   // 0: pending 1: succeeded 2: failed
	RetryCount   int       `json:"retry_count"`              // retry count
	MaxRetry     int       `json:"max_retry"`                // maximum retries
	NextRetryAt  time.Time `json:"next_retry_at"`            // next retry time
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
}
// Notification log model
type NotificationLog struct {
	ID        int64     `json:"id" gorm:"primaryKey"`
	TaskID    int64     `json:"task_id" gorm:"index"`
	Status    int       `json:"status"`    // 0: success 1: failure
	Response  string    `json:"response"`  // response body
	ErrorMsg  string    `json:"error_msg"` // error message
	CreatedAt time.Time `json:"created_at"`
}
2. Notification Service Implementation
// Notification service
type NotificationService struct {
	db     *gorm.DB
	client *http.Client
	logger *log.Logger
}
// Create a notification task
func (ns *NotificationService) CreateNotificationTask(businessID, businessType, targetURL, payload string) error {
	task := &NotificationTask{
		BusinessID:   businessID,
		BusinessType: businessType,
		TargetURL:    targetURL,
		Payload:      payload,
		Status:       0, // pending
		RetryCount:   0,
		MaxRetry:     5, // default: at most 5 retries
		NextRetryAt:  time.Now(),
	}
	return ns.db.Create(task).Error
}
// Execute a notification
func (ns *NotificationService) ExecuteNotification(task *NotificationTask) error {
	// Build the HTTP request
	req, err := http.NewRequest("POST", task.TargetURL, strings.NewReader(task.Payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Business-ID", task.BusinessID)
	req.Header.Set("X-Business-Type", task.BusinessType)
	// Send the request
	resp, err := ns.client.Do(req)
	if err != nil {
		ns.recordNotificationLog(task.ID, 1, "", err.Error())
		return ns.handleNotificationFailure(task)
	}
	defer resp.Body.Close()
	// Read the response
	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode == 200 {
		// Notification succeeded
		ns.recordNotificationLog(task.ID, 0, string(body), "")
		return ns.handleNotificationSuccess(task)
	}
	// Notification failed
	ns.recordNotificationLog(task.ID, 1, string(body), fmt.Sprintf("HTTP %d", resp.StatusCode))
	return ns.handleNotificationFailure(task)
}
// Handle a successful notification
func (ns *NotificationService) handleNotificationSuccess(task *NotificationTask) error {
	return ns.db.Model(task).Updates(map[string]interface{}{
		"status": 1, // succeeded
	}).Error
}
// Handle a failed notification
func (ns *NotificationService) handleNotificationFailure(task *NotificationTask) error {
	task.RetryCount++
	if task.RetryCount >= task.MaxRetry {
		// Exceeded the maximum retries: mark as failed
		return ns.db.Model(task).Updates(map[string]interface{}{
			"status": 2, // failed
		}).Error
	}
	// Compute the next retry time (exponential backoff)
	nextRetryAt := time.Now().Add(time.Duration(math.Pow(2, float64(task.RetryCount))) * time.Minute)
	return ns.db.Model(task).Updates(map[string]interface{}{
		"retry_count":   task.RetryCount,
		"next_retry_at": nextRetryAt,
	}).Error
}
// Record a notification log entry
func (ns *NotificationService) recordNotificationLog(taskID int64, status int, response, errorMsg string) {
	entry := &NotificationLog{
		TaskID:    taskID,
		Status:    status,
		Response:  response,
		ErrorMsg:  errorMsg,
		CreatedAt: time.Now(),
	}
	ns.db.Create(entry)
}
3. Retry Service Implementation
// Retry service
type RetryService struct {
	db              *gorm.DB
	notificationSvc *NotificationService
	logger          *log.Logger
}
// Start the retry service
func (rs *RetryService) Start() {
	ticker := time.NewTicker(30 * time.Second) // check every 30 seconds
	defer ticker.Stop()
	for range ticker.C {
		rs.processRetryTasks()
	}
}
// Process due retry tasks
func (rs *RetryService) processRetryTasks() {
	var tasks []NotificationTask
	// Query tasks that are due for a retry
	err := rs.db.Where("status = ? AND retry_count < max_retry AND next_retry_at <= ?",
		0, time.Now()).Find(&tasks).Error
	if err != nil {
		rs.logger.Printf("failed to query retry tasks: %v", err)
		return
	}
	for _, task := range tasks {
		task := task // copy: do not share the loop variable across goroutines
		go rs.retryNotification(&task)
	}
}
// Retry a single notification
func (rs *RetryService) retryNotification(task *NotificationTask) {
	rs.logger.Printf("retrying notification task %d, attempt %d", task.ID, task.RetryCount+1)
	if err := rs.notificationSvc.ExecuteNotification(task); err != nil {
		rs.logger.Printf("retry failed: %v", err)
	}
}
4. Payment System Integration
// Payment service
type PaymentService struct {
	db              *gorm.DB
	notificationSvc *NotificationService
}
// Handle a successful payment
func (ps *PaymentService) HandlePaymentSuccess(paymentID string, amount float64, userID int64) error {
	// Begin a transaction
	tx := ps.db.Begin()
	if tx.Error != nil {
		return tx.Error
	}
	// Update the payment status
	if err := tx.Model(&Payment{}).Where("id = ?", paymentID).
		Updates(map[string]interface{}{
			"status":  "SUCCESS",
			"paid_at": time.Now(),
		}).Error; err != nil {
		tx.Rollback()
		return err
	}
	// Commit the transaction
	if err := tx.Commit().Error; err != nil {
		return err
	}
	// Create the notification tasks asynchronously
	go ps.createNotificationTasks(paymentID, amount, userID)
	return nil
}
// Create the notification tasks
func (ps *PaymentService) createNotificationTasks(paymentID string, amount float64, userID int64) {
	// Notify the order system
	orderPayload := map[string]interface{}{
		"payment_id": paymentID,
		"status":     "PAID",
		"amount":     amount,
	}
	payload, _ := json.Marshal(orderPayload)
	ps.notificationSvc.CreateNotificationTask(
		paymentID, "PAYMENT_SUCCESS",
		"http://order-service/api/payment/notify",
		string(payload),
	)
	// Notify the inventory system
	inventoryPayload := map[string]interface{}{
		"payment_id": paymentID,
		"user_id":    userID,
		"action":     "DEDUCT",
	}
	payload, _ = json.Marshal(inventoryPayload)
	ps.notificationSvc.CreateNotificationTask(
		paymentID, "PAYMENT_SUCCESS",
		"http://inventory-service/api/payment/notify",
		string(payload),
	)
	// Notify the points system
	pointsPayload := map[string]interface{}{
		"user_id": userID,
		"points":  int(amount), // 1 yuan = 1 point
		"source":  "PAYMENT",
	}
	payload, _ = json.Marshal(pointsPayload)
	ps.notificationSvc.CreateNotificationTask(
		paymentID, "PAYMENT_SUCCESS",
		"http://points-service/api/points/add",
		string(payload),
	)
}
Retry Strategy Configuration
# Notification configuration
notification:
  max_retry: 5              # maximum retries
  retry_interval: 30s       # retry-check interval
  exponential_backoff: true # use exponential backoff
  base_delay: 1m            # base delay
  max_delay: 30m            # maximum delay
  # Per-business retry strategies
  business_strategies:
    PAYMENT_SUCCESS:
      max_retry: 5
      base_delay: 1m
    ORDER_UPDATE:
      max_retry: 3
      base_delay: 30s
    INVENTORY_DEDUCT:
      max_retry: 5
      base_delay: 2m
Monitoring and Alerting
// Monitoring service
type MonitorService struct {
	db *gorm.DB
}
// Check for failed notifications
func (ms *MonitorService) CheckFailedNotifications() {
	var failedCount int64
	// Count notifications that failed in the last 24 hours
	ms.db.Model(&NotificationTask{}).
		Where("status = ? AND updated_at >= ?", 2, time.Now().Add(-24*time.Hour)).
		Count(&failedCount)
	if failedCount > 10 { // failure count above threshold
		ms.sendAlert(fmt.Sprintf("too many failed notifications: %d", failedCount))
	}
}
// Send an alert
func (ms *MonitorService) sendAlert(message string) {
	// Send email, SMS, DingTalk, or other alerts
	log.Printf("ALERT: %s", message)
}
Pros and Cons Analysis
Advantages:
- Simple Implementation: Clear logic, easy to understand and implement.
- Good Performance: Asynchronous processing, does not block the main business flow.
- High Flexibility: Different retry strategies can be set for different businesses.
- Strong Fault Tolerance: Improves success rate through retry mechanisms.
Disadvantages:
- Eventual Consistency: Only guarantees eventual consistency, not strong consistency.
- Data may be inconsistent: Data may be in an inconsistent state during retries.
- Requires manual intervention: Manual processing is required after exceeding the maximum number of retries.
- Complex Monitoring: Requires comprehensive monitoring and alerting mechanisms.
Applicable Scenarios
- Business scenarios with relatively relaxed consistency requirements.
- Distributed systems that can accept eventual consistency.
- Notification-type businesses: such as payment notifications, status change notifications, etc.
- Non-core businesses: such as points issuance, message pushing, etc.
Best Practices
- Reasonable retry strategy settings: Set different retry counts and intervals based on business importance.
- Implement idempotency: The receiver must implement idempotent processing.
- Comprehensive monitoring: Monitor key metrics such as notification success rate and failure rate.
- Manual processing workflow: Establish a comprehensive manual processing workflow and tools.
- Data reconciliation: Regularly perform data reconciliation to discover and resolve inconsistencies.