RocketMQ Quick Start
For installation instructions, refer to our Podman-based configurations.
Concept Introduction
RocketMQ is a distributed messaging middleware open-sourced by Alibaba and an Apache top-level project. Its core components are:
- NameServer: Service discovery and routing
- Broker: Message storage, delivery, and fetching
- Producer: Message producer (sends messages)
- Consumer: Message consumer (subscribes to and consumes messages)
- Topic/Tag: Used for message grouping and filtering
Production and Consumption Model: The Producer sends messages to a Topic; the Broker persists them and makes them available for the Consumer to fetch; the Consumer consumes messages in either clustering or broadcasting mode.
Code examples in this chapter use Go (pseudocode/illustration). Method names may vary slightly across different SDKs; please refer to your actual version.
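To make the difference between the two consumption modes concrete, here is a minimal in-memory sketch. The `Mode` type and the `deliver` function are hypothetical names invented for this illustration and are not part of any RocketMQ SDK. Round-robin per message is also a simplification: a real broker balances load by assigning queues to consumers.

```go
package main

import "fmt"

// Mode is a hypothetical stand-in for an SDK's consumption-model setting.
type Mode int

const (
	Clustering   Mode = iota // each message goes to exactly one consumer in the group
	Broadcasting             // each message goes to every consumer in the group
)

// deliver returns, for each consumer index, the messages it would receive.
// It assumes consumers >= 1. Round-robin is used for clustering purely for
// illustration.
func deliver(mode Mode, consumers int, msgs []string) [][]string {
	out := make([][]string, consumers)
	for i, m := range msgs {
		if mode == Broadcasting {
			for c := range out {
				out[c] = append(out[c], m)
			}
			continue
		}
		c := i % consumers
		out[c] = append(out[c], m)
	}
	return out
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4"}
	fmt.Println(deliver(Clustering, 2, msgs))   // [[m1 m3] [m2 m4]]
	fmt.Println(deliver(Broadcasting, 2, msgs)) // [[m1 m2 m3 m4] [m1 m2 m3 m4]]
}
```

In clustering mode the group shares the work; in broadcasting mode every consumer sees the full stream.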
Categorized by Sending Characteristics
1. Synchronous Sending
Synchronous sending waits for the Broker to return the sending result, suitable for scenarios requiring high reliability (e.g., placing an order, creating an order event).
// Synchronous sending
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // Failure handling/retry; res is not valid on this path
    log.Printf("SendFailed: %v", err)
    return
}
log.Printf("SendOK: %v", res)
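The "failure handling/retry" branch above could be filled in with a bounded retry loop. The following is only a sketch using the standard library: `sendFunc`, `sendWithRetry`, and `errTransient` are hypothetical names, and the closure stands in for a real call such as `producer.SendSync`. SDKs often ship their own retry setting, so check that before wrapping calls yourself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient simulates a recoverable send failure (hypothetical).
var errTransient = errors.New("transient send failure")

// sendFunc is a hypothetical stand-in for a call such as producer.SendSync.
type sendFunc func() error

// sendWithRetry calls send up to maxAttempts times (assumed >= 1), doubling
// the wait between attempts. It returns the number of attempts made and,
// if every attempt failed, the last error wrapped with context.
func sendWithRetry(send sendFunc, maxAttempts int, backoff time.Duration) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = send(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return maxAttempts, fmt.Errorf("send failed after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	}
	attempts, err := sendWithRetry(flaky, 5, 10*time.Millisecond)
	fmt.Println(attempts, err) // 3 <nil>
}
```

Because a retried send can produce duplicates on the Broker side, this pattern only makes sense together with idempotent consumers.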
2. Asynchronous Sending
Asynchronous sending does not block the caller; the result is delivered through a callback. It suits scenarios with long processing chains or high throughput requirements.
// Asynchronous sending
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // Log failure, retry later
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})
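One practical detail with callbacks is that the program must not exit (or shut the producer down) before they have fired. Below is a minimal sketch of waiting for all callbacks with `sync.WaitGroup`. Here `sendAsync` is a hypothetical stand-in that simulates an SDK's asynchronous send with a goroutine, and `sendAllAndWait` is likewise an invented helper.

```go
package main

import (
	"fmt"
	"sync"
)

// sendAsync is a hypothetical stand-in for an SDK's asynchronous send: it
// returns immediately and invokes cb later from another goroutine.
func sendAsync(body string, cb func(msgID string, err error)) {
	go func() {
		cb("id-"+body, nil) // pretend the broker accepted the message
	}()
}

// sendAllAndWait sends every body asynchronously, waits for all callbacks,
// and returns how many sends succeeded.
func sendAllAndWait(bodies []string) int {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // callbacks run concurrently, so guard the counter
		ok int
	)
	for _, b := range bodies {
		wg.Add(1)
		sendAsync(b, func(msgID string, err error) {
			defer wg.Done()
			if err != nil {
				return // log the failure and retry later
			}
			mu.Lock()
			ok++
			mu.Unlock()
		})
	}
	wg.Wait()
	return ok
}

func main() {
	fmt.Println(sendAllAndWait([]string{"user-login", "user-click", "user-logout"})) // 3
}
```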
3. One-Way Sending (OneWay)
One-way sending only attempts to send messages on a 'best-effort' basis without caring about the result, suitable for scenarios with low reliability requirements such as log collection and event tracking.
// One-way sending
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))
Categorized by Functional Characteristics
1. Normal Messages (Subscription)
The most common publish/subscribe model. Consumers can operate in clustering mode (load balancing) or broadcasting mode (each consumer receives the message).
// Consumer subscribes to normal messages
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // Idempotent processing
    // Business logic...
    return ConsumeSuccess
})
Key points:
- Idempotency: Use a unique business key or a deduplication table to avoid duplicate consumption
- Retries and Dead-Letter Queue (DLQ): Failed messages are retried, and if they exceed the threshold, they enter the DLQ
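The two key points can be sketched together in a few lines of plain Go. Everything here is hypothetical and in-memory: the `processed` map stands in for a deduplication table, and the `Consumer` type tracks failures itself. In RocketMQ the retry count and the DLQ are managed by the Broker, so this only illustrates the control flow, not how the SDK does it.

```go
package main

import (
	"errors"
	"fmt"
)

// errBusiness simulates a failing business handler (hypothetical).
var errBusiness = errors.New("business logic failed")

// Message is a simplified message carrying a unique business key.
type Message struct {
	Key  string
	Body string
}

// Consumer deduplicates by business key and dead-letters a message once
// its handler has failed more than maxRetries times.
type Consumer struct {
	maxRetries int
	handle     func(Message) error
	processed  map[string]bool // stand-in for a deduplication table
	failures   map[string]int
	DLQ        []Message
}

func NewConsumer(maxRetries int, handle func(Message) error) *Consumer {
	return &Consumer{
		maxRetries: maxRetries,
		handle:     handle,
		processed:  make(map[string]bool),
		failures:   make(map[string]int),
	}
}

// Consume returns true when the message is finished (processed, duplicate,
// or dead-lettered) and false when it should be redelivered.
func (c *Consumer) Consume(m Message) bool {
	if c.processed[m.Key] {
		return true // duplicate delivery: acknowledge without redoing the work
	}
	if err := c.handle(m); err != nil {
		c.failures[m.Key]++
		if c.failures[m.Key] > c.maxRetries {
			c.DLQ = append(c.DLQ, m)
			return true
		}
		return false
	}
	c.processed[m.Key] = true
	return true
}

func main() {
	handled := 0
	c := NewConsumer(2, func(m Message) error {
		handled++
		if m.Key == "order-bad" {
			return errBusiness
		}
		return nil
	})
	c.Consume(Message{Key: "order-1", Body: "created"})
	c.Consume(Message{Key: "order-1", Body: "created"}) // duplicate delivery
	for !c.Consume(Message{Key: "order-bad", Body: "created"}) {
	}
	fmt.Println("handler calls:", handled, "dead-lettered:", len(c.DLQ))
}
```

In production the deduplication check and the business write should happen in the same database transaction; otherwise a crash between the two can still cause a duplicate.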
2. Ordered Messages
Ordered messages are divided into global order and partition order. A common practice is to route messages by a business key (e.g., order ID) to the same queue, ensuring that messages for the 'same order' are processed in order.
// Producer selects queue by business key (illustration)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)
Note: Routing by business key only guarantees that messages with the same key land in the same queue. To preserve order on the consuming side as well, consumers typically process each queue serially (a single thread per queue).
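How a business key ends up in a fixed queue can be sketched as hash-then-modulo. The `selectQueue` function below is a hypothetical illustration using FNV-1a from the standard library; the hash an actual SDK uses may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// selectQueue maps a business key to a queue index in [0, queueCount).
// It assumes queueCount >= 1. The same key always yields the same index
// as long as queueCount does not change.
func selectQueue(shardingKey string, queueCount int) int {
	h := fnv.New32a()
	h.Write([]byte(shardingKey)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(queueCount))
}

func main() {
	for _, id := range []string{"order-1001", "order-1002", "order-1001"} {
		fmt.Printf("%s -> queue %d\n", id, selectQueue(id, 4))
	}
}
```

Note that changing the number of queues remaps keys, so ordering for in-flight keys can be disturbed while a topic is being resized.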
3. Delayed Messages (Scheduled/Delayed)
Used to deliver messages to consumers after a specified time, such as 'order timeout cancellation' or 'check payment result later'.
// Send a delayed message visible after 30s (different SDKs may use delayLevel or deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)
Practical considerations:
- Choose an appropriate delay level or absolute delivery time, depending on what your SDK and Broker version support
- The Consumer still needs idempotency and compensation logic
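When the Broker only supports fixed delay levels, the requested delay has to be rounded to one of them. The sketch below assumes the default level table of RocketMQ 4.x (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`); your Broker may be configured differently, and `delayLevelFor` is a hypothetical helper, not an SDK function.

```go
package main

import (
	"fmt"
	"time"
)

// defaultDelayLevels is assumed to match the Broker's default
// messageDelayLevel setting in RocketMQ 4.x; verify it against your config.
var defaultDelayLevels = []time.Duration{
	1 * time.Second, 5 * time.Second, 10 * time.Second, 30 * time.Second,
	1 * time.Minute, 2 * time.Minute, 3 * time.Minute, 4 * time.Minute,
	5 * time.Minute, 6 * time.Minute, 7 * time.Minute, 8 * time.Minute,
	9 * time.Minute, 10 * time.Minute, 20 * time.Minute, 30 * time.Minute,
	1 * time.Hour, 2 * time.Hour,
}

// delayLevelFor returns the 1-based level of the shortest configured delay
// that is at least want. It returns (0, true) for a non-positive delay
// (no delay needed) and (0, false) when want exceeds the longest level.
func delayLevelFor(want time.Duration) (int, bool) {
	if want <= 0 {
		return 0, true
	}
	for i, d := range defaultDelayLevels {
		if d >= want {
			return i + 1, true
		}
	}
	return 0, false
}

func main() {
	fmt.Println(delayLevelFor(30 * time.Second)) // 4 true
	fmt.Println(delayLevelFor(45 * time.Second)) // 5 true
	fmt.Println(delayLevelFor(3 * time.Hour))    // 0 false
}
```

Rounding up means the message may arrive later than requested (45s becomes 1m here), which the consumer logic should tolerate.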
4. Transactional Messages (Distributed Transactions)
Used to ensure eventual consistency of 'local transaction + message'. Process: Send half message → Execute local transaction → Commit/Rollback based on the result. If the Broker does not receive a confirmation, it calls back to the Producer to check the status of the local transaction.
sequenceDiagram
    participant P as Producer
    participant MQ as RocketMQ
    participant DB as LocalDB
    participant C as Consumer
    P->>MQ: Send half message
    P->>DB: Execute local transaction
    alt Success
        P->>MQ: Commit
        MQ->>C: Deliver formal message
    else Failure
        P->>MQ: Rollback
    end
    MQ->>P: Check unconfirmed transactions
For more details, refer to the 'Transactional Messages' and 'TCC/Local Message Table' sections in 013.md in this repository.
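The half-message flow can be simulated with a toy in-memory broker. All names below (`Broker`, `TxState`, `SendHalf`, `Resolve`, `CheckPending`) are hypothetical and exist only for this sketch; a real SDK exposes the same idea through a transaction listener with an execute step and a check step.

```go
package main

import "fmt"

// TxState is the outcome the producer reports for a local transaction.
type TxState int

const (
	Unknown  TxState = iota // outcome not known yet; keep the half message
	Commit                  // make the message visible to consumers
	Rollback                // discard the half message
)

// Broker is a toy in-memory stand-in for the message broker.
type Broker struct {
	half      map[string]string // txID -> body, invisible to consumers
	Delivered []string          // bodies made visible to consumers
}

func NewBroker() *Broker {
	return &Broker{half: make(map[string]string)}
}

// SendHalf stores a message that consumers cannot see yet.
func (b *Broker) SendHalf(txID, body string) {
	b.half[txID] = body
}

// Resolve applies the producer's decision to a pending half message.
func (b *Broker) Resolve(txID string, s TxState) {
	body, ok := b.half[txID]
	if !ok || s == Unknown {
		return
	}
	delete(b.half, txID)
	if s == Commit {
		b.Delivered = append(b.Delivered, body)
	}
}

// CheckPending asks the producer about every unresolved half message,
// mimicking the broker's transaction check-back.
func (b *Broker) CheckPending(check func(txID string) TxState) {
	for txID := range b.half {
		b.Resolve(txID, check(txID))
	}
}

func main() {
	broker := NewBroker()
	localDB := map[string]bool{} // txID -> local transaction committed

	// Normal path: half message, local transaction, explicit commit.
	broker.SendHalf("tx-1", "order-1 paid")
	localDB["tx-1"] = true
	broker.Resolve("tx-1", Commit)

	// Lost confirmation: the local transaction succeeded, but the producer
	// never reported the result, so the broker has to ask.
	broker.SendHalf("tx-2", "order-2 paid")
	localDB["tx-2"] = true
	broker.CheckPending(func(txID string) TxState {
		if localDB[txID] {
			return Commit
		}
		return Rollback
	})

	fmt.Println(broker.Delivered) // [order-1 paid order-2 paid]
}
```

The check callback must be answerable from durable state (here the `localDB` map), because it may run long after the original send and possibly on a different producer instance.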
Producer and Consumer Quick Example
// Producer initialization (illustration)
producer, _ := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
defer producer.Shutdown()
// Consumer initialization (illustration)
consumer, _ := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // or Broadcasting
})
defer consumer.Shutdown()
Advantages of Distributed Transactional Messages
- Decoupling: Upstream and downstream collaborate via events, reducing tight coupling
- Elasticity and Scalability: Asynchronous peak shaving, supporting high concurrency
- Reliability: Message persistence, retry/reconciliation on failure
- Eventual Consistency: Achieved through compensation and callbacks under AP trade-offs
Applicable scenarios: Order creation/payment, inventory deduction, points/coupon issuance, fund accounting, status synchronization, etc.
Common Practice Recommendations
- Consumer idempotency: Unique business key, deduplication table, optimistic lock
- Failure retry and Dead-Letter Queue (DLQ) configuration
- Monitoring and alerting: Backlog, failure rate, latency
- Combine with delayed messages to implement 'timeout closure/callback'
- Reserve transactional messages for critical paths; elsewhere, use local message tables or best-effort notification
Theme test article, for testing purposes only. Published by Walker. When reposting, please credit the source: https://walker-learn.xyz/archives/6761