分散式訊息佇列 RocketMQ原始碼解析:Message拉取&消費(下)

NO IMAGE

???關注微信公眾號:【芋艿的後端小屋】有福利:

RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文註釋原始碼 GitHub 地址

您對於原始碼的疑問每條留言將得到認真回覆。甚至不知道如何讀原始碼也可以請教噢

新的原始碼解析文章實時收到通知。每週更新一篇左右


1、概述

本文接:《RocketMQ 原始碼分析 —— Message 拉取與消費(上)》

主要解析 Consumer

說明 :訂閱 Topic

說明 :根據 Topic

說明 :註冊訊息監聽器。

5、PushConsumer 訊息佇列分配

RebalanceService&PushConsumer分配佇列

RebalanceService

// 省略程式碼....

說明 :均衡訊息佇列服務,負責分配當前 Consumer

說明 :遍歷當前 Client

說明:執行訊息佇列分配。
第 3 行 :呼叫 RebalanceImpl#doRebalance(...)

#doRebalance(...)

#rebalanceByTopic(...)

說明 :移除不需要的訊息佇列相關的資訊,並返回是否移除成功。
第 2 至 4 行 :同步佇列的消費進度,並移除之。
第 5 至 27 行 :順序消費

說明 :移除不需要的訊息佇列相關的資訊,並返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)

說明 :發起訊息拉取請求。該呼叫是PushConsumer

說明 :提交拉取請求。提交後,PullMessageService

說明 :平均分配佇列策略。
第 7 至 25 行 :引數校驗。

第 26 至 36 行 :平均分配訊息佇列。

第 27 行 :index

說明 :平均分配可消費的 Broker

說明 :環狀分配訊息佇列。

AllocateMessageQueueByConfig

// 省略程式碼....

說明 :分配配置的訊息佇列。
疑問 :該分配策略的使用場景。

5、PushConsumer 消費進度讀取

RebalancePushImpl#computePullFromWhere(…)

// 省略程式碼....

說明 :計算訊息佇列開始消費位置。

PushConsumer

說明 :拉取訊息服務,不斷不斷不斷從 Broker

#pullMessage(...)

第 150 至 159 行 :有新訊息但是不匹配( NO_MATCHED_MSG

第 196 至 204 行 :發生異常,提交延遲拉取訊息請求。

#correctTagsOffset(...)

說明 :拉取訊息核心方法。該方法引數較多,可以看下程式碼註釋上每個引數的說明?。

第 34 至 43 行 :獲取 Broker

說明 :計算訊息佇列拉取訊息對應的 Broker

說明 :獲取 Broker

說明 :處理拉取結果。

更新訊息佇列拉取訊息 Broker

總結

如果用最簡單粗暴的方式描述 PullConsumer

6、PushConsumer 消費訊息

DefaultMQPushConsumerImpl消費訊息

ConsumeMessageConcurrentlyService 提交消費請求

ConsumeMessageConcurrentlyService#submitConsumeRequest(…)

// 省略程式碼....

說明 :提交立即消費請求。
第 16 至 22 行 :提交訊息小於等於批量消費數,直接提交消費請求。

第 23 至 47 行 :當提交訊息大於批量消費數,進行分拆成多個請求。

第 25 至 33 行 :計算當前拆分請求包含的訊息。
第 35 至 38 行 :提交拆分消費請求。
第 39 至 44 行 :提交請求被拒絕,則將當前拆分訊息 剩餘訊息提交延遲消費請求,結束拆分迴圈。

ConsumeMessageConcurrentlyService#submitConsumeRequestLater

// 省略程式碼....

說明 :提交延遲消費請求。
第 34 行 :直接呼叫 ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);

說明 :消費請求。提交請求執行消費。
第 24 至 28 行 :廢棄處理佇列不進行消費。
第 34 至 44 行 :Hook。
第 51 行 :當訊息為重試訊息,設定 Topic

說明 :處理消費結果。
第 8 至 10 行 :消費請求訊息未空時,直接返回。

第 12 至 32 行 :計算 ackIndex

ConsumeMessageConcurrentlyService#cleanExpireMsg(…)

// 省略程式碼....

說明 :定時清理過期訊息,預設週期:15min。

ProcessQueue#cleanExpiredMsg(…)

// 省略程式碼....

說明 :移除過期訊息。
第 2 至 5 行 :順序消費時,直接返回。
第 7 至 9 行 :迴圈移除訊息。預設最大迴圈次數:16次。
第 10 至 25 行 :獲取第一條訊息。判斷是否超時,若不超時,則結束迴圈。
第 29 行 :發回超時訊息到Broker

說明 :發回訊息。
第 4 至 8 行 :Consumer

8、Consumer 消費進度

OffsetStore

OffsetStore類圖.png

RemoteBrokerOffsetStore

說明 :從本地檔案載入消費進度到記憶體。

OffsetSerializeWrapper

// 省略程式碼....

說明 :本地 Offset

RemoteBrokerOffsetStore#load(…)

// 省略程式碼....

說明 :不進行載入,實際讀取消費進度時,從 Broker

第 16 行 :從 檔案

第 16 行 :從 Broker

OffsetStore#persistAll(…)

LocalFileOffsetStore#persistAll(…)

// 省略程式碼....

說明 :持久化消費進度。將消費進度寫入檔案

RemoteBrokerOffsetStore#persistAll(…)

// 省略程式碼....

說明 :持久化指定訊息佇列陣列的消費進度到 Broker

說明 :定時進行持久化,預設週期:5000ms。

重要說明 :

消費進度持久化不僅僅只有定時持久化,拉取訊息、分配訊息佇列等等操作,都會進行消費進度持久化。
消費進度持久化不僅僅只有定時持久化,拉取訊息、分配訊息佇列等等操作,都會進行消費進度持久化。
消費進度持久化不僅僅只有定時持久化,拉取訊息、分配訊息佇列等等操作,都會進行消費進度持久化。

9、結尾

?可能是本系列最長的一篇文章,如有表達錯誤和不清晰,請多多見諒。
感謝對本系列的閱讀、收藏、點贊、分享,特別是翻到結尾。?真的有丟丟長。