tft每日頭條

 > 圖文

 > rabbitmq如何保證消息的可靠傳輸

rabbitmq如何保證消息的可靠傳輸

圖文 更新时间:2025-01-23 11:15:20

最近業務中有有這樣一個場景,就是用戶在商城下單之後,如果30分鐘沒有付款,那麼就需要将這個訂單處理掉,要麼直接删除,要麼直接标識為失效狀态,為什麼要這麼做?

  • 1、庫存,用戶在下單之後,會鎖定一個庫存,如果用戶一直不支付,那麼就會占用庫存,影響别的用戶購買,
  • 2、随着業務的發展,用戶量的增加,我們的訂單數據會越來越多,那麼我們要及時的清理無效的訂單,提升系統的性能;
曾經的純潔無瑕

首先說下,我曾經那些純潔無瑕的想法,第一次看到這種需求的時候,如果要清理失效的訂單,那我直接寫一個定時任務,5分鐘或者10分鐘跑一次,删除過期的訂單,增加庫存。

定時任務确實可以解決上面的問題,但是存在一個現實的問題,那就是數據庫壓力,甚至影響整個系統的性能,如果你是幾百、幾千個訂單還好,要是十幾萬、甚至上百萬,那麼此方法肯定是行不通的。

進階版本

既然發現了上面的方法不行,那麼就重新想辦法,有什麼辦法可以不用查詢數據庫,就可以知道哪些訂單快要過期,再這樣的思考下,我用Redis做了一個消息隊列,當生成訂單的時候,生成一個延遲10分鐘消息,10分鐘結束的時候,就會從Redis的隊列中将這個訂單取出來,然後這樣我就可以将這個訂單删除,增加庫存。

Redis做消息隊列存在的問題

之前用一個全新的Redis做消息隊列,存儲的數據也不多,感覺還好,當随着業務增加,Redis存儲超過90%的時候,大量的消息沒有被消費,就是消息丢失很嚴重。那麼這樣肯定是不行的。

删除訂單,增加庫存這是不能有太多誤差的事情,所以Redis消息隊列已經不能滿足我的需求,那麼就需要可靠性高的消息隊列,也就是我們這次要介紹的RabbitMQ。

RabbitMQ安裝與面闆介紹

這裡我就不跟大家介紹如何安裝RabbitMQ了,網上其實有很多這種教程,所以大家自行搜索吧。重點要跟大家說下,RabbitMQ的面闆,我們的消息隊列,以及消息都是可以在面闆上看到的。我是用的MQ的版本是3.8,各個版本之間的面闆多多少少可能有點不太一樣。

rabbitmq如何保證消息的可靠傳輸(詳解如何利用RabbitMQ生産一個簡單的消息)1

第一次接觸的話,我們不要想着全部我一次性都看懂,都知道是幹嘛的,我覺得沒必要,先熟悉最基礎的,我在上面圈了兩個地方Queue、Admin和Add a new queue 這個三個是最基本的,我們學習必須要用的東西。

  • Queue:這個就是我們聲明的消息隊列;
  • Admin:用戶管理,RabbitMQ默認有一個用戶是guest,但是RabbitMQ神奇的就是每個庫都必須創建一個用戶角色;
  • Add a new queue:這個就是創建一個新的隊列,但是我們一般不這麼直接創建,而是在代碼中創建;

rabbitmq如何保證消息的可靠傳輸(詳解如何利用RabbitMQ生産一個簡單的消息)2

再來補充下Admin吧,首先要告訴大家一個基本的東西,就是如果想要聲明一個隊列,那麼你必須要有一個庫(比喻手法),隊列存在于庫中,可以想象下mysql,是不是得先有庫,再有表。MQ也是這樣的。

右邊的Virtual Hosts就是創建庫,Name就是庫名,寫的時候前面必須加/

如果細心的朋友可以看到第一張圖中兩個隊列前面就是庫名,标識隊列存在與xiaoshuo庫中。

一個簡單的消息隊列

rabbitmq如何保證消息的可靠傳輸(詳解如何利用RabbitMQ生産一個簡單的消息)3

當生産者生産出消息之後,發送到隊列中,消費者監聽到隊列中有消息進行消費,那麼我們本篇就先實現一個簡單的消息隊列。

代碼

我們不使用SpringBoot框架,我們就從基本的寫,人生原生的API,這樣以後才懂得什麼是怎麼回事。

1、引入需要的pom文件

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version></dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version></dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version></dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version></dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version></dependency>

2、連接RabbitMQ

/** * @description: TODO MQ連接工廠 * @author: bingfeng * @create: 2020-05-07 08:55 */public class MQConnectUtil { public static Connection getConnection() throws IOException, TimeoutException { // 定義連接工廠 Connectionfactory factory = new ConnectionFactory(); // 設置連接地址 factory.setHost("127.0.0.1"); // 設置端口 factory.setPort(5672); // 選擇 vhost factory.setVirtualHost("/xiaoshuo"); // 設置用戶名 factory.setUsername("bingfeng"); // 密碼 factory.setPassword("123"); return factory.newConnection(); }}

3、發送消息

/** * @description: TODO 模拟發送消息 * @author: bingfeng * @create: 2020-05-07 09:01 */public class Producer { /** * 隊列名 */ public static final String QUEUE_NAME = "simple_queue_test"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = MQConnectUtil.getConnection(); // 從連接中獲取一個通道 channel channel = connection.createChannel(); // 創建隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String msg = "你好,冰峰!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("發送消息:" msg); channel.close(); connection.close(); }}

4、消費消息

/** * @description: TODO 消費MQ * @author: bingfeng * @create: 2020-05-07 09:11 */public class Consumer { public static final String QUEUE_NAME = "simple_queue_test"; public static void main(String[] args) throws Exception { // 獲取連接 Connection connection = MQConnectUtil.getConnection(); // 創建頻道 Channel channel = connection.createChannel(); // 隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消費消息:" msg); } }; // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); }}

當我們發送一個消息之後,我們可以直接在面闆看到隊列中的消息數

rabbitmq如何保證消息的可靠傳輸(詳解如何利用RabbitMQ生産一個簡單的消息)4

點進來之後,拉到下面,Messages就是我們想要查看的消息數量,就是你想看幾條消息填幾就行了,填完之後點擊下面的Get Messages,我們最近的消息就會顯示在下面。

rabbitmq如何保證消息的可靠傳輸(詳解如何利用RabbitMQ生産一個簡單的消息)5

當我們消費了這個消息之後,隊列中就沒有這個消息了。

最後的話

今天就寫到這,我打算把RabbitMQ從入門到項目中的實戰用法全部一級一級分享給大家,一是防止自己以後忘記可以回來翻翻看,二是分享給大家有興趣的朋友可以一起學習,後面我也會把我之前說的,訂單過期删除的業務場景也會寫一遍。

熟悉RabbitMQ的朋友,可能看下來解決寫的東西很簡單,但是畢竟也有很多人實際工作中并沒有用過MQ,自己可能也沒有了解過,對于沒有了解過的朋友來說,我覺得入個門還是挺不錯的。

有什麼問題,歡迎大家下方solo。

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关圖文资讯推荐

热门圖文资讯推荐

网友关注

Copyright 2023-2025 - www.tftnews.com All Rights Reserved