消息系統我們在項目中經常使用,但是如何自己設計一個消息系統呢?一個消息系統需要有消息的生産者、消費者,還需要具備消息的存儲功能,設計時要考慮如下問題:
我們從學習RocketMQ入手,來學習消息系統的設計。
二、RocketMQ架構RocketMQ的邏輯部署圖:
RocketMQ邏輯部署圖
RocketMQ的角色包含Broker、NameServer、Producer和Consumer。
RocketMQ角色
三、RocketMQ功能3.1 路由注冊路由注冊是指将Broker信息注冊到NameServer中,以便生産者、消費者可以從NameServer中獲取到Broker的信息,進行消息的發送或接收。這樣做的好處是Broker在擴容、縮容時,開發者對Broker的變化無感知。
Broker注冊流程
Broker端特殊說明
如果Broker宕機,NameServer無法收到心跳包,此時NameServer如何來剔除這些失效的Broker呢?
RocktMQ有兩個入口來觸發路由删除。
1)定時任務:NameServer每隔10s掃描路由表,檢測上次心跳包時間戳與當前系統時間的時間差,如果時間差大于120s,會從路由表中移除該Broker相關的信息并關閉Socket連接。
2)Broker在正常被關閉的情況下,會執行unregisterBroker指令,NameServer收到後會移除該Broker相關的信息并關閉Socket連接。
為什麼路由變化不會馬上通知消息生産者,而要等120s呢?這是為了降低NameServer實現的複雜性,路由變化由發送端提供容錯機制來保證消息發送的高可用性。
3.3 路由發現路由發現是指讓生産者、消費者找到消息服務Broker的過程。RocketMQ路由發現是非實時的,當Topic路由出現變化後,NameServer不主動推送給客戶端,而是由客戶端定時拉取Topic最新的路由。
路由發現的流程
1)定時任務
生産者啟動時會啟動定時任務,每30s執行一次路由表的動态更新,流程如下:
2)消息發送時
生産者和消費者獲取了broker地址和隊列信息後,如何發送消息呢?
3.4 消息發送消息隊列如何進行負載均衡Producer的代碼在RocketMQ Client jar中,Producer啟動後,會創建MQClientInstance實例,同時啟動生産者和消費者。如果配置的NameServer地址相同,同一個JVM中的不同消費者和不同生産者在啟動時獲取到的MQClientInstane實例都是同一個。
一個Topic下可以配置多個消息隊列,以提高服務吞吐量,生産者拿到路由信息後,需要确定發送的隊列,通過過濾數據得到Master角色且具有寫權限的隊列,使用輪詢的方式,用自增1的值對隊列大小取模,确認要發送的隊列,讓消息平均落在不同的消息隊列上。返回的消息隊列按照broker、序号排序,格式如下:
[{"broker-Name":"broker-a","queueId":0},{"brokerName":"broker-a","queueId":1},{"brokerName":"broker-a","queueId":2},{"brokerName":"broker-a","queueId":3},{"brokerName":"broker-b","queueId":0},{"brokerName":"broker-b","queueId":1},{"brokerName":"broker-b","queueId":2},{"brokerName":"broker-b","queueId":3}]
選擇消息隊列算法:Math.abs(index ) % 消息隊列大小。
消息發送如何實現高可用消息發送高可用主要通過兩個手段:消息重試與Broker規避。
規避有故障的Broker
如果上一次根據路由算法選擇的是宕機的Broker的第一個隊列,那麼随後的下次選擇的是宕機Broker的第二個隊列,消息發送很有可能會失敗,再次引發重試,帶來不必要的性能損耗。此時可以将該Broker進行規避,不再選擇該Broker,提高發送消息的成功率。
批量消息發送如何實現一緻性批量消息發送是将同一主題的多條消息封裝成MessageBatch對象,一起打包發送到消息服務端,減少網絡調用次數,提高網絡傳輸效率。服務端按照同樣的結構進行解析即可。
3.5 消息存儲RocketMQ主要存儲的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。
CommitLog文件
CommitLog文件
ConsumeQueue文件
消息索引文件,主要存儲消息Key與Offset的對應關系。生産者發送的消息包含key值,會用IndexFile存儲消息索引,主要用于使用key或時間戳來查詢消息。
存儲文件
3.6 消息消費消息消費有兩種模式:廣播模式與集群模式。
消費者消費消息,是主動從服務端獲取消息,通過“長輪詢”方式達到Push效果的方法。消費者從Broker查詢消息,當Broker服務端接到請求後,如果隊列裡沒有新消息,并不立刻返回,而是通過循環HOLD住客戶端一小段時間,在這個時間内有新消息到達,就利用現有的連接立刻返回消息給Consumer,如果沒有消息則返回空結果。
消息确認如果消息監聽器返回的消費結果為RECONSUME_LATER,則需要将這些消息發送給Broker延遲消息。如果發送ACK消息失敗,将延遲5s後提交線程池進行消費。
消息過濾消費者使用tag對消息進行過濾。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示這個Consumer要消費“TopicTest”下帶有tag1或tag2或tag3的消息(Tag是在發送消息時設置的标簽)。在填寫Tag參數的位置,用null或者“*”表示要消費這個Topic的所有消息。
參考書籍《RocketMQ技術内幕:RocketMQ架構設計與實現原理》
《RocketMQ實戰與原理解析》
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!