發布訂閱系統在我們日常的工作中經常會使用到,這種場景大部分情況我們都是使用消息隊列的,常用的消息隊列有 Kafka,RocketMQ,RabbitMQ,每一種消息隊列都有其特性,關于 Kafka 的使用和源碼分析,公号前面有相關的文章,大家可以前往回顧一下,另外兩款消息隊列大家有需要可以自行研究,後續我們會出相應的介紹文章。這篇文章主要是給大家介紹 Redis 的發布訂閱系統,很多時候我們可能不需要獨立部署相應的消息隊列,隻是簡單的使用,而且數據量也不會太大,這種情況下,我們就可以使用 Redis 的 Pub/Sub 模型。
Redis 的發布訂閱功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令組成,一個或者多個客戶端訂閱某個或者多個頻道,當其他客戶端向該頻道發送消息的時候,訂閱了該頻道的客戶端都會收到對應的消息。
上圖中有四個客戶端,Client 02,Client 03,Client 04 訂閱了同一個 sport 頻道(Channel),這時當 Client 01 向 Sport Channel 發送消息 “basketball” 的時候,02-04 這三個客戶端都同時收到了這條消息。
整個過程的執行命令如下:
首先開四個 Redis 的客戶端,然後在 Client 02,Client 03,Client 04 中輸入subscribe sport 命令,表示訂閱 sport 這個頻道
然後在 Client 01 的客戶端中輸入publish sport basketball 表示向 sport 頻道發送消息 "basketball"
這個時候我們在去看下 Client 02-04 的客戶端,可以看到已經收到了消息了,每個訂閱了這個頻道的客戶端都是一樣的。
這裡 Client 02-Client 04 三個客戶端訂閱了 Sport 頻道,我們叫做訂閱者(subscriber),Client 01 發布消息,我們叫做發布者(publisher),發送的消息就是 message。
前面我們看到的是一個客戶端訂閱了一個 Channel,事實上單個客戶端也可以同時訂閱多個 Channel,采用模式匹配的方式,一個客戶端可以同時訂閱多個 Channel。
如上圖 Client 05 通過命令subscribe run 訂閱了 run 頻道,Client 06 通過命令psubscribe run* 訂閱了 run* 匹配的頻道。當 Client 07 向 run頻道發送消息 666 的時候,05 和 06 兩個客戶端都收到消息了;接下來 Client 07 向 run1 和 run_sport 兩個頻道發送消息的時候,Client 06 依舊可以收到消息,而 Client 05 就收不到了消息了。
Client 05 訂閱run 頻道和接收到消息:
Client 06 訂閱run* 頻道和接收到消息:
Client 07 向多個頻道發送消息:
通過上面的案例,我們學會了一個客戶端可以訂閱單個或者多個頻道,分别通過subscribe,psubscribe 命令,客戶端可以通過 publish 發送相應的消息。
在命令行中我們可以用 Ctrl C 來取消相關訂閱,對應的命令時 unsubscribe channelName。
在 Redis 的底層結構中,客戶端和頻道的訂閱關系是通過一個字典加鍊表的結構保存的,形式如下:
在 Redis 的底層結構中,Redis 服務器結構體中定義了一個pubsub_channels 字典
structredisServer{
//用于保存所有頻道的訂閱關系
dict *pubsub_channels;
}
在這個字典中,key 代表的是頻道名稱,value 是一個鍊表,這個鍊表裡面存放的是所有訂閱這個頻道的客戶端。
所以當有客戶端執行訂閱頻道的動作的時候,服務器就會将客戶端與被訂閱的頻道在 pubsub_channels 字典中進行關聯。
這個時候有兩種情況:
比如,如果有一個新的客戶端 Client 08 要訂閱 run 渠道,那麼上圖就會變成
如果 Client 08 要訂閱一個新的渠道 new_sport ,那麼就會變成
整個訂閱的過程可以采用下面僞代碼來實現
Map<String, List<Object>> pubsub_channels = new HashMap<>();
publicvoidsubscribe(String[] subscribeList, Object client){
//遍曆所有訂閱的 channel,檢查是否在 pubsub_channels 中,不在則創建新的 key 和空鍊表
for (int i = 0; i < subscribeList.length; i ) {
if (!pubsub_channels.containsKey(subscribeList[i])) {
pubsub_channels.put(subscribeList[i], new ArrayList<>());
}
pubsub_channels.get(subscribeList[i]).add(client);
}
}
3.2 取消訂閱
上面介紹的是單個 Channel 的訂閱,相反的如果一個客戶端要取消訂閱相關 Channel,則無非是找到對應的 Channel 的鍊表,從中删除對應的客戶端,如果該客戶端已經是最後一個了,則将對應 Channel 也删除。
模式渠道的訂閱與單個渠道的訂閱類似,不過服務器是将所有模式的訂閱關系都保存在服務器狀态的pubsub_patterns 屬性裡面。
與訂閱單個 Channel 不同的是,pubsub_patterns 屬性是一個鍊表,不是字典。節點的結構如下:
其實 client 屬性是用來存放對應客戶端信息,pattern 是用來存放客戶端對應的匹配模式。
所以對應上面的 Client-06 模式匹配的結構存儲如下
在pubsub_patterns鍊表中有一個節點,對應的客戶端是 Client-06,對應的匹配模式是run*。
當某個客戶端通過命令psubscribe 訂閱對應模式的 Channel 時候,服務器會創建一個節點,并将 Client 屬性設置為對應的客戶端,pattern 屬性設置成對應的模式規則,然後添加到鍊表尾部。
對應的僞代碼如下:
退訂模式的命令是punsubscribe,客戶端使用這個命令來退訂一個或者多個模式 Channel。服務器接收到該命令後,會遍曆pubsub_patterns鍊表,将匹配到的 client 和 pattern 屬性的節點給删掉。這裡需要判斷 client 屬性和 pattern 屬性都合法的時候再進行删除。
僞代碼如下:
遍曆所有的節點,當匹配到相同 client 屬性和 pattern 屬性的時候就進行節點删除。
發布消息比較好容易理解,當一個客戶端執行了publish channelName message 命令的時候,服務器會從pubsub_channels和pubsub_patterns 兩個結構中找到符合channelName 的所有 Channel,進行消息的發送。在 pubsub_channels 中隻要找到對應的 Channel 的 key 然後向對應的 value 鍊表中的客戶端發送消息就好。
這篇文章主要給大家介紹了一下 Redis 的發布/訂閱的使用方式和底層的存儲結構以及部分僞代碼的實現,希望對大家有幫助。
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!