Kafka 是一個消息系統,原本開發自 LinkedIn,用作 LinkedIn 的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。
圖片來自 Pexels
現在它已被多家不同類型的公司作為多種類型的數據管道和消息系統使用。活動流數據是幾乎所有站點在對其網站使用情況做報表時都要用到的數據中最常規的部分。
活動數據包括頁面訪問量(Page View)、被查看内容方面的信息以及搜索情況等内容。
這種數據通常的處理方式是先把各種活動以日志的形式寫入某種文件,然後周期性地對這些文件進行統計分析。
運營數據指的是服務器的性能數據(CPU、IO 使用率、請求時間、服務日志等等數據)。運營數據的統計方法種類繁多。
近年來,活動和運營數據處理已經成為了網站軟件産品特性中一個至關重要的組成部分,這就需要一套稍微更加複雜的基礎設施對其提供支持。
Kafka 基礎概念
Kafka 是一種分布式的,基于發布/訂閱的消息系統,主要設計目标如下:
生産者與消費者
對于 Kafka 來說客戶端有兩種基本類型:
除此之外,還有用來做數據集成的 Kafka Connect API 和流式處理的 Kafka Streams 等高階客戶端,但這些高階客戶端底層仍然是生産者和消費者 API,它們隻不過是在上層做了封裝。
這很容易理解,生産者(也稱為發布者)創建消息,而消費者(也稱為訂閱者)負責消費 or 讀取消息。
主題(Topic)與分區(Partition)
在 Kafka 中,消息以主題(Topic)來分類,每一個主題都對應一個「消息隊列」,這有點兒類似于數據庫中的表。
但是如果我們把所有同類的消息都塞入到一個“中心”隊列中,勢必缺少可伸縮性,無論是生産者/消費者數目的增加,還是消息數量的增加,都可能耗盡系統的性能或存儲。
我們使用一個生活中的例子來說明:現在 A 城市生産的某商品需要運輸到 B 城市,走的是公路。
那麼單通道的高速公路不論是在「A 城市商品增多」還是「現在 C 城市也要往 B 城市運輸東西」這樣的情況下都會出現「吞吐量不足」的問題。
所以我們現在引入分區(Partition)的概念,類似“允許多修幾條道”的方式對我們的主題完成了水平擴展。
Broker 和集群(Cluster)
一個 Kafka 服務器也稱為 Broker,它接受生産者發送的消息并存入磁盤;Broker 同時服務消費者拉取分區消息的請求,返回目前已經提交的消息。
使用特定的機器硬件,一個 Broker 每秒可以處理成千上萬的分區和百萬量級的消息。(現在動不動就百萬量級,我特地去查了一把,好像确實集群的情況下吞吐量挺高的。)
若幹個 Broker 組成一個集群(Cluster),其中集群内某個 Broker 會成為集群控制器(Cluster Controller),它負責管理集群,包括分配分區到 Broker、監控 Broker 故障等。
在集群内,一個分區由一個 Broker 負責,這個 Broker 也稱為這個分區的 Leader。
當然一個分區可以被複制到多個 Broker 上來實現冗餘,這樣當存在 Broker 故障時可以将其分區重新分配到其他 Broker 來負責。
下圖是一個樣例:
Kafka 的一個關鍵性質是日志保留(Retention),我們可以配置主題的消息保留策略,譬如隻保留一段時間的日志或者隻保留特定大小的日志。
當超過這些限制時,老的消息會被删除。我們也可以針對某個主題單獨設置消息過期策略,這樣對于不同應用可以實現個性化。
多集群
随着業務發展,我們往往需要多集群,通常處于下面幾個原因:
當構建多個數據中心時,往往需要實現消息互通。舉個例子,假如用戶修改了個人資料,那麼後續的請求無論被哪個數據中心處理,這個更新需要反映出來。又或者,多個數據中心的數據需要彙總到一個總控中心來做數據分析。
上面說的分區複制冗餘機制隻适用于同一個 Kafka 集群内部,對于多個 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。
本質上來說,MirrorMaker 隻是一個 Kafka 消費者和生産者,并使用一個隊列連接起來而已。它從一個集群中消費消息,然後往另一個集群生産消息。
Kafka 的設計與實現
上面我們知道了 Kafka 中的一些基本概念,但作為一個成熟的「消息隊列」中間件,其中有許多有意思的設計值得我們思考,下面我們簡單列舉一些。
Kafka 存儲在文件系統上
是的,您首先應該知道 Kafka 的消息是存在于文件系統之上的。Kafka 高度依賴文件系統來存儲和緩存消息,一般的人認為 “磁盤是緩慢的”,所以對這樣的設計持有懷疑态度。
實際上,磁盤比人們預想的快很多也慢很多,這取決于它們如何被使用;一個好的磁盤結構設計可以使之跟網絡速度一樣快。
現代的操作系統針對磁盤的讀寫已經做了一些優化方案來加快磁盤的訪問速度。
比如,預讀會提前将一個比較大的磁盤快讀入内存。後寫會将很多小的邏輯寫操作合并起來組合成一個大的物理寫操作。
并且,操作系統還會将主内存剩餘的所有空閑内存空間都用作磁盤緩存,所有的磁盤讀寫操作都會經過統一的磁盤緩存(除了直接 I/O 會繞過磁盤緩存)。
綜合這幾點優化特點,如果是針對磁盤的順序訪問,某些情況下它可能比随機的内存訪問都要快,甚至可以和網絡的速度相差無幾。
上述的 Topic 其實是邏輯上的概念,面向消費者和生産者,物理上存儲的其實是 Partition,每一個 Partition 最終對應一個目錄,裡面存儲所有的消息和索引文件。
默認情況下,每一個 Topic 在創建時如果不指定 Partition 數量時隻會創建 1 個 Partition。
比如,我創建了一個 Topic 名字為 test ,沒有指定 Partition 的數量,那麼會默認創建一個 test-0 的文件夾,這裡的命名規則是:- 。
任何發布到 Partition 的消息都會被追加到 Partition 數據文件的尾部,這樣的順序寫磁盤操作讓 Kafka 的效率非常高(經驗證,順序寫磁盤效率比随機寫内存還要高,這是 Kafka 高吞吐率的一個很重要的保證)。
每一條消息被發送到 Broker 中,會根據 Partition 規則選擇被存儲到哪一個 Partition。如果 Partition 規則設置的合理,所有消息可以均勻分布到不同的 Partition中。
Kafka 中的底層存儲設計
假設我們現在 Kafka 集群隻有一個 Broker,我們創建 2 個 Topic 名稱分别為:「Topic1」和「Topic2」,Partition 數量分别為 1、2。
那麼我們的根目錄下就會創建如下三個文件夾:
| --topic1-0 | --topic2-0 | --topic2-1
在 Kafka 的文件存儲中,同一個 Topic 下有多個不同的 Partition,每個 Partition 都為一個目錄。
而每一個目錄又被平均分配成多個大小相等的 Segment File 中,Segment File 又由 index file 和 data file 組成,他們總是成對出現,後綴 ".index" 和 ".log" 分表表示 Segment 索引文件和數據文件。
現在假設我們設置每個 Segment 大小為 500 MB,并啟動生産者向 topic1 中寫入大量數據,topic1-0 文件夾中就會産生類似如下的一些文件:
| --topic1-0 | --00000000000000000000.index | --00000000000000000000.log | --00000000000000368769.index | --00000000000000368769.log | --00000000000000737337.index | --00000000000000737337.log | --00000000000001105814.index | --00000000000001105814.log | --topic2-0 | --topic2-1
Segment 是 Kafka 文件存儲的最小單位。Segment 文件命名規則:Partition 全局的第一個 Segment 從 0 開始,後續每個 Segment 文件名為上一個 Segment 文件最後一條消息的 offset 值。
數值最大為 64 位 long 大小,19 位數字字符長度,沒有數字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一對 Segment File 為例,說明一下索引文件和數據文件對應關系:
其中以索引文件中元數據 <3, 497> 為例,依次在數據文件中表示第 3 個 Message(在全局 Partition 表示第 368769 3 = 368772 個 message)以及該消息的物理偏移地址為 497。
注意該 Index 文件并不是從0開始,也不是每次遞增 1 的,這是因為 Kafka 采取稀疏索引存儲的方式,每隔一定字節的數據建立一條索引。
它減少了索引文件大小,使得能夠把 Index 映射到内存,降低了查詢時的磁盤 IO 開銷,同時也并沒有給查詢帶來太多的時間消耗。
因為其文件名為上一個 Segment 最後一條消息的 Offset ,所以當需要查找一個指定 Offset 的 Message 時,通過在所有 Segment 的文件名中進行二分查找就能找到它歸屬的 Segment。
再在其 Index 文件中找到其對應到文件上的物理位置,就能拿出該 Message。
由于消息在 Partition 的 Segment 數據文件中是順序讀寫的,且消息消費後不會删除(删除策略是針對過期的 Segment 文件),這是順序磁盤 IO 存儲設計師 Kafka 高性能很重要的原因。
Kafka 是如何準确的知道 Message 的偏移的呢?這是因為在 Kafka 定義了标準的數據存儲結構,在 Partition 中的每一條 Message 都包含了以下三個屬性:
生産者設計概要
當我們發送消息之前,先問幾個問題:每條消息都是很關鍵且不能容忍丢失麼?偶爾重複消息可以麼?我們關注的是消息延遲還是寫入消息的吞吐量?
舉個例子,有一個信用卡交易處理系統,當交易發生時會發送一條消息到 Kafka,另一個服務來讀取消息并根據規則引擎來檢查交易是否通過,将結果通過 Kafka 返回。
對于這樣的業務,消息既不能丢失也不能重複,由于交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點。
再舉個例子,假如我們需要收集用戶在網頁上的點擊數據,對于這樣的場景,少量消息丢失或者重複是可以容忍的,延遲多大都不重要隻要不影響用戶體驗,吞吐則根據實時用戶數來決定。
不同的業務需要使用不同的寫入方式和配置。具體的方式我們在這裡不做讨論,現在先看下生産者寫消息的基本流程:
流程如下:
消費者設計概要
①消費者與消費組
假設這麼個場景:我們從 Kafka 中讀取消息,并且進行檢查,最後産生結果數據。
我們可以創建一個消費者實例去做這件事情,但如果生産者寫入消息的速度比消費者讀取的速度快怎麼辦呢?
這樣随着時間增長,消息堆積越來越嚴重。對于這種場景,我們需要增加多個消費者來進行水平擴展。
Kafka 消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。
假設有一個 T1 主題,該主題有 4 個分區;同時我們有一個消費組 G1,這個消費組隻有一個消費者 C1。
那麼消費者 C1 将會收到這 4 個分區的消息,如下所示:
如果我們增加新的消費者 C2 到消費組 G1,那麼每個消費者将會分别收到兩個分區的消息,如下所示:
如果增加到 4 個消費者,那麼每個消費者将會分别收到一個分區的消息,如下所示:
但如果我們繼續增加消費者到這個消費組,剩餘的消費者将會空閑,不會收到任何消息:
總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。
這也是為什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。
另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閑的,沒有任何幫助。
Kafka 一個很重要的特性就是,隻需寫入一次消息,可以支持任意多的應用讀取這個消息。
換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。
對于上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼會是這樣的:
在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應用。
最後,總結起來就是:如果應用需要讀取全量消息,那麼請為該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者。
②消費組與分區重平衡
可以看到,當新的消費者加入消費組,它會消費一個或多個分區,而這些分區之前是由其他消費者負責的。
另外,當消費者離開消費組(比如重啟、宕機等)時,它所消費的分區會分配給其他分區。
這種現象稱為重平衡(Rebalance)。重平衡是 Kafka 一個很重要的性質,這個性質保證了高可用和水平擴展。
不過也需要注意到,在重平衡期間,所有消費者都不能消費消息,因此會造成整個消費組短暫的不可用。
而且,将分區進行重平衡也會導緻原來的消費者狀态過期,從而導緻消費者需要重新更新狀态,這段期間也會降低消費性能。後面我們會讨論如何安全的進行重平衡以及如何盡可能避免。
消費者通過定期發送心跳(Hearbeat)到一個作為組協調者(Group Coordinator)的 Broker 來保持在消費組内存活。
這個 Broker 不是固定的,每個消費組都可能不同。當消費者拉取消息或者提交時,便會發送心跳。
如果消費者超過一定時間沒有發送心跳,那麼它的會話(Session)就會過期,組協調者會認為該消費者已經宕機,然後觸發重平衡。
可以看到,從消費者宕機到會話過期是有一定時間的,這段時間内該消費者的分區都不能進行消息消費。
通常情況下,我們可以進行優雅關閉,這樣消費者會發送離開的消息到組協調者,這樣組協調者可以立即進行重平衡而不需要等待會話過期。
在 0.10.1 版本,Kafka 對心跳機制進行了修改,将發送心跳與拉取消息進行分離,這樣使得發送心跳的頻率不受拉取的頻率影響。
另外更高版本的 Kafka 支持配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)。活鎖,是指應用沒有故障但是由于某些原因不能進一步消費。
③Partition 與消費模型
上面提到,Kafka 中一個 Topic 中的消息是被打散分配在多個 Partition(分區)中存儲的, Consumer Group 在消費時需要從不同的 Partition 獲取消息,那最終如何重建出 Topic 中消息的順序呢?
答案是:沒有辦法。Kafka 隻會保證在 Partition 内消息是有序的,而不管全局的情況。
下一個問題是:Partition 中的消息可以被(不同的 Consumer Group)多次消費,那 Partition中被消費的消息是何時删除的?Partition 又是如何知道一個 Consumer Group 當前消費的位置呢?
無論消息是否被消費,除非消息到期 Partition 從不删除消息。例如設置保留時間為 2 天,則消息發布 2 天内任何 Group 都可以消費,2 天後,消息自動被删除。
Partition 會為每個 Consumer Group 保存一個偏移量,記錄 Group 消費到的位置。如下圖:
④為什麼 Kafka 是 Pull 模型
消費者應該向 Broker 要數據(Pull)還是 Broker 向消費者推送數據(Push)?
作為一個消息系統,Kafka 遵循了傳統的方式,選擇由 Producer 向 Broker Push 消息并由 Consumer 從 Broker Pull 消息。
一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 Push 模式。事實上,Push 模式和 Pull 模式各有優劣。
Push 模式很難适應消費速率不同的消費者,因為消息發送速率是由 Broker 決定的。
Push 模式的目标是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。
而 Pull 模式則可以根據 Consumer 的消費能力以适當的速率消費消息。
對于 Kafka 而言,Pull 模式更合适。Pull 模式可簡化 Broker 的設計,Consumer 可自主控制消費消息的速率。
同時 Consumer 可以自己控制消費方式,即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
Kafka 如何保證可靠性
當我們讨論可靠性的時候,我們總會提到保證*這個詞語。可靠性保證是基礎,我們基于這些基礎之上構建我們的應用。
比如關系型數據庫的可靠性保證是 ACID,也就是原子性(Atomicity)、一緻性(Consistency)、隔離性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保證有如下 4 點:
這裡的寫入有可能隻是寫入到文件系統的緩存,不一定刷新到磁盤。生産者可以等待不同時機的确認,比如等待分區主副本寫入即返回,後者等待所有 in-sync 狀态副本寫入才返回。
使用這些基礎保證,我們構建一個可靠的系統,這時候需要考慮一個問題:究竟我們的應用需要多大程度的可靠性?
可靠性不是無償的,它與系統可用性、吞吐量、延遲和硬件價格息息相關,得此失彼。因此,我們往往需要做權衡,一味的追求可靠性并不實際。
動手搭一個 Kafka
通過上面的描述,我們已經大緻了解到了 Kafka 是何方神聖了,現在我們開始嘗試自己動手本地搭一個來實際體驗一把。
第一步:下載 Kafka
這裡以 Mac OS 為例,在安裝了 Homebrew 的情況下執行下列代碼:
brew install kafka
由于 Kafka 依賴了 Zookeeper,所以在下載的時候會自動下載。
第二步:啟動服務
我們在啟動之前首先需要修改 Kafka 的監聽地址和端口為 localhost:9092:
vi /usr/local/etc/kafka/server.properties
然後修改成下圖的樣子:
依次啟動 Zookeeper 和 Kafka:
brew services start zookeeper brew services start kafka
然後執行下列語句來創建一個名字為 "test" 的 Topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
我們可以通過下列的命令查看我們的 Topic 列表:
kafka-topics --list --zookeeper localhost:2181
第三步:發送消息
然後我們新建一個控制台,運行下列命令創建一個消費者關注剛才創建的 Topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
用控制台往剛才創建的 Topic 中添加消息,并觀察剛才創建的消費者窗口:
kafka-console-producer --broker-list localhost:9092 --topic test
能通過消費者窗口觀察到正确的消息:
參考資料:
更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!