Go工程師體系課 014

rocketmq 快速入門

去我們的各種配置(podman)看是怎麼安裝的


概念介紹

RocketMQ 是阿里開源、Apache 頂級項目的分佈式消息中間件,核心組件:

  • NameServer:服務發現與路由
  • Broker:消息存儲、投遞、拉取
  • Producer:消息生產者(發送消息)
  • Consumer:消息消費者(訂閱並消費消息)
  • Topic/Tag:主題/標籤,用於消息分組與過濾

生產與消費模型:Producer 將消息發送到某個 Topic;Broker 進行持久化並供 Consumer 拉取;Consumer 以集羣或廣播模式消費。

代碼示例本章以 Go 爲例(僞代碼/示意),不同 SDK 方法名略有差異,請以實際版本爲準。


按照發送的特點分

1. 同步發送

同步發送會等待 Broker 返回發送結果,適合對可靠性有要求的場景(如下單、創建訂單事件)。

// 同步發送
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // 失敗處理/重試
}
log.Printf("SendOK: %v", res)

2. 異步發送

異步發送不會阻塞主線程,通過回調獲取結果,適合鏈路較長或吞吐要求高的場景。

// 異步發送
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // 記錄失敗,後續重試
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})

3. 單向發送(OneWay)

單向發送只負責把消息“盡力而爲”地發出,不關心結果,適用於日誌收集、埋點等對可靠性要求低的場景。

// 單向發送
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))

按照使用功能特點分

1. 普通消息(訂閱)

最常見的發佈/訂閱模型。消費者可採用集羣模式(負載均衡)或廣播模式(每個消費者都收到)。

// 消費者訂閱普通消息
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // 冪等處理
    // 業務邏輯...
    return ConsumeSuccess
})

要點:

  • 冪等性:用業務唯一鍵或去重表避免重複消費
  • 重試與死信:失敗返回重試,超過閾值進入 DLQ

2. 順序消息

順序消息分爲全局順序和分區順序。常見做法是按業務鍵(如訂單號)將消息路由到同一個隊列,保證“同一訂單”的消息有序。

// 生產者按業務鍵選擇隊列(示意)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)

注意:要保證同一業務鍵落在同一隊列,消費者通常單線程或按隊列串行處理。

3. 延時消息(定時/延遲)

用於在指定時間後再投遞給消費者,例如“訂單超時取消”“支付結果稍後檢查”等。

// 發送 30s 後可見的延時消息(不同 SDK 可用 delayLevel 或 deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)

實踐要點:

  • 合理的延遲等級/絕對投遞時間
  • 消費端仍需冪等與補償

4. 事務消息(分佈式事務)

用於保證“本地事務 + 消息”最終一致。流程:發送半消息 → 執行本地事務 → 根據結果 Commit/Rollback;Broker 未收到確認會回查業務狀態。

sequenceDiagram
  participant P as Producer
  participant MQ as RocketMQ
  participant DB as LocalDB
  P->>MQ: 發送半消息
  P->>DB: 執行本地事務
  alt 成功
    P->>MQ: Commit
    MQ->>C: 投遞正式消息
  else 失敗
    P->>MQ: Rollback
  end
  MQ->>P: 回查未確認事務

更多細節可參考本倉庫 013.md 中“事務消息”與“TCC/本地消息表”等章節。


生產者與消費者快速示例

// Producer 初始化(示意)
producer, _ := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
defer producer.Shutdown()

// Consumer 初始化(示意)
consumer, _ := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // 或 Broadcasting
})
defer consumer.Shutdown()

分佈式事務消息的優勢

  • 解耦:上下游通過事件協作,降低強耦合
  • 彈性與可擴展:異步削峯,支持高併發
  • 可靠性:消息持久化,失敗可重試/對賬
  • 最終一致:在 AP 取捨下通過補償與回查達到一致

適用場景:訂單創建/支付、庫存扣減、積分/優惠券發放、資金記賬、狀態同步等。


常見實踐建議

  • 消費端冪等:唯一業務鍵、去重表、樂觀鎖
  • 失敗重試與死信隊列(DLQ)配置
  • 監控與告警:積壓、失敗率、耗時
  • 結合延時消息實現“超時關閉/回查”
  • 事務消息只在關鍵鏈路使用,其餘用本地消息表或最大努力通知

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6780

(0)
Walker的頭像Walker
上一篇 2026年3月8日 15:11
下一篇 2026年3月9日 12:56

相關推薦

  • Go工程師體系課 014

    rocketmq 快速入門 去我們的各種配置(podman)看是怎麼安裝的 概念介紹 RocketMQ 是阿里開源、Apache 頂級項目的分佈式消息中間件,核心組件: NameServer:服務發現與路由 Broker:消息存儲、投遞、拉取 Producer:消息生產者(發送消息) Consumer:消息消費者(訂閱並消費消息) Topic/Tag:主題/…

    後端開發 2026年3月7日
    14200
  • Go工程師體系課 013

    訂單事務 先扣庫存 後扣庫存 都會對庫存和訂單都會有影響, 所以要使用分佈式事務 業務(下單不對付)業務問題 支付成功再扣減(下單了,支付時沒庫存了) 訂單扣減,不支付(訂單超時歸還)【常用方式】 事務和分佈式事務 1. 什麼是事務? 事務(Transaction)是數據庫管理系統中的一個重要概念,它是一組數據庫操作的集合,這些操作要麼全部成功執行,要麼全部…

    後端開發 2026年3月7日
    5700
  • Go資深工程師講解(慕課) 006_函數式編程

    Go 函數式編程 對應視頻 Ch6(6-2 函數式編程例一),在 002.md 基礎上擴展更多函數式編程模式 1. 回顧:Go 中函數是一等公民 Go 不是純函數式語言,但函數可以作爲:- 變量- 參數- 返回值- 存放在數據結構中 // 函數作爲變量 var add = func(a, b int) int { return a + b } // 函數作爲…

    後端開發 2026年3月6日
    5800
  • Go工程師體系課 012

    Go 中集成 Elasticsearch 1. 客戶端庫選擇 1.1 主流 Go ES 客戶端 olivere/elastic:功能最全面,API 設計優雅,支持 ES 7.x/8.x elastic/go-elasticsearch:官方客戶端,輕量級,更接近原生 REST API go-elasticsearch/elasticsearch:社區維護的官…

    後端開發 2026年3月7日
    5400
  • Go工程師體系課 017

    限流、熔斷與降級入門(含 Sentinel 實戰) 結合課件第 3 章(3-1 ~ 3-9)的視頻要點,整理一套面向初學者的服務保護指南,幫助理解“爲什麼需要限流、熔斷和降級”,以及如何用 Sentinel 快速上手。 學習路線速覽 3-1 理解服務雪崩與限流、熔斷、降級的背景 3-2 Sentinel 與 Hystrix 對比,明確技術選型 3-3 Sen…

    後端開發 2026年3月7日
    8800
簡體中文 繁體中文 English