訊息服務百科全書——資訊生產與消費

生產者

1.1 負載均衡

生產者直接傳送訊息到分割槽的leader上,中間不需要任何路由選擇。 客戶端控制了訊息將被送到哪個分割槽上。通常能夠使用隨機負荷分擔的方式,或者使用某種標誌來分配。我們提供了介面,允許使用者指定關鍵字來hash到特定的分割槽(語義分割槽),比如,使用user id來作為關鍵字,那麼特定使用者的訊息就會被髮送到相同的分割槽中。這樣也可以允許消費者明確他們的消費,這種設計明確允許Locality-sensitive處理。

 

1.2 非同步傳送

批量處理可以極大的提升處理效率,在Kafka中,生產者有個選項(producer.type=async)可用指定使用非同步分發出產請求(produce request)。這樣就允許用一個記憶體佇列(in-memory queue)把生產請求放入緩衝區,然後再以某個時間間隔或者事先配置好的批量大小將資料批量傳送出去(如64K,10ms)。因為一般來說資料會從一組以不同的資料速度生產資料的異構的機器中釋出出,所以對於代理而言,這種非同步緩衝的方式有助於產生均勻一致的流量,因而會有更佳的網路利用率和更高的吞吐量。

1.3 新生產者

 

如上圖,藍色的框圖屬於Producer物件中的部件。客戶端呼叫Send介面傳送訊息,先會追加到RecordAccumulator中形成一個一個的RecordBatch,然後再由後臺的Sender Thread針對每個節點傳送請求,請求中帶有每個分割槽的RecordBatch。Kafka叢集給Sender Thread的請求迴響應,每個RecordBatch中的訊息要麼都寫入成功,要麼都失敗。

New Producer與Old Producer最主要的區別就是New Producer使用非阻塞式IO並且實現都是基於非同步的,同步介面也是基於非同步實現。而Old Producer的非同步介面是基於同步介面來實現。用下面的圖可以說明問題:

可以看出使用非阻塞式的設計可以使得Producer同樣傳送3個請求完成的時間大大減少,提升吞吐量。 具體傳送流程如下:

消費者

Kafka的消費者是通過往broker發起一個“fetch”操作來獲取分割槽中的資料的。每個請求中都攜帶有“offset”,分割槽將從這個位置返回一塊資料給消費者。因此,消費的主動權是在消費者上的,消費者甚至可以移動offset來重複消費。

 

2.1 Push vs Pull

到底是應該讓使用者從代理那裡吧資料Pull(拉)回來還是應該讓代理把資料Push(推)給使用者。和大部分訊息系統一樣,Kafka在這方面遵循了一種更加傳統的設計思路:由生產者將資料Push給代理,然後由使用者將資料代理那裡Pull回來。 近來有些系統,比如scribe和flume,更著重於日誌統計功能,遵循了一種非常不同的基於Push的設計思路,其中每個節點都可以作為代理,資料一直都是向下遊Push的。上述兩種方法都各有優缺點。然而,因為基於Push的系統中代理控制著資料的傳輸速率,因此它難以應付大量不同種類的使用者。我們的設計目標是,讓使用者能以它最大的速率使用資料。不幸的是,在Push系統中當資料的使用速率低於產生的速率時,使用者往往會處於超載狀態(這實際上就是一種拒絕服務攻擊)。基於Pull的系統在使用者的處理速度稍稍落後的情況下會表現更佳,而且還可以讓使用者在有能力的時候往往前趕趕。讓使用者採用某種退避協議(backoff protocol)向代理表明自己處於超載狀態,可以解決部分問題,但是,將傳輸速率調整到正好可以完全利用(但從不能過度利用)使用者的處理能力可比初看上去難多了。以前我們嘗試過多次,想按這種方式構建系統,得到的經驗教訓使得我們選擇了更加常規的Pull模型。 另外一個好處就是:基於pull的系統,可以批量打包資料送給消費者。一個PUSH系統,要麼立即送一個請求,要麼在不知道消費者能夠處理的情況下,聚集一些資料,延遲一次傳送。這樣會引入不必要的時延。基於PULL的設計,總是能夠立即去pull所有的可用的訊息,所以能夠得到最優的批量打包,而不會引入時延。 原生的pull系統一個缺點就是:當broker沒有訊息的時候,consumer會進入迴圈等待,忙等待資料達到。為了避免這種情況,可以在pull請求中設定“long pull”引數(等待給定數目的訊息可用)。

 

2.2 消費者狀態

追蹤(客戶)消費了什麼是一個訊息系統必須提供的一個關鍵功能之一。它並不直觀,但是記錄這個狀態是該系統的關鍵效能之一。

 大部分訊息系統保留著關於代理者使用(消費)的訊息的後設資料。也就是說,當訊息被交到客戶手上時,代理者自己記錄了整個過程。這是一個相當直觀的選擇,而且確實對於一個單機伺服器來說,它(資料)能去(放在)哪裡是不清晰的。又由於許多訊息系統儲存使用的資料結構規模小,所以這也是個實用的選擇–因為代理者知道什麼被消費了使得它可以立刻刪除它(資料),保持資料大小不過大。

也許不顯然的是,讓代理和使用者這兩者對訊息的使用情況做到一致表述絕不是一件輕而易舉的事情。如果代理每次都是在將訊息傳送到網路中後就將該訊息記錄為已使用的話,一旦使用者沒能真正處理到該訊息(比方說,因為它宕機或這請求超時了抑或別的什麼原因),就會出現訊息丟失的情況。為了解決此問題,許多訊息系新加了一個確認功能,當訊息發出後僅把它標示為已傳送而不是已使用,然後代理需要等到來自使用者的特定的確認資訊後才將訊息記錄為已使用。這種策略的確解決了丟失訊息的問題,但由此產生了新問題。首先,如果使用者已經處理了該訊息但卻未能傳送出確認資訊,那麼就會讓這一條訊息被處理兩次。第二個問題是關於效能的,這種策略中的代理必須為每條單個的訊息維護多個狀態(首先為了防止重複傳送就要將訊息鎖定,然後,然後還要將訊息標示為已使用後才能刪除該訊息)。另外還有一些棘手的問題需要處理,比如,對於那些以發出卻未得到確認的訊息該如何處理?

 Kafka有一些特別的做法,Topic被劃分為一系列有序的分割槽,每個分割槽在任何時候都只有一個消費者來消費。這意味著在每個分割槽中,消費者位置就是一個簡單的數值。這使得消費者位置確認的消耗非常小,一個分割槽一個數值而已。這個資料能被週期性的確認,非常容易。 這個決策還帶來一個額外的好處。使用者可用故意回退(rewind)到以前的偏移量處,再次使用一遍以前使用過的資料。雖然這麼做違背了佇列的一般協約(contract),但對很多使用者來講卻是個很基本的功能。舉個例子,如果使用者的程式碼裡有個Bug,而且是在它處理完一些訊息之後才被發現的,那麼當把Bug改正後,使用者還有機會重新處理一遍那些訊息。

 

2.3 新消費者

以前老的消費者主要碰到主要受到腦裂和羊群效應的影響,這些問題包括:

 

2.3.1 腦裂:

在一個大叢集中往往會有一個master存在,在長期執行過程中不可避免的會出現宕機等問題導致master不可用,在出現這樣的情況以後往往會對系統產生很大的影響,所以一般的分散式叢集中的master都採用了高可用的解決方案來避免這樣的情況發生。

 master-slaver方式,存在一個master節點,平時對外服務,同時有一個slaver節點,監控著master,同時有某種方式來進行資料的同步。如果在master掛掉以後slaver能很快獲知並迅速切換成為新的master。在以往master-slaver的監控切換是個很大的難題,但是現在有了Zookeeper的話能比較優雅的解決這一類問題。

 master-slaver結構 master-slaver實現起來非常簡單,而且在master上面的各種操作效率要較其他HA解決方案要高,早期的時候監控和切換很難控制,但是後來zookeeper出現了,他的watch和分散式鎖機制很好的解決了這一類問題。

我們的系統和同事的系統都是這種模式,但是後來都發現由於ZooKeeper使用上的問題存在腦裂的問題。 記得很久以前參加一個大牛的技術交流會他就提到過在叢集中假死問題是一個非常讓人頭痛的問題,假死也是導致腦裂的根源。

根據一個什麼樣的情況能判斷一個節點死亡了down掉了,人可能很容易判斷,但是對於在分散式系統中這些是有監控者來判斷的,對於監控者來說很難判定其他的節點的狀態,唯一可靠點途徑的就是心跳,包括ZooKeeper就是使用心跳來判斷客戶端是否仍然活著的,使用ZooKeeper來做master HA基本都是同樣的方式,每個節點都嘗試註冊一個象徵master的臨時節點其他沒有註冊成功的則成為slaver,並且通過watch機制監控著master所建立的臨時節點,Zookeeper通過內部心跳機制來確定master的狀態,一旦master出現意外Zookeeper能很快獲悉並且通知其他的slaver,其他slaver在之後作出相關反應。這樣就完成了一個切換。這種模式也是比較通用的模式,基本大部分都是這樣實現的,但是這裡面有個很嚴重的問題,如果注意不到會導致短暫的時間內系統出現腦裂,因為心跳出現超時可能是master掛了,但是也可能是master,zookeeper之間網路出現了問題,也同樣可能導致。這種情況就是假死,master並未死掉,但是與ZooKeeper之間的網路出現問題導致Zookeeper認為其掛掉瞭然後通知其他節點進行切換,這樣slaver中就有一個成為了master,但是原本的master並未死掉,這時候client也獲得master切換的訊息,但是仍然會有一些延時,zookeeper需要通訊需要一個一個通知,這時候整個系統就很混亂可能有一部分client已經通知到了連線到新的master上去了,有的client仍然連線在老的master上如果同時有兩個client需要對master的同一個資料更新並且剛好這兩個client此刻分別連線在新老的master上,就會出現很嚴重問題。

出現這種情況的主要原因在與Zookeeper叢集和Zookeeperclient判斷超時並不能做到完全同步(這些還依賴於作業系統排程等,很難保證),也就是說可能一前一後,如果是叢集先於client發現那就會出現上面的情況了。同時在發現並切換後通知各個客戶端也有先後快慢。出現這種情況的機率很小,需要master與zookeeper叢集網路斷開但是與其他叢集角色之間的網路沒有問題,還要滿足上面那些條件,但是一旦出現就會引發很嚴重的後果,資料不一致了。

 避免這種情況其實也很簡單,在slaver切換的時候不在檢查到老的master出現問題後馬上切換,而是在休眠一段足夠的時間,確保老的master已經獲知變更並且做了相關的shutdown清理工作了然後再註冊成為master就能避免這類問題了,這個休眠時間一般定義為與zookeeper定義的超時時間就夠了,但是這段時間內系統可能是不可用的,但是相對於資料不一致的後果我想還是值得的。

當然最徹底的解決這類問題的方案是將master HA叢集做成peer2peer的,遮蔽掉外部Zookeeper的依賴。每個節點都是對等的沒有主次,這樣就不會存在腦裂的問題,但是這種ha解決方案需要使用兩階段,paxos這類資料一致性保證協議來實現,不可避免的會降低系統資料變更的系統,如果系統中主要是對master的讀取操作很少更新就很適合了。

 

2.3.2 羊群效應

例如Zookeeper的一個羊群效應例子是當一個特定的znode 改變的時候ZooKeper 觸發了所有watches 的事件。

 舉個例子,如果有1000個客戶端watch 一個znode的exists呼叫,當這個節點被建立的時候,將會有1000個通知被髮送。這種由於一個被watch的znode變化,導致大量的通知需要被髮送,將會導致在這個通知期間的其他操作提交的延遲。因此,只要可能,我們都強烈建議不要這麼使用watch。僅僅有很少的客戶端同時去watch一個znode比較好,理想的情況是隻有1個。

 舉個例子,有n 個clients 需要去拿到一個全域性的lock.一種簡單的實現就是所有的client 去create 一個/lock znode.如果znode 已經存在,只是簡單的watch 該znode 被刪除。當該znode 被刪除的時候,client收到通知並試圖create /lock。這種策略下,就會存在上文所說的問題,每次變化都會通知所有的客戶端。

 New Consumer採用協調者來避免羊群效應,只由協調者來watch 分割槽的變化,不需要每個consumer都watch。

 

2.3.3 協調者

新消費者為了不依賴zk,需要使用kafka完成同一個消費組內消費者之間的對Topic和分割槽的分配工作。新消費者只需要使用其中一個broker作為分散式協調即可,每一個broker都可被選舉為一部分消費組的協調者(coordinator)。其主要職責在於策劃rebalance操作時分割槽的分配,同時負責把分割槽的分配資訊告知消費者。

 

2.3.4 消費流程