tft每日頭條

 > 科技

 > rocketmq架構分析

rocketmq架構分析

科技 更新时间:2025-01-23 12:28:20
一、引言

消息系統我們在項目中經常使用,但是如何自己設計一個消息系統呢?一個消息系統需要有消息的生産者、消費者,還需要具備消息的存儲功能,設計時要考慮如下問題:

  • 消息發送:生産者如何獲取消息服務器地址,消息如何發送,如何區分不同的消息,怎樣保證消息不丢失,如果消息服務器增減機器,生産者如何感知。
  • 消息存儲:消息如何存儲,如何寫文件,怎樣具備高可用,低延遲。
  • 消息消費:消費者怎樣找到消息,怎麼保證消息被消費者消費,消息是否可以消費多次。

我們從學習RocketMQ入手,來學習消息系統的設計。

二、RocketMQ架構

RocketMQ的邏輯部署圖:

rocketmq架構分析(跟RocketMQ學習消息系統設計)1

RocketMQ邏輯部署圖

RocketMQ的角色包含Broker、NameServer、Producer和Consumer。

  • Broker:用于接收Producer發送的消息,向Consumer轉發消息,進行消息的存儲,Broker啟動時會向NameServer注冊自己的信息。
  • NameServer:是RocketMQ的注冊中心,用于服務注冊、服務發現和路由管理。多個NameServer之間信息不共享,分别記錄Broker和隊列等信息。
  • Producer:消息的生産者。生産者發送某一主題的消息到Broker,開發者通過引入RocketMQ Client的jar包,使用send方法實現消息的發送功能。
  • Consumer:消息的消費者,連接Broker獲取topic下的消息。開發者通過引入RocketMQ Client的jar包,進行消息的監聽并進行消費。

rocketmq架構分析(跟RocketMQ學習消息系統設計)2

RocketMQ角色

三、RocketMQ功能3.1 路由注冊

路由注冊是指将Broker信息注冊到NameServer中,以便生産者、消費者可以從NameServer中獲取到Broker的信息,進行消息的發送或接收。這樣做的好處是Broker在擴容、縮容時,開發者對Broker的變化無感知。

rocketmq架構分析(跟RocketMQ學習消息系統設計)3

Broker注冊流程

Broker端
  • Broker啟動:從配置文件中讀取NameServer地址,遍曆NameServer,向每個NameServer發送心跳包進行注冊。Broker端使用是Netty框架與NameServer進行網絡通信,上報Broker及集群信息,Topic隊列等信息。
  • 定時任務:Broker在啟動之後,會每隔30s向NameServer發送心跳包,保持長連接。
NameServer端
  • NameServer收到上報信息後,将Broker等信息存儲在路由表中,更新路由表中的上報時間。

特殊說明

  • NameServer可以部署多個,Broker依次向每個NameServer機器上報信息,以保證高可用。
  • NameServer集群之間互不通信,也不進行信息共享,也就是說NameServer服務器之間在某一時刻的數據并不會完全相同,但這對消息生産者來說并不會影響消息發送。這極大的降低了NameServer實現的複雜度,性能也得到了提升,這與使用ZK作為注冊中心是不同的。
  • NameServer是内存式存儲路由信息,本身是無狀态的,也就是說NameServer中的Broker、Topic等狀态信息不會持久存儲(NameServer支持配置參數的持久化,一般用不到)。
3.2 路由删除

如果Broker宕機,NameServer無法收到心跳包,此時NameServer如何來剔除這些失效的Broker呢?

RocktMQ有兩個入口來觸發路由删除。

1)定時任務:NameServer每隔10s掃描路由表,檢測上次心跳包時間戳與當前系統時間的時間差,如果時間差大于120s,會從路由表中移除該Broker相關的信息并關閉Socket連接。

2)Broker在正常被關閉的情況下,會執行unregisterBroker指令,NameServer收到後會移除該Broker相關的信息并關閉Socket連接。

為什麼路由變化不會馬上通知消息生産者,而要等120s呢?這是為了降低NameServer實現的複雜性,路由變化由發送端提供容錯機制來保證消息發送的高可用性。

3.3 路由發現

路由發現是指讓生産者、消費者找到消息服務Broker的過程。RocketMQ路由發現是非實時的,當Topic路由出現變化後,NameServer不主動推送給客戶端,而是由客戶端定時拉取Topic最新的路由。

rocketmq架構分析(跟RocketMQ學習消息系統設計)4

路由發現的流程

1)定時任務

生産者啟動時會啟動定時任務,每30s執行一次路由表的動态更新,流程如下:

  • 首先生産者或消費者根據topic查找路由信息,從Client端内存中查找路由信息,路由信息包括Broker信息和隊列信息。
  • 如果内存中未找到路由信息,則訪問NameServer查詢路由信息,并與本地内存中的路由信息進行對比,如果路由信息發生變化,則更新路由表,返回路由信息。
  • 如果内存結構中包含路由信息,則返回路由信息。

2)消息發送時

  • 在消息發送前,會先從内存中查找路由信息。
  • 如果内存結構中包含路由信息,則返回路由信息(因為定時任務會更新路由表信息)。
  • 如果内存中未找到路由信息,則訪問NameServer查詢路由信息,并與本地内存中的路由信息進行對比,如果路由信息發生變化,則更新路由表,返回路由信息。

生産者和消費者獲取了broker地址和隊列信息後,如何發送消息呢?

3.4 消息發送消息隊列如何進行負載均衡

Producer的代碼在RocketMQ Client jar中,Producer啟動後,會創建MQClientInstance實例,同時啟動生産者和消費者。如果配置的NameServer地址相同,同一個JVM中的不同消費者和不同生産者在啟動時獲取到的MQClientInstane實例都是同一個。

一個Topic下可以配置多個消息隊列,以提高服務吞吐量,生産者拿到路由信息後,需要确定發送的隊列,通過過濾數據得到Master角色且具有寫權限的隊列,使用輪詢的方式,用自增1的值對隊列大小取模,确認要發送的隊列,讓消息平均落在不同的消息隊列上。返回的消息隊列按照broker、序号排序,格式如下:

[{"broker-Name":"broker-a","queueId":0},{"brokerName":"broker-a","queueId":1},{"brokerName":"broker-a","queueId":2},{"brokerName":"broker-a","queueId":3},{"brokerName":"broker-b","queueId":0},{"brokerName":"broker-b","queueId":1},{"brokerName":"broker-b","queueId":2},{"brokerName":"broker-b","queueId":3}]

選擇消息隊列算法:Math.abs(index ) % 消息隊列大小。

消息發送如何實現高可用

消息發送高可用主要通過兩個手段:消息重試與Broker規避。

規避有故障的Broker

如果上一次根據路由算法選擇的是宕機的Broker的第一個隊列,那麼随後的下次選擇的是宕機Broker的第二個隊列,消息發送很有可能會失敗,再次引發重試,帶來不必要的性能損耗。此時可以将該Broker進行規避,不再選擇該Broker,提高發送消息的成功率。

批量消息發送如何實現一緻性

批量消息發送是将同一主題的多條消息封裝成MessageBatch對象,一起打包發送到消息服務端,減少網絡調用次數,提高網絡傳輸效率。服務端按照同樣的結構進行解析即可。

3.5 消息存儲

RocketMQ主要存儲的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。

CommitLog文件
  • 消息存儲文件,所有主題的消息都存儲在同一個CommitLog文件中。
  • 存儲到CommitLog文件中是串行的,順序寫文件,采用尾部追加的方式寫入,可以盡最大的能力确保消息發送的高性能與高吞吐量。
  • 單個CommitLog文件的大小為1G,一個文件寫滿後會再創建另外一個。以文件中第一個消息的物理偏移量為文件名,文件名長度20位,小于20位用0補齊。文件名示例如下:

rocketmq架構分析(跟RocketMQ學習消息系統設計)5

CommitLog文件

  • CommitLog文件每條消息中存儲了消體内容、topic名稱内容、queueId、消息大小、消息的存儲時間、消息被某個訂閱組重新消費了多少次、消息産生端的地址、消息的存儲時間等内容
  • 由于同一主題的消息不連續地存儲在CommitLog文件中,如果消費者直接從CommitLog文件中查找某一主題下的消息,效率将極其低下。RocketMQ設計了消息消費隊列文件(Consumequeue)和IndexFile索引文件。當消息到達CommitLog文件後,由專門的線程異步轉發,從而構建ConsumeQueue文件與IndexFile文件
ConsumeQueue文件
  • 消費隊列文件,是CommitLog中每個消息的索引文件,定位了當前這條消息在CommitLog中的位置。該文件由在CommitLog中的起始物理偏移量offset、消息大小size和消息Tag的HashCode值組成,文件采取定長設計,每一個條目共20個字節,格式如下:

rocketmq架構分析(跟RocketMQ學習消息系統設計)6

ConsumeQueue文件

  • 查找消息時,必須先從ConsumeQueue中去獲取消息存儲的物理地址,然後再從CommitLog中将數據取出。
  • 每個Topic包含多個消息隊列,每一個消息隊列有一個ConsumeQueue文件。單個文件由30W個條目組成,目的是為了可以像數組一樣随機訪問每一個條目。
  • 過濾tag也是通過遍曆ConsumeQueue來實現的(先比較hash(tag)符合條件的再到consumer比較tag原文)
IndexFile文件

消息索引文件,主要存儲消息Key與Offset的對應關系。生産者發送的消息包含key值,會用IndexFile存儲消息索引,主要用于使用key或時間戳來查詢消息。

rocketmq架構分析(跟RocketMQ學習消息系統設計)7

存儲文件

3.6 消息消費

消息消費有兩種模式:廣播模式與集群模式。

  • 集群模式:每條消息被消費者消費一次,服務端保存消費進度。
  • 廣播模式:同一消費者組内的每個消費者,都消費到Topic的所有消息。RocketMQ并不會對消費失敗的消息進行失敗重投,由消費者保存消息消費進度。
消息拉取

消費者消費消息,是主動從服務端獲取消息,通過“長輪詢”方式達到Push效果的方法。消費者從Broker查詢消息,當Broker服務端接到請求後,如果隊列裡沒有新消息,并不立刻返回,而是通過循環HOLD住客戶端一小段時間,在這個時間内有新消息到達,就利用現有的連接立刻返回消息給Consumer,如果沒有消息則返回空結果。

消息确認

如果消息監聽器返回的消費結果為RECONSUME_LATER,則需要将這些消息發送給Broker延遲消息。如果發送ACK消息失敗,将延遲5s後提交線程池進行消費。

消息過濾

消費者使用tag對消息進行過濾。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示這個Consumer要消費“TopicTest”下帶有tag1或tag2或tag3的消息(Tag是在發送消息時設置的标簽)。在填寫Tag參數的位置,用null或者“*”表示要消費這個Topic的所有消息。

參考書籍

《RocketMQ技術内幕:RocketMQ架構設計與實現原理》

《RocketMQ實戰與原理解析》

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关科技资讯推荐

热门科技资讯推荐

网友关注

Copyright 2023-2025 - www.tftnews.com All Rights Reserved