rocket mq的工作原理

##一、整體介紹 在RocketMQ裡,有以下幾個核心的模組:Producer,Consumer,Broker,NameSrv。他們之間的關係如下: arch

先簡單瞭解一下各個模組的功能,下面會有章節詳細介紹各個模組的功能。

  • Producer和Consumer很好理解,顧名思義就是生產者和消費者,生產者負責生產訊息,消費者負責消費訊息,這2塊的邏輯都是由業務使用者定義的。

  • Broker是RocketMQ的核心,Broker實現了訊息的儲存、拉取等功能。Broker通常以叢集方式啟動,並可配置主從,每個Broker上提供對指定topic的服務。理解了Broker的原理,以及和其他服務互動的方式就基本弄懂了整個訊息中介軟體的原理。

  • NameSrv是一個無狀態的名稱服務,可以叢集部署。所有Broker啟動的時候會向NameSrv註冊自己的資訊。Producer會根據目標topic從NameSrv獲取到達指定Broker的路由資訊,Consumer同理。

對於Producer端RocketMQ採用了輪詢的方式保證了負載均衡,Consumer端通常採用cluster叢集方式消費訊息,我們可以自己定義訊息在訊息端的分配方式。另外,MQ還提供了順序訊息的特性,簡單瞭解一下MQ提供的特性即可,具體實現後面章節會進行闡述。

原始碼目錄結構介紹&Remoting通訊層

一:原始碼目錄結構介紹 RocketMQ原始碼分為以下幾個package:

  • rocketmq-broker:整個mq的核心,他能夠接受producer和consumer的請求,並呼叫store層服務對訊息進行處理。HA服務的基本單元,支援同步雙寫,非同步雙寫等模式。
  • rocketmq-client::mq客戶端實現,目前官方僅僅開源了java版本的mq客戶端,c ,go客戶端有社群開源貢獻。
  • rocketmq-common:一些模組間通用的功能類,比如一些配置檔案、常量。
  • rocketmq-example:官方提供的例子,對典型的功能比如order message,push
    consumer,pull consumer的用法進行了示範。
  • rocketmq-filtersrv:訊息過濾服務,相當於在broker和consumer中間加入了一個filter代理。
  • rocketmq-remoting:基於netty的底層通訊實現,所有服務間的互動都基於此模組。
  • rocketmq-srvut:解析命令列的工具類。
  • rocketmq-store:儲存層實現,同時包括了索引服務,高可用HA服務實現。
  • rocketmq-tools:mq叢集管理工具,提供了訊息查詢等功能。

底層基於Netty網路庫驅動

producer 

producer 1.啟動流程

Producer如何感知要傳送訊息的broker即brokerAddrTable中的值是怎麼獲得的,

1.      傳送訊息的時候指定會指定topic,如果producer集合中沒有會根據指定topic到namesrv獲取topic釋出資訊TopicPublishInfo,並放入本地集合
2.      定時從namesrv更新topic路由資訊,

Producer與broker間的心跳

Producer定時傳送心跳將producer資訊(其實就是procduer的group)定時傳送到, brokerAddrTable集合中列出的broker上去
Producer傳送訊息只傳送到master的broker機器,在通過broker的主從複製機制拷貝到broker的slave上去

producer 2.如何傳送訊息

Producer輪詢某topic下的所有佇列的方式來實現傳送方的負載均衡

 

 

1)  Topic下的所有佇列如何理解:

複製程式碼
比如broker1, broker2, borker3三臺broker機器都配置了Topic_A
Broker1 的佇列為queue0 , queue1
Broker2 的佇列為queue0, queue2, queue3,
Broker3 的佇列為queue0
當然一般情況下的broker的配置都是一樣的
以上當broker啟動的時候註冊到namesrv的Topic_A佇列為共6個分別為:
broker1_queue0, broker1_queue1,
broker2_queue0, broker2_queue1, broker2_queue2,
broker3_queue0,
複製程式碼

2)  Producer如何實現輪詢佇列:

複製程式碼
Producer從namesrv獲取的到Topic_A路由資訊TopicPublishInfo
--List<MessageQueue>messageQueueList  //Topic_A的所有的佇列
--AtomicIntegersendWhichQueue        //自增整型
方法selectOneMessageQueue方法用來選擇一個傳送佇列
(  sendWitchQueue)% messageQueueList.size為佇列集合的下標
每次獲取queue都會通過sendWhichQueue加一來實現對所有queue的輪詢
如果入參lastBrokerName不為空,代表上次選擇的queue傳送失敗,這次選擇應該避開同一個queue
複製程式碼

3)  Producer發訊息系統重試:

複製程式碼
傳送失敗後,重試幾次retryTimesWhenSendFailed = 2
傳送訊息超時sendMsgTimeout = 3000
Producer通過selectOneMessageQueue方法獲取一個MessagQueue物件
--topic            //Topic_A
--brokerName           //代表傳送訊息到達的broker
--queueId              //代表傳送訊息的在指定broker上指定topic下的佇列編號
向指定broker的指定topic的指定queue傳送訊息
傳送失敗(1)重試次數不到兩次(2)傳送此條訊息花費時間還沒有到3000(毫秒), 換個佇列繼續傳送。 
複製程式碼

 producer傳送普通訊息

producer 3.如何傳送順序訊息

複製程式碼
Rocketmq能夠保證訊息嚴格順序,但是Rocketmq需要producer保證順序訊息按順序傳送到同一個queue中,比如購買流程(1)下單(2)支付(3)支付成功,
這三個訊息需要根據特定規則將這個三個訊息按順序傳送到一個queue 如何實現把順序訊息傳送到同一個queue: 一般訊息是通過輪詢所有佇列傳送的,順序訊息可以根據業務比如說訂單號orderId相同的訊息傳送到同一個佇列, 或者同一使用者userId傳送到同一佇列等等 messageQueueList [orderId%messageQueueList.size()] messageQueueList [userId%messageQueueList.size()]
複製程式碼

 

 

producer 4.如何釋出分散式事務訊息

先引入官方文件圖:

  

分散式事物是基於二階段提交的

複製程式碼
1)      一階段,向broker傳送一條prepared的訊息,返回訊息的offset即訊息地址commitLog中訊息偏移量。Prepared狀態訊息不被消費
傳送訊息ok,執行本地事物分支, 本地事物方法需要實現rocketmq的回撥介面

2) LocalTransactionExecuter,
處理本地事物邏輯返回處理的事物狀態LocalTransactionState 3) 二階段,處理完本地事物中業務得到事物狀態, 根據offset查詢到commitLog中的prepared訊息,設定訊息狀態commitType或者rollbackType,
讓後將資訊新增到commitLog中, 其實二階段生成了兩條訊息
複製程式碼

事物訊息傳送

 

producer 5.訊息在落地broker落地之普通訊息

Broker根據producer請求的RequestCode.SEND_MESSAGE選擇對應的處理器SendMessageProcessor
根據請求訊息內容構建訊息內部結構MessageExtBrokerInner
調DefaultMessageStore加訊息寫入commitlog

 

producer 6.訊息在落地broker落地之事務訊息

1. 訊息落地

commitLog針對事物訊息的處理,訊息的第20位開始的八位記錄是的訊息在邏輯佇列中的queueoffset,
但是針對事物訊息為preparedType和rollbackType的儲存的是事物狀態表的索引偏移量

2. 分發事物訊息:   

複製程式碼
  分發訊息位置資訊到ConsumeQueue: 事物狀態為preparedType和rollbackType的訊息不會將請求分發到ConsumeQueue中去,即不處理,所以不會被訊息
更新transactionstable table:如果是prepared訊息記,通過TransactionStateService服務將訊息加到儲存事務狀態的表格tranStateTable的檔案中;
如果是commitType和rollbackType訊息, 修改事物狀態表格tranStateTable中的訊息狀態。 記錄Transaction Redo Log日誌: 記錄了 commitLogOffset, msgSize,preapredTransactionOffset, storeTimestamp。
複製程式碼

3. 事物狀態表

         事物狀態表是有MapedFileQueue將多個檔案組成一個連續的佇列,它的儲存單元是定長為24個位元組的資料,
tranStateTableOffset可以認為是事物狀態訊息的個數,索引偏移量, 它的值是 tranStateTable.getMaxOffset()/ TSStoreUnitSize     

 

 

3. 事物回查

複製程式碼
定時回查執行緒會定時掃描(預設每分鐘)每個儲存事務狀態的表格檔案,遍歷儲存事務狀態的表格記錄
如果是已經提交或者回滾的訊息調過過,
如果是prepared狀態的如果訊息小於事務回查至少間隔時間(預設是一分鐘)跳出終止遍歷
調transactionCheckExecuter.gotocheck方法向producer回查事物狀態,
根據group隨機選擇一臺producer
查詢訊息,根據commitLogOffset和msgSize到commitlog查詢訊息
向Producder發起請求,請求code型別為CHECK_TRANSACTION_STATE,producer的DefaultMQProducerImpl.checkTransactionState()方法
來處理broker定時回撥的請求,這裡構建一個Runnable任務非同步執行producer註冊的回撥介面,處理回撥,在調endTransactionOneway向broker
傳送請求更新事物訊息的最終狀態 無Prepared訊息,且遍歷完,則終止掃描這個檔案的定時任務
複製程式碼

 

4. 事物訊息的load&recover

TransactionStateService.load ()事物狀態服務載入, 載入只是建立檔案對映
redoLog佇列恢復,載入本地redoLog檔案
tranStateTable事物狀態表, 載入本地tranStateTable檔案

 recover:

複製程式碼
正常恢復:
利用tranRedoLog檔案的recover
利用tranStateTable檔案重建事物狀態表
異常恢復:
先按照正常流程恢復TranRedo Log
commitLog異常恢復,commitLog根據checkpoint時間點重新生成 redolog,重新分發訊息DispatchRequest,
分發訊息到位置資訊到ConsumeQueue
更新Transaction State Table
記錄TransactionRedo Log
刪除事物狀態表tranStateTable
通過RedoLog全量恢復StateTable
重頭掃描RedoLog, 過濾出所有prepared狀態的訊息, 將commit或者rollback的訊息對應的prepared訊息刪除
重建StateTable,  將上面過濾出的prepared訊息,新增到事物狀態表檔案中
複製程式碼
這個事物狀態表transstable的作用是定期(1分鐘)將狀態為prepared事物回查producer端redolog這個佇列其實標記消費到哪了, 
事物狀態的恢復根本上是有commitlog來做的

 

 consumer主要2中模式

pull  and push

 

 

標籤: MQ NIO
0
0
« 上一篇:分散式開放訊息系統(RocketMQ)的原理與實踐
» 下一篇:大型網站技術架構(1)
posted @ 2016-11-08 10:19 Givefine 閱讀(2175)
評論(0) 編輯 收藏