tft每日頭條

 > 職場

 > 程序發送全局消息什麼原理

程序發送全局消息什麼原理

職場 更新时间:2024-08-21 11:42:07

本篇将重點關注 DefaultMQProducer 中的相關屬性,以便從這些屬性窺探 RocketMQ 消息發送較為底層的原理。

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)1

從 DefaultMQProducer 的類圖就可以看出其屬性主要來源于 ClientConfig、DefaultMQProducer,故接下來将分兩部分進行介紹。

DefaultMQProducer 參數一覽

DefaultMQProducer 的參數如下:

InternalLogger log = ClientLogger.getLog()

客戶端的日志實現類,RocketMQ 客戶端的日志路徑為 ${user.home}/logs/rocketmqlogs/rocketmq_client.log。在排查問題時可以從日志文件下手,尋找錯誤日志,為解決問題提供必要的信息。其中 user.home 為用戶的主目錄。

producerGroup

發送者所屬組,開源版本的 RocketMQ,發送者所屬組主要的用途是事務消息,Broker 需要向消息發送者回查事務狀态。可以通過相關命令或 RocketMQ-Console 查看某一個 Topic 指定消費組的客戶端,如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)2

defaultTopicQueueNums = 4

通過生産者創建 Topic 時默認的隊列數量。

sendMsgTimeout = 3000

消息發送默認超時時間,單位為毫秒。值得注意的是在 RocketMQ 4.3.0 版本之前,由于存在重試機制,設置的設計為單次重試的超時時間,即如果設置重試次數為 3 次,則 DefaultMQProducer#send 方法可能會超過 9s 才返回;該問題在 RocketMQ 4.3.0 版本進行了優化,設置的超時時間為總的超時時間,即如果超時時間設置 3s,重試次數設置為 10 次,可能不會重試 10 次,例如在重試到第 5 次的時候,已經超過 3s 了,試圖嘗試第 6 次重試時會退出,抛出超時異常,停止重試。

compressMsgBodyOverHowmuch

壓縮的闊值,默認為 4k,即當消息的消息體超過 4k,則會使用 zip 對消息體進行壓縮,會增加 Broker 端的 CPU 消耗,但能提高網絡方面的開銷。

retryTimesWhenSendFailed

同步消息發送重試次數。RocketMQ 客戶端内部在消息發送失敗時默認會重試 2 次。請主要該參數與 sendMsgTimeout 會聯合起來生效,詳情請參照上文所述。

retryTimesWhenSendAsyncFailed

異步消息發送重試次數,默認為 2,即重試 2 次,通常情況下有三次機會。

retryAnotherBrokerWhenNotStoreOK

該參數的本意是如果客戶端收到的結果不是 SEND_OK,應該是不問源由的繼續向另外一個 Broker 重試,但根據代碼分析,目前這個參數并不能按預期運作,應該是一個 Bug。

int maxMessageSize

允許發送的最大消息體,默認為 4M,服務端(Broker)也有 maxMessageSize 這個參數的設置,故客戶端的設置不能超過服務端的配置,最佳實踐為客戶端的配置小于服務端的配置。

sendLatencyFaultEnable

是否開啟失敗延遲規避機制。RocketMQ 客戶端内部在重試時會規避上一次發送失敗的 Broker,如果開啟延遲失敗規避,則在未來的某一段時間内不向該 Broker 發送消息,具體機制在本篇的第三部分詳細展開。默認為 false,不開啟。

notAvailableDuration

不可用的延遲數組,默認值為 {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L},即每次觸發 Broker 的延遲時間是一個階梯的,會根據每次消息發送的延遲時間來選擇在未來多久内不向該 Broker 發送消息。

latencyMax

設置消息發送的最大延遲級别,默認值為 {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L},個數與 notAvailableDuration 對應,關于 Broker 的延遲關閉機制将在本文第三部詳細探讨。

ClientConfig 參數一覽

ClientConfig 顧名思義,客戶端的配置,在 RocketMQ 中消息發送者(Producer)和消息消費者(Consumer),即上面的配置生産者、消費者是通用的。

namesrvAddr

NameServer 的地址列表。

clientIP

客戶端 IP,通過 RemotingUtil#getLocalAddress 方法獲取,在 4.7.0 版本中優先會返回不是 127.0.0.1 和 192.168 開頭的最後一個 IPV4 或第一個 IPV6。客戶端 IP 主要是用來定位消費者的,clientIP 會當成客戶端 id 的組成部分。

如下圖所示:在菜單 [Consumer] 列表中點擊一個消費組,點擊按鈕 [client] 可以查閱其客戶端(消費者)。

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)3

instanceName

客戶端實例名稱,是客戶端标識 CID 的組成部分,在第三篇會詳細其 CID 與場景的使用問題。

unitName

定義一個單元,主要用途:客戶端 CID 的組成部分;如果獲取 NameServer 的地址是通過 URL 進行動态更新的話,會将該值附加到當中,即可以區分不同的獲取 NameServer 地址的服務。

clientCallbackExecutorThreads

客戶端 public 回調的線程池線程數量,默認為 CPU 核數,不建議改變該值。

namespace

客戶端命名空間,從 4.5.1 版本被引入,在第三篇中已詳細介紹。

pollNameServerInterval

客戶端從 NameServer 更新 Topic 的間隔,默認值 30s,就 Producer、Consumer 會每隔 30s 向 NameServer 更新 Topic 的路由信息,該值不建議修改。

heartbeatBrokerInterval

客戶端向 Broker 發送心跳包的時間間隔,默認為 30s,該值不建議修改。

persistConsumerOffsetInterval

客戶端持久化消息消費進度的間隔,默認為 5s,該值不建議修改。

核心參數工作機制與使用建議

消息發送高可用設計與故障規避機制

熟悉 RocketMQ 的小夥伴應該都知道,RocketMQ Topic 路由注冊中心 NameServer 采用的是最終一緻性模型,而且客戶端是定時向 NameServer 更新 Topic 的路由信息,即客戶端(Producer、Consumer)是無法實時感知 Broker 宕機的,這樣消息發送者會繼續向已宕機的 Broker 發送消息,造成消息發送異常。那 RocketMQ 是如何保證消息發送的高可用性呢?

RocketMQ 為了保證消息發送的高可用性,在内部引入了重試機制,默認重試 2 次。RocketMQ 消息發送端采取的隊列負載均衡默認采用輪循。

在 RocketMQ 中消息發送者是線程安全的,即一個消息發送者可以在多線程環境中安全使用。每一個消息發送者全局會維護一個 Topic 上一次選擇的隊列,然後基于這個序号進行遞增輪循,引入了 ThreadLocal 機制,即每一個發送者線程持有一個上一次選擇的隊列,用 sendWhichQueue 表示。

接下來舉例消息隊列負載機制,例如 topicA 的路由信息如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)4

正如上圖所 topicA 在 broker-a、broker-b 上分别創建了 4 個隊列,例如一個線程使用 Producer 發送消息時,通過對 sendWhichQueue getAndIncrement() 方法獲取下一個隊列。

例如在發送之前 sendWhichQueue 該值為 broker-a 的 q1,如果由于此時 broker-a 的突發流量異常大導緻消息發送失敗,會觸發重試,按照輪循機制,下一個選擇的隊列為 broker-a 的 q2 隊列,此次消息發送大概率還是會失敗,即盡管會重試 2 次,但都是發送給同一個 Broker 處理,此過程會顯得不那麼靠譜,即大概率還是會失敗,那這樣重試的意義将大打折扣。

故 RocketMQ 為了解決該問題,引入了故障規避機制,在消息重試的時候,會盡量規避上一次發送的 Broker,回到上述示例,當消息發往 broker-a q1 隊列時返回發送失敗,那重試的時候,會先排除 broker-a 中所有隊列,即這次會選擇 broker-b q1 隊列,增大消息發送的成功率。

上述規避思路是默認生效的,即無需幹預。

但 RocketMQ 提供了兩種規避策略,該參數由 sendLatencyFaultEnable 控制,用戶可幹預,表示是否開啟延遲規避機制,默認為不開啟。

  • sendLatencyFaultEnable 設置為 false:默認值,不開啟,延遲規避策略隻在重試時生效,例如在一次消息發送過程中如果遇到消息發送失敗,規避 broekr-a,但是在下一次消息發送時,即再次調用 DefaultMQProducer 的 send 方法發送消息時,還是會選擇 broker-a 的消息進行發送,隻要繼續發送失敗後,重試時再次規避 broker-a。
  • sendLatencyFaultEnable 設置為 true:開啟延遲規避機制,一旦消息發送失敗會将 broker-a “悲觀”地認為在接下來的一段時間内該 Broker 不可用,在為未來某一段時間内所有的客戶端不會向該 Broker 發送消息。這個延遲時間就是通過 notAvailableDuration、latencyMax 共同計算的,就首先先計算本次消息發送失敗所耗的時延,然後對應 latencyMax 中哪個區間,即計算在 latencyMax 的下标,然後返回 notAvailableDuration 同一個下标對應的延遲值。

溫馨提示:如果所有的 Broker 都觸發了故障規避,并且 Broker 隻是那一瞬間壓力大,那豈不是明明存在可用的 Broker,但經過你這樣規避,反倒是沒有 Broker 可用來,那豈不是更糟糕了?針對這個問題,會退化到隊列輪循機制,即不考慮故障規避這個因素,按自然順序進行選擇進行兜底。

筆者實戰經驗分享

按照筆者的實踐經驗,RocketMQ Broker 的繁忙基本都是瞬時的,而且通常與系統 PageCache 内核的管理相關,很快就能恢複,故不建議開啟延遲機制。因為一旦開啟延遲機制,例如 5 分鐘内不會向一個 Broker 發送消息,這樣會導緻消息在其他 Broker 激增,從而會導緻部分消費端無法消費到消息,增大其他消費者的處理壓力,導緻整體消費性能的下降。

客戶端 ID 與使用陷進

介紹客戶端 ID 主要的目的是,能在如下場景正确使用消息發送與消費。

  • 同一套代碼能否在同一台機器上部署多個實例?
  • 同一套代碼能向不同的 NameServer 集群發送消息、消費消息嗎?

本篇的試驗環境部署架構如下:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)5

部署了兩套 RocketMQ 集群,在 DefaultCluster 集群上創建 Topic——dw_test_01,并在 DefaultClusterb 上創建 Topic——dw_test_02,現在的需求是 order-service-app 要向 dw_test_01、dw_test_02 上發送消息。給出的示例代碼如下:

public static void main(String[] args) throws Exception{ // 創建第一個生産者 DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group1"); producer.setNamesrvAddr("192.168.3.10:9876"); producer.start(); // 創建第二個生産者 DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2"); producer2.setNamesrvAddr("192.168.3.19:9876"); producer2.start(); try { // 向第一個 RocketMQ 集群發送消息 SendResult result1 = producer.send( new Message("dw_test_01" , "hello 192.168.3.10 nameserver".getBytes())); System.out.printf("%s%n", result1); } catch (Throwable e) { System.out.println("-----first------------"); e.printStackTrace(); System.out.println("-----first------------"); } try { // 向第一個 RocketMQ 集群發送消息 SendResult result2 = producer2.send( new Message("dw_test_02" , "hello 192.168.3.19 nameserver".getBytes())); System.out.printf("%s%n", result2); } catch (Throwable e) { System.out.println("-----secornd------------"); e.printStackTrace(); System.out.println("-----secornd------------"); } //睡眠 10s,簡單延遲該任務的結束 Thread.sleep(10000); }

運行結果如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)6

在向集群 2 發送消息時出現 Topic 不存在,但明明創建了 dw_test_02,而且如果單獨向集群 2 的 dw_test_02 發送消息确能成功,初步排查是創建了兩個到不同集群的 Producer 引起的,那這是為什麼呢?如果解決呢?

1. 問題分析

要解決該問題,首先得理解 RocketMQ Client 的核心組成部分,如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)7

上述中幾個核心關鍵點如下:

  • MQClientInstance:RocketMQ 客戶端一個非常重要的對象,代表一個 MQ 客戶端,并且其唯一标識為 clientId。該對象中會持有衆多的消息發送者客戶端 producerTable,其鍵為消息發送者組;同樣可以創建多個消費組,以消費組為鍵存儲在 consumerTable 中。
  • 一個 JVM 進程中,即一個應用程序中是否能創建多個 MQClientInstance 呢?同樣是可以的,MQClientManager 對象持有一個 MQClientInstance 容器,鍵為 clientId。

那既然一個 JVM 中能支持創建多個生産者,那為什麼上面的示例中創建了兩個生産者,并且生産者組也不一樣,那為什麼不能正常工作呢?

這是因為上述兩個 Producer 對應的 clinetId 相同,會對應同一個 MQClientInstance 對象,這樣兩個生産者都會注冊到一個 MQClientInstance,即這兩個生産者使用的配置為第一個生産者的配置,即配置的 nameserver 地址為 192.168.3.10:9876,而在集群 1 上并沒有創建 topic——dw_test_02,故無法找到對應的主題,而抛出上述錯誤。

我們可以通過調用 DefaultMQProducer 的 buildMQClientId() 方法,查看其生成的 clientId,運行後的結果如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)8

那解決思路就非常清晰了,我們隻需要改變兩者的 clientId 即可,故接下來看一下 RocketMQ 中 clientId 的生成規則。

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)9

溫馨提示:該方法定義在 ClientConfig 中,RocketMQ 生産者、消費者都是 ClientConfig 的子類。

clientId 的生成策略如下:

  • clientIp:客戶端的 IP 地址。
  • instanceName:實例名稱,默認值為 DEFAULT,但在真正 clientConfig 的 getInstanceName 方法時如果實例名稱為 DEFAULT,會自動将其替換為進程的 PID。
  • unitName:單元名稱,如果不為空,則會追加到 clientId 中。

了解到 clientId 的生成規則後,提出解決方案已是水到渠成的事情了。

2. 解決方案

結合 clientId 三個組成部分,我不建議修改 instanceName,讓其保持默認值 DEFAULT,這樣在真正的運行過程中會自動變更為進程的 pid,這樣能解決同一套代碼在同一台機器上部署多個進程,這樣 clientId 并不會重複,故我建議大家修改 unitName,可以考慮将其修改為集群的名稱,修改後的代碼如下所示:

public static void main(String[] args) throws Exception{ //省略代碼 DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2"); producer2.setNamesrvAddr("192.168.3.19:9876"); producer2.setUnitName("DefaultClusterb"); producer2.start();   //省略代碼

運行結果如下圖所示:

程序發送全局消息什麼原理(消息發送核心參數與工作原理詳解)10

完美解決。

小結

本篇首先介紹了消息發送者所有的配置參數及其基本含義,緊接着詳細介紹了 RocketMQ 消息發送故障規避機制、消息客戶端 ID 的生成策略,以及實戰中如何使用,并且告知如何避坑。

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关職場资讯推荐

热门職場资讯推荐

网友关注

Copyright 2023-2024 - www.tftnews.com All Rights Reserved