工作中經常用到消息中間件來解決系統間的解耦問題或者高并發消峰問題,但是消息的可靠性如何保證一直是個很大的問題,萬一消息丢了怎麼辦?什麼情況下消息就不見了呢?下面通過這篇文章,我們就聊聊RabbitMQ 中消息的可靠性如何解決的?
本文分三部分說明
- mq消息丢失場景有哪些?
- 如何避免消息丢失?
- 大廠如何解決這些問題的?
mq消息丢失場景有哪些?
首先我們看下消息周期投遞過程:
解決RabbitMQ消息丢失問題和保證消息可靠性
我們把該圖分三部分,左中右三部分,每部分都會導緻消息丢失情況:
1.生産者生産消息到RabbitMQ-server 消息丢失場景
- 外界環境問題導緻:發生網絡丢包、網絡故障等造成消息丢失
- 代碼層面,配置層面,考慮不全導緻消息丢失
發送端使用confirm模式,方案不夠嚴謹,比如MQ Server接收消息失敗發送 nack給發送端後,發送端監聽失敗或者沒做任何事情,消息丢失的情況;
再比如發送消息到exchange後,發下路由和queue沒有綁定,消息會存在丢失情況,下面會講到具體的例子。
2.RabbitMQ-Server中存儲的消息丢失
- 消息沒有持久化導緻丢失
- 單節點或者集群模式沒有鏡像模式消息丢失
- 個别磁盤意外損害導緻消息同步失敗
- 機房被炸
3.RabbitMQ-Server到消費者消息丢失
- 消費者接收到相關消息之後,還沒來得及處理就宕機了,消息丢失
如何避免消息丢失?
下面也是從三個方面介紹:
- 生産者生産消息到RabbitMQ-Server 可靠性保證
- RabbitMQ-Server中存儲的消息如何保證
- RabbitMQ-Server到消費者消息如何不丢
1. 生産者生産消息到RabbitMQ-Server可靠性保證
這個過程,消息可能會丢,比如發生網絡丢包、網絡故障等造成消息丢失,一般情況下如果不采取措施,生産者無法感知消息是否已經正确無誤的發送到exchange中,如果生産者能感知到的話,它可以進行進一步的處理動作,比如重新投遞相關消息以确保消息的可靠性。
1.1 别擔心,有一種方案可以解決:就是 AMQP協議提供的一個事務機制
RabbitMQ客戶端中channel 接口提供了幾個事務機制相關的方法: channel.txSelect channel.txCommit channel.txRollback 源碼截圖如下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口
在生産者發送消息之前,通過channel.txSelect開啟一個事務,接着發送消息, 如果消息投遞server失敗,進行事務回滾channel.txRollback,然後重新發送, 如果server收到消息,就提交事務channel.txCommit但是,很少有人這麼幹,因為這是同步操作,一條消息發送之後會使發送端阻塞,以等待RabbitMQ-Server的回應,之後才能繼續發送下一條消息,生産者生産消息的吞吐量和性能都會大大降低。
1.2 不過幸運的是RabbitMQ提供了一個改進方案,即發送方确認機制(publisher confirm)
首先生産者通過調用channel.confirmSelect方法将信道設置為confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,RabbitMQ就會發送一個确認(Basic.Ack)給生産者(包含消息的唯一deliveryTag和multiple參數),這就使得生産者知曉消息已經正确到達了目的地了。
其實Confirm模式有三種方式實現:
- 串行confirm模式:producer每發送一條消息後,調用waitForConfirms()方法,等待broker端confirm,如果服務器端返回false或者超時時間内未返回,客戶端進行消息重傳。
- 批量confirm模式:producer每發送一批消息後,調用waitForConfirms()方法,等待broker端confirm。
- 異步confirm模式:提供一個回調方法,broker confirm了一條或者多條消息後producer端會回調這個方法。 我們分别來看看這三種confirm模式
串行confirm
批量confirm模式
上面代碼是簡單版本的,生産環境絕對不是循環發送的,而是根據業務情況, 各個客戶端程序需要定期(每x秒)或定量(每x條)或者兩者結合來pubish消息,然後等待服務器端confirm。相比普通confirm模式,批量可以極大提升confirm效率。
但是有沒有發現什麼問題?
問題1: 批量發送的邏輯複雜話了。
問題2: 一旦出現confirm返回false或者超時的情況時,客戶端需要将這一批次的消息全部重發,這會帶來明顯的重複消息數量,并且,當消息經常丢失時,批量confirm性能應該是不升反降的。
異步confirm模式
異步模式需要自己多寫一部分複雜的代碼實現,異步監聽類,監聽server端的通知消息,異步的好處性能會大幅度提升,發送完畢之後,可以繼續發送其他消息。 MQServer通知生産端ConfirmListener監聽類:用戶可以繼承接口實現自己的實現類,處理消息确認機制,此處繼承類代碼省略,就是上面 ProxiedConfirmListener 類: 下面貼下要實現的接口:
上面的接口很有意思,如果是你的話,怎麼實現? 消息投遞前如何存儲消息,ack 和 nack 如何處理消息?
下面看下異步confirm的消息投遞流程:
解決RabbitMQ消息丢失問題和保證消息可靠性
解釋下這張圖片:
channerl1 連續發類1,2,3條消息到RabbitMQ-Server,RabbitMQ-Server通知返回一條通知,裡面包含回傳給生産者的确認消息中的deliveryTag包含了确認消息的序号,此外還有一個參數multiple=true,表示到這個序号之前的所有消息都已經得到了處理。這樣客戶端和服務端通知的次數就減少類,提升類性能。
加點消息存儲和删除邏輯
事務機制和publisher confirm機制确保的是消息能夠正确的發送至RabbitMQ,這裡的“發送至RabbitMQ”的含義是指消息被正确的發往至RabbitMQ的交換器,如果此交換器沒有匹配的隊列的話,那麼消息也将會丢失,怎麼辦?
這裡有兩個解決方案,
1. 使用mandatory 設置true
2. 利用備份交換機(alternate-exchange):實現沒有路由到隊列的消息
我們看下RabbitMQ客戶端代碼方法
Channel 類中 發布消息方法
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
解釋下:basicPublish 方法中的,mandatory和immediate
/** * 當mandatory标志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue, 那麼會調用basic.return方法将消息返回給生産者<br> * 當mandatory設置為false時,出現上述情形broker會直接将消息扔掉。 */ @Setter(AccessLevel.PACKAGE) private boolean mandatory = false; /** * 當immediate标志位設置為true時,如果exchange在将消息路由到queue(s)時發現對于的queue上沒有消費者, 那麼這條消息不會放入隊列中。 當immediate标志位設置為false時,exchange路由的隊列沒有消費者時,該消息會通過basic.return方法返還給生産者。 * RabbitMQ 3.0版本開始去掉了對于immediate參數的支持,對此RabbitMQ官方解釋是:這個關鍵字違背了生産者和消費者之間解耦的特性,因為生産者不關心消息是否被消費者消費掉 */ @Setter(AccessLevel.PACKAGE) private boolean immediate;
所以為了保證消息的可靠性,需要設置發送消息代碼邏輯。如果不單獨形式設置mandatory=false
使用mandatory 設置true的時候有個關鍵點要調整,生産者如何獲取到沒有被正确路由到合适隊列的消息呢?通過調用channel.addReturnListener來添加ReturnListener監聽器實現,隻要發送的消息,沒有路由到具體的隊列,ReturnListener就會收到監聽消息。
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP .BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); //進入該方法表示,沒路由到具體的隊列 //監聽到消息,可以重新投遞或者其它方案來提高消息的可靠性。 System.out.println("Basic.Return返回的結果是:" message); } });
此時有人問了,不想複雜化生産者的編程邏輯,又不想消息丢失,那麼怎麼辦? 還好RabbitMQ提供了一個叫做alternate-exchange東西,翻譯下就是備份交換器,這個幹什麼用呢?很簡單,它可以将未被路由的消息存儲在另一個exchange隊列中,再在需要的時候去處理這些消息。
那如何實現呢?
簡單一點可以通過webui管理後台設置,當你新建一個exchange業務的時候,可以給它設置Arguments,這個參數就是 alternate-exchange,其實alternate-exchange就是一個普通的exchange,類型最好是fanout 方便管理
解決RabbitMQ消息丢失問題和保證消息可靠性
當你發送消息到你自己的exchange時候,對應key沒有路由到queue,就會自動轉移到alternate-exchange對應的queue,起碼消息不會丢失。
下面一張圖看下投遞過程:
解決RabbitMQ消息丢失問題和保證消息可靠性
那麼有人有個疑問,上面介紹了,兩種方式處理,發送的消息無法路由到隊列的方案, 如果備份交換器和mandatory參數一起使用,會有什麼效果?
答案是:mandatory參數無效
總結下上面内容,主要如何保證消息從生産者到RabbitMQ Server 端可靠性
1. Transaction: 消息落盤,隻能同步開啟、提交及回滾。
2. Confirm:消息進入緩沖區,支持同步、異步、批量确認。
3. Transaction和publisher confirm機制兩者是互斥的
4. 一般在生産者這塊避免數據丢失,都是用 Confirm 機制的。
2.RabbitMQ-Server中存儲的消息如何保證
一般消息都是存内存中的,如果消息沒有持久化硬盤,一天機器需要重啟,獲取意外停電,重啟機器後,消息全丢了,所以消息持久化是必備。
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!