淺談Kafka特性與架構

NO IMAGE

Kafka特性

kafka作為一種消息中間件,有以下特性:

  • 高吞吐量:吞吐量高達數十萬
  • 高併發:支持數千個客戶端同時讀寫
  • 低延遲:延遲最低只有幾毫秒
  • 消息持久性和可靠性:消息被持久化到本地磁盤,同時支持數據備份
  • 集群容錯性:允許n-1個節點失敗(n為副本個數)
  • 可擴展性:支持集群動態擴展

應用場景

根據Kafka的特性,有以下應用場景:

  • 消息中間件:kafka本身作為標準的消息中間件,可以用於producer和consumer之間的異步消息通信
  • 日誌:出於kafka的高吞吐量特性,可以進行高效地日誌收集
  • 數據收集:出於高吞吐量和高併發特性,可以使用kafka記錄用戶/系統的一些實時數據

主要名詞

  • Broker:每一臺Kafka服務器就叫做一個Broker,支持水平擴展,一個集群中通常有多臺Broker,各個Broker地位一致,不存在主從關係
  • Coordinator:集群的協調者,kafka會將負載最小的broker指定為Coordinator
  • Topic:所有消息都有自己的所屬分類,這個分類就叫做Topic。一個Topic下的消息可以保存在多個Broker上(對於Producer和Consumer是無感知的)
  • Producer:產生消息的主體叫做Producer,負責發佈消息到指定Topic中
  • Consumer:消費對象的主體叫做Consumer,負責消費指定Topic中的消息
  • ConsumerGroup(CG):每一個Consumer均屬於一個特定CG,一個Topic可以對應多個CG,Topic的消息會發送到所有CG,但是CG可以選擇發送給所有Consumer還是指定的Consumer,通過這種方式可以方便的實現單播和廣播。同時,同一個CG下的Consumer可以實現負載均衡
  • Partition:存放數據的具體物理實體,每一個Topic會分為多個Partition。每一個Partition對應一個文件夾,在文件夾下存放數據和索引文件。每一個Partition中的消息是有序的,但是不同Partition的數據不能確定順序
  • Replication:Partition的備份,一個Partition會有多個Replication,存放在不同的Broker上
  • Segment:指每一個數據文件,一個Partition對應多個Segment,每一個Segment會有一個索引文件與之對應
  • Offset:指消息的序列號,是連續遞增的,Partition中的每一個消息都會有自己的Offset,用於唯一標識一條消息。因為是有序的,所以可以根據Offset快速定位一個數據文件

基本架構

kafka是一個天然支持分佈式架構的發佈訂閱模式的rpc通信框架,kafka集群為典型的去中心化的設計,主體設計如下:

淺談Kafka特性與架構

生產者向Kafka集群提供數據,消費者從Kafka集群拉取數據,Kafka集群的調度由Zookeeper負責

Zookeeper

Kafka集群的元數據保存在Zookeeper中,除此之外不存儲任何消息數據。每一個Broker都需要在Zookeeper上註冊並不斷在上面更新自己的元數據(Topic和Partition信息),Zookeeper會使用這些數據信息來實現動態的集群擴容

Producer和Consumer都會在Zookeeper上註冊監聽器(Watcher),用於在Zookeeper發生變化時作出響應的調整。同時,Consumer還會向Zookeeper中註冊自己消費的Partition列表,用於發現Broker並與Partition建立socket連接

核心組件

Partition

Kafka中的Topic是以Partition的形式存放的,一個Topic會被拆分為多個Partition,存放在多臺服務器上。Producer在生產數據時會根據一定的規則將數據寫入指定Topic下的Partition中

可以設置每一個Topic的Partition數量,但是需要注意的是,一個Partition只能供一個Consumer消費,如果Partition過少,就可能會有Consumer消費不到數據。另外,建議partition的數量也需要大於集群中Broker的數量,這樣可以讓Partition Leader儘量均勻地分佈在各個Broker中。同時也需要注意,拆分的Partiton越多,也就意味著需要更多的空間

通常一個Partition需要有數個副本(Replication),Kafka允許用戶設置一份數據的備份個數,副本會存儲在不同的Broker上。在所有的副本中(包括自己),會存在一個Partition Leader用於進行讀寫,Leader的選舉調度等操作由Zookeeper來完成

Producer

Producer直接將消息發送到Broker的Partition Leader上,不需要經過代理中轉等操作,因為在設計時,Kafka集群中的每一個Broker都可以單獨響應Producer的操作,並返回Topic的一些信息(存活的機器/Leader位置/…)

Producer客戶端負責採用指定的負載均衡算法,管理消息會被推送到哪些Partition上。同時Producer可以將消息在內存中累計到一定數量時,作為一個Batch進行發送,能夠有效減少IO次數,進而提高效率。具體的Batch參數可以手動設置,可以是累計的數量大小/時間間隔等

Producer可以異步地向Kafka發送數據,在發送後會收到一個Futrue響應,包含offset值等信息。可以通過指定acks參數來控制Producer要求收到的確認消息個數

  • acks參數為n時:只有當n個partition副本收到消息後,producer才會收到broker的確認
  • acks參數為-1時:producer會在所有partition副本收到消息後得到broker的確認
  • acks參數為0時:producer不會等待broker的響應,可以得到最大的吞吐量,但是可能會導致數據丟失

Consumer

Kafka中,讀取消息的offset值由Consumer進行維護,因此consumer可以自由選取讀取消息的方式。同時,不管消息有沒有被消費,數據都會在kafka中保存一段時間

Kafka提供了兩種consumer api,分別是high-level api和sample api。Sample api只維持了和單一Broker的連接,同時是無狀態的,每次請求都需要指定offset值,所以也更為靈活

High-Level api封裝了對集群中broker的訪問,可以透明的訪問一個topic,同時也維持了已消費消息的狀態,每次消費的都是下一個消息。High-Level api還支持以組(CG)的形式消費消息,消息會被髮送給所有的CG,CG內部會選擇按順序發送給所有Consumer或是指定的Consumer

核心機制

消息壓縮

Kafka可以以集合(batch)形式發送數據,在此基礎上,kafka可以對batch進行壓縮。在producer端進行壓縮後,在consumer進行解壓,減少了傳輸所需的數據量,減輕對網絡的壓力。kafka在消息頭部增加了一個字節用於描述壓縮屬性,這個字節後兩位表示壓縮採用的編碼,如果後兩位為0,表示消息未被壓縮

消息可靠性

最理想的情況是消息發送成功,並且只發送了一次,這種情況叫做exactly-once,但是不可避免的會發生消息發送失敗以及消息重複發送的情況

為了解決這類問題,在producer端,當一個消息被髮送後,producer會等待broker發送響應,收到響應後producer會確認消息已經被正確發送給kafka,否則就會重新發送

在consumer端,因為broker記錄了partition中的offset值,這個值指向consumer下一個消費的消息,如果consumer收到消息但是消費失敗,broker可以根據offset值來找到上一個消息,同時consumer還可以控制offset值,來對消息進行任意處理

備份機制

(在“核心組件-Partition”中已經對此部分做了敘述)

消息消費策略

消費策略分類

固定分區消費

consumer在進行消息消費時,可以指定消息某分區的消息

Rebalance分區消費

一般地,一個topic下會有多個partition,而一個partition只能被一個CG中的consumer消費,可以通過指定rebalance策略,來採用不同的消費方式。Rebalance策略有兩種,範圍分區(Range)和輪詢分區(RoundRobin),範圍分區策略,即對topic下的partition進行排序,將partition數量除以CG下的consumer數量,從而得出每一個consumer消費哪幾個分區

輪詢分區策略則是將partition按照hashcode進行排序,然後通過分區取模來給consumer分配partition

Rebalance的觸發時機

當以下三種情況發生時,會觸發rebalance操作,重新指定分區:

  • CG內部加入了新的consumer
  • consumer離開CG
  • topic新增partition

Rebalance的執行過程

rebalance的執行由CG Leader來完成,並負責在執行結束後將執行結果通過broker集群中的coordinator廣播到CG。當CG的第一個consumer啟動後,這個consumer會和kafka確定組內的coordinator,之後CG內的所有成員都會和該coordinator進行通信

CG Leader的選舉有兩個階段,Join GroupSynchronizing Group State

  1. Join Group階段,所有成員都會向coordinator發送JoinGroup請求,當所有consumer都發送請求後, coordinator會選擇一個consumer擔任leader,並把CG的信息發送給該leader
  2. Synchronizing Group State階段,所有consumer都會向coordinator發送SynchronizingGroupState請求,而leader則將分區方案發送給coordinator,coordinator會在接受到分區方案後,將分區結果返回給所有consumer,這樣就完成分區方案的同步

高效性設計

消息持久化

消息的持久化並不僅僅是出於數據備份的需要,一個事實是,線性讀寫的時間遠遠高於隨機讀寫,對磁盤的線性讀所消耗的時間在有些情況下可以比內存的隨機訪問更快,所以現代很多操作系統會把空閒的內存用作磁盤緩存,儘管會在內存回收時帶來性能損耗,但是在讀寫上帶來的效率提升是顯著的

基於這樣的事實,利用文件系統依靠頁緩存來維護數據,會比維護一個內存緩存更好,因為採用了更為緊湊的數據結構。不同於維護儘可能多的內存緩存,如果我們將數據寫入到一個持久化日誌中,不調用刷新程序,這意味著數據將被傳輸到內核中並在稍後被刷新,我們也可以通過配置來控制數據在什麼時候刷新到物理磁盤上

常數時間的保證

kafka中持久化消息隊列採用對文件的讀寫來實現,類似日誌的形式。儘管這種操作不支持豐富的語義,但是可以很高效的進行並行操作,並且所有的操作都是常數時間,最終系統的性能和數據大小完全無關,可以充分利用硬盤來進行高效的消息服務

字節拷貝

為了解決字節拷貝的問題,kafka採用“標準字節消息”這種消息格式,這種格式在producer、consumer和broker間共享,kafka的日誌文件都是按“標準字節消息”這種格式寫入磁盤中。unix系統為了提高頁面緩存和socket之間的數據傳遞效率,使用了“零拷貝”機制,即sendfile system call 系統調用,java中也提供了訪問這個系統調用的接口

為了解釋為什麼這種方式能解決字節拷貝帶來的性能損耗,我們先來描述將數據從文件發送到socket的一般步驟:

  1. os將數據從磁盤讀到內核空間的頁緩存中
  2. 應用將數據從內核空間讀到用戶空間的頁緩存中
  3. 應用將數據寫回內核空間的socket緩存中
  4. os將數據從socket緩存寫到網卡緩存中
  5. 數據經網絡發出

我們可以發現這個過程至少涉及4次字節拷貝,2次系統調用,2次內核態到用戶態的切換,而如果我們能夠直接將數據寫入socket緩存中,就能減少很多不必要的切換。如果使用了sendfile的方式,數據可以直接由內核頁緩存直接拷貝到內核socket緩存中,不需要進行額外的系統狀態切換。通過這種方式,即使下游有很多consumer,也不會對集群服務造成壓力

想更詳細瞭解零拷貝機制的可見我的另一篇文章:淺談零拷貝機制

頻繁小IO

頻繁的小io可以通過一次性發送一個消息集合,而不是隻發送一條消息來解決,消息在服務器以消息塊的形式添加到日誌中。同時consumer在查詢時也會一次查詢大量的線性數據塊。消息集合(Message Set)將一個字節數組或文件進行打包,同時可以有選擇地進行反序列化

相關文章

手寫源碼(四):自己實現Mybatis

手寫源碼(三):自己實現SpringMVC

手寫源碼(二):自己實現SpringIOC

手寫源碼(一):自己實現Spring事務