Go Engineer Systematic Course 014

RocketMQ Quick Start

Refer to our various configurations (podman) to see how it's installed.


Concept Introduction

RocketMQ is a distributed messaging middleware open-sourced by Alibaba and an Apache top-level project. Its core components are:

  • NameServer: Service Discovery and Routing
  • Broker: Message Storage, Delivery, and Fetching
  • Producer: Message Producer (sends messages)
  • Consumer: Message Consumer (subscribes to and consumes messages)
  • Topic/Tag: Used for message grouping and filtering

Production and Consumption Model: The Producer sends messages to a Topic; the Broker persists them for the Consumer to pull; the Consumer consumes in cluster or broadcast mode.

Code examples in this chapter use Go (pseudocode/illustration). Method names may vary slightly across different SDKs; please refer to the actual version.


Categorized by Sending Characteristics

1. Synchronous Sending

Synchronous sending waits for the Broker to return the sending result, suitable for scenarios requiring high reliability (e.g., placing an order, creating an order event).

// 同步发送
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // 失败处理/重试
}
log.Printf("SendOK: %v", res)

2. Asynchronous Sending

Asynchronous sending does not block the main thread and obtains results via callbacks, suitable for scenarios with long chains or high throughput requirements.

// 异步发送
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // 记录失败,后续重试
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})

3. One-way Sending (OneWay)

One-way sending only attempts to send messages on a 'best-effort' basis without caring about the result, suitable for scenarios with low reliability requirements such as log collection and event tracking.

// 单向发送
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))

Categorized by Functional Characteristics

1. Normal Messages (Subscription)

The most common publish/subscribe model. Consumers can adopt cluster mode (load balancing) or broadcast mode (each consumer receives the message).

// 消费者订阅普通消息
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // 幂等处理
    // 业务逻辑...
    return ConsumeSuccess
})

Key points:

  • Idempotency: Use a unique business key or a deduplication table to avoid duplicate consumption
  • Retries and Dead-Letter Queue (DLQ): Failed messages are retried, and if they exceed the threshold, they enter the DLQ

2. Sequential Messages

Sequential messages are divided into global order and partitioned order. A common practice is to route messages by business key (e.g., order ID) to the same queue, ensuring that messages for the 'same order' are in order.

// 生产者按业务键选择队列(示意)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)

Note: To ensure that messages with the same business key fall into the same queue, consumers typically process them in a single thread or serially by queue.

3. Delayed Messages (Scheduled/Delayed)

Used to deliver messages to consumers after a specified time, such as 'order timeout cancellation' or 'payment result check later'.

// 发送 30s 后可见的延时消息(不同 SDK 可用 delayLevel 或 deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)

Practical considerations:

  • Appropriate delay level/absolute delivery time
  • Consumer still requires idempotency and compensation

4. Transactional Messages (Distributed Transactions)

Used to ensure eventual consistency of 'local transaction + message'. Process: Send half message → Execute local transaction → Commit/Rollback based on the result; if the Broker does not receive confirmation, it will check the business status.

sequenceDiagram
  participant P as Producer
  participant MQ as RocketMQ
  participant DB as LocalDB
  P->>MQ: 发送半消息
  P->>DB: 执行本地事务
  alt 成功
    P->>MQ: Commit
    MQ->>C: 投递正式消息
  else 失败
    P->>MQ: Rollback
  end
  MQ->>P: 回查未确认事务

For more details, refer to the 'Transactional Messages' and 'TCC/Local Message Table' sections in 013.md in this repository.


Producer and Consumer Quick Example

// Producer 初始化(示意)
producer, _ := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
defer producer.Shutdown()

// Consumer 初始化(示意)
consumer, _ := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // 或 Broadcasting
})
defer consumer.Shutdown()

Advantages of Distributed Transactional Messages

  • Decoupling: Upstream and downstream collaborate through events, reducing tight coupling
  • Elasticity and Scalability: Asynchronous peak shaving, supporting high concurrency
  • Reliability: Message persistence, retry/reconciliation on failure
  • Eventual Consistency: Achieved through compensation and callbacks under AP trade-offs

Applicable scenarios: Order creation/payment, inventory deduction, points/coupon issuance, fund accounting, status synchronization, etc.


Common Practical Recommendations

  • Consumer idempotency: Unique business key, deduplication table, optimistic lock
  • Failure retry and Dead-Letter Queue (DLQ) configuration
  • Monitoring and alerting: Backlog, failure rate, latency
  • Combine with delayed messages to achieve 'timeout closure/callback'
  • Transactional messages are only used in critical paths; others use local message tables or best-effort notifications

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6780

(0)
Walker的头像Walker
上一篇 11 hours ago
下一篇 12 hours ago

Related Posts

EN
简体中文 繁體中文 English