RocketMQ Source Code Analysis: Transactional Messages


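Follow the WeChat official account 【芋艿的後端小屋】 for perks:

RocketMQ / MyCAT / Sharding-JDBC: list of all source code analysis articles
RocketMQ / MyCAT / Sharding-JDBC: GitHub address of the source code with Chinese comments

Every comment with a question about the source code will get a careful reply. You can even ask if you don't know how to read source code.

Get notified of new source code analysis articles in real time. Roughly one new article per week.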


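1. Overview

2. Sending transactional messages

2.1 Producer sends a transactional message
2.2 Broker handles the end-transaction request
2.3 Broker generates the ConsumeQueue

3. Transactional message check-back

3.1 Broker initiates the 【transactional message check-back】

3.1.1 Official V3.1.4: file-system based

3.1.1.1 Storing the message in the CommitLog
3.1.1.2 Writing the 【transactional message】 state store (TranStateTable)
3.1.1.3 【Transactional message】 check-back
3.1.1.4 Initializing the 【transactional message】 state store (TranStateTable)
3.1.1.5 Supplement

3.1.2 Official V4.0.0: database based

3.2 Producer receives the 【transactional message check-back】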

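1. Overview

Required, absolutely required, prerequisite reading:

"Transactional Messages (Alibaba Cloud)"

2. Sending transactional messages

2.1 Producer sends a transactional message

The activity diagram is as follows (read it together with the core code).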
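The producer-side flow boils down to three steps: send the half message, run the local transaction, then send the end-transaction request. Below is a minimal, self-contained simulation of that flow, not RocketMQ's API. `FakeBroker`, `TxProducerSketch` and `LocalTxState` are hypothetical names (the enum only borrows the constant names of RocketMQ's `LocalTransactionState`). Treating an exception in the local transaction as `UNKNOW` is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins: these are NOT RocketMQ classes.
enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class FakeBroker {
    // Records every request the "broker" receives, in arrival order.
    final List<String> requests = new ArrayList<>();

    long sendHalf(String body) {
        requests.add("HALF:" + body);
        return requests.size() - 1; // pretend CommitLog offset of the half message
    }

    void endTransaction(long halfOffset, LocalTxState state) {
        // UNKNOW is reported too; the half message simply stays undecided.
        requests.add("END:" + halfOffset + ":" + state);
    }
}

final class TxProducerSketch {
    static LocalTxState sendInTransaction(FakeBroker broker, String body,
                                          Function<String, LocalTxState> localTx) {
        // 1. Send the half (prepared) message; consumers cannot see it yet.
        long halfOffset = broker.sendHalf(body);
        // 2. Run the local transaction; an exception is mapped to UNKNOW (assumption).
        LocalTxState state;
        try {
            state = localTx.apply(body);
        } catch (RuntimeException e) {
            state = LocalTxState.UNKNOW;
        }
        // 3. Ask the broker to commit / roll back the half message.
        broker.endTransaction(halfOffset, state);
        return state;
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        sendInTransaction(broker, "order-1", b -> LocalTxState.COMMIT_MESSAGE);
        System.out.println(broker.requests); // [HALF:order-1, END:0:COMMIT_MESSAGE]
    }
}
```

Reporting `UNKNOW` instead of guessing leaves the decision open, which is what the check-back in section 3 exists to resolve.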

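2.2 Broker handles the end-transaction request

Look up the message referenced by the request, then commit or roll it back. The implementation is as follows: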

// ⬇️⬇️⬇️【EndTransactionProcessor.java】
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

    // Code omitted => logging (only COMMIT / ROLLBACK are handled)

    // Look up the half (prepared) message by its CommitLog offset
    final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
    if (msgExt != null) {
        // Code omitted => validate the message

        // Build the end-of-transaction message
        MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            msgInner.setBody(null);
        }

        // Store the generated message
        final MessageStore messageStore = this.brokerController.getMessageStore();
        final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);

        // Handle the store result
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                // Success
                case PUT_OK:
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    break;
                // Failed
                case CREATE_MAPEDFILE_FAILED:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("create maped file failed.");
                    break;
                case MESSAGE_ILLEGAL:
                case PROPERTIES_SIZE_EXCEEDED:
                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                    response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                    break;
                case SERVICE_NOT_AVAILABLE:
                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                    response.setRemark("service not available now.");
                    break;
                case OS_PAGECACHE_BUSY:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("OS page cache busy, please try another machine");
                    break;
                case UNKNOWN_ERROR:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR");
                    break;
                default:
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("UNKNOWN_ERROR DEFAULT");
                    break;
            }

            return response;
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("find prepared transaction message failed");
        return response;
    }

    return response;
}
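The two transaction-specific steps in processRequest, the `resetTransactionValue` call and the null body on rollback, come down to bit manipulation on `sysFlag`. A minimal sketch, assuming the constant values of RocketMQ's `MessageSysFlag` (bits 2 and 3 encode the transaction type) and assuming `0x1` is the compression bit; `SysFlagSketch` and `endBody` are hypothetical names.

```java
// Constant values mirror RocketMQ's MessageSysFlag as I understand it (assumption).
final class SysFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12, also the 2-bit mask

    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear both transaction bits, then set the new type; other bits are untouched.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    // Hypothetical helper mimicking the rollback branch: the stored body is dropped.
    static byte[] endBody(byte[] halfBody, int commitOrRollback) {
        return commitOrRollback == TRANSACTION_ROLLBACK_TYPE ? null : halfBody;
    }

    public static void main(String[] args) {
        int half = 0x1 | TRANSACTION_PREPARED_TYPE;            // compressed + prepared = 5
        int committed = resetTransactionValue(half, TRANSACTION_COMMIT_TYPE);
        System.out.println(committed);                         // 9 (compressed + commit)
        System.out.println(getTransactionValue(committed) == TRANSACTION_COMMIT_TYPE); // true
    }
}
```

Because the rollback type is `0b1100`, it doubles as the mask for the two transaction bits, so switching prepared to commit or rollback never disturbs the other flags.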

2.3 Broker generates the ConsumeQueue

For a transactional message, on commit (COMMIT
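As far as I can tell from the dispatch logic in RocketMQ 4.x's `DefaultMessageStore`, only ordinary and committed messages get a ConsumeQueue unit; prepared (half) and rollback records stay in the CommitLog only. Note that the commit record is a new CommitLog message (see `putMessage(msgInner)` in processRequest above), so the ConsumeQueue points at it, not at the half message. A minimal sketch of that rule; `ConsumeQueueDispatchSketch` and `Entry` are hypothetical names, and the flag values follow `MessageSysFlag`.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumeQueueDispatchSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Hypothetical minimal CommitLog record: its offset plus its sysFlag.
    static final class Entry {
        final long commitLogOffset;
        final int sysFlag;
        Entry(long commitLogOffset, int sysFlag) {
            this.commitLogOffset = commitLogOffset;
            this.sysFlag = sysFlag;
        }
    }

    // Returns the CommitLog offsets that would receive a ConsumeQueue unit.
    static List<Long> dispatch(List<Entry> commitLog) {
        List<Long> consumeQueue = new ArrayList<>();
        for (Entry e : commitLog) {
            switch (e.sysFlag & TRANSACTION_ROLLBACK_TYPE) {
                case TRANSACTION_NOT_TYPE:
                case TRANSACTION_COMMIT_TYPE:
                    consumeQueue.add(e.commitLogOffset); // becomes visible to consumers
                    break;
                case TRANSACTION_PREPARED_TYPE:
                case TRANSACTION_ROLLBACK_TYPE:
                default:
                    break; // half message or rollback record: never consumable
            }
        }
        return consumeQueue;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(0, TRANSACTION_NOT_TYPE));        // ordinary message
        log.add(new Entry(100, TRANSACTION_PREPARED_TYPE)); // half message
        log.add(new Entry(200, TRANSACTION_COMMIT_TYPE));   // its commit record
        log.add(new Entry(300, TRANSACTION_ROLLBACK_TYPE)); // some rollback record
        System.out.println(dispatch(log)); // [0, 200]
    }
}
```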

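3. Transactional message check-back

The 【transactional message check-back】 feature was open-sourced at one point, but currently (V4.0.0) it is not. Its open-source history is as follows: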

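| Version | 【Transactional message check-back】 implementation | Open source? |
|---|---|---|
| Official V3.0.4 ~ V3.1.4 | File-system based | Open-sourced |
| Official V3.1.5 ~ V4.0.0 | Database based | Not fully open-sourced |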

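Let's look at how each of the two is implemented.

3.1 Broker initiates the 【transactional message check-back】

3.1.1 Official V3.1.4: file-system based

Repository: https://github.com/YunaiV/roc…

Compared with ordinary messages, 【transactional messages】 additionally rely on the following three components:

TransactionStateService: the transaction state service. It manages 【transactional messages】, including storing and updating their state and checking back on their state.

TranStateTable: the 【transactional message】 state store, based on MappedFileQueue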
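Judging from the reads in recoverStateTableNormal later in this section, each TranStateTable unit is a fixed 24-byte record: CommitLog offset (long), message size (int), store timestamp in seconds (int), producer-group hash (int) and state (int). A minimal `ByteBuffer` sketch of that layout; `TranStateUnitSketch` is a hypothetical name and `TS_STORE_UNIT_SIZE` merely mirrors `TSStoreUnitSize`.

```java
import java.nio.ByteBuffer;

// Layout inferred from recoverStateTableNormal; names are hypothetical.
final class TranStateUnitSketch {
    static final int TS_STORE_UNIT_SIZE = 8 + 4 + 4 + 4 + 4; // 24 bytes

    static void write(ByteBuffer buf, long clOffset, int size, int timestampSec, int groupHash, int state) {
        buf.putLong(clOffset);    // CommitLog offset of the half message
        buf.putInt(size);         // stored message size
        buf.putInt(timestampSec); // store timestamp, in seconds
        buf.putInt(groupHash);    // producer group hashCode
        buf.putInt(state);        // PREPARED / COMMIT / ROLLBACK
    }

    // Reads one unit back in the same order (array elements are evaluated left to right).
    static long[] read(ByteBuffer buf) {
        return new long[] { buf.getLong(), buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt() };
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(TS_STORE_UNIT_SIZE * 2);
        write(buf, 4096L, 210, 1_500_000_000, "group".hashCode(), 4);
        System.out.println(buf.position()); // 24: exactly one unit
        buf.flip();
        System.out.println(read(buf)[0]);   // 4096
    }
}
```

Fixed-size units are what make the table addressable by index: unit n starts at byte n × 24.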

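3.1.1.2 Writing the 【transactional message】 state store (TranStateTable)

When a 【Half message】 is handled, a new entry is added to the 【transactional message】 state store (TranStateTable).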
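Because units are fixed-size, appending a PREPARED unit and later flipping its state on commit or rollback only needs the unit's index. A minimal in-memory sketch; `TranStateTableSketch`, `appendPrepared` and `updateState` are hypothetical (the real store is a MapedFileQueue), and the state field sitting at byte 20 of each unit follows from the 24-byte layout read in recoverStateTableNormal. The `tranStateTableOffset` carried by the end-transaction request suggests the real lookup works by such an index, but that is an inference.

```java
import java.nio.ByteBuffer;

// Hypothetical in-memory stand-in for TranStateTable: 24-byte units, state is the last int.
final class TranStateTableSketch {
    static final int UNIT = 24;
    static final int STATE_POS_IN_UNIT = 20; // after one long (8) and three ints (12)
    static final int PREPARED = 0x1 << 2, COMMIT = 0x2 << 2, ROLLBACK = 0x3 << 2;

    private final ByteBuffer buf;
    private int count;

    TranStateTableSketch(int capacityUnits) {
        buf = ByteBuffer.allocate(capacityUnits * UNIT);
    }

    // Appends a PREPARED unit and returns its logical index.
    int appendPrepared(long clOffset, int size, int timestampSec, int groupHash) {
        int index = count++;
        int base = index * UNIT;
        buf.putLong(base, clOffset).putInt(base + 8, size).putInt(base + 12, timestampSec)
           .putInt(base + 16, groupHash).putInt(base + STATE_POS_IN_UNIT, PREPARED);
        return index;
    }

    // Commit / rollback: overwrite only the state field, in place, by offset.
    void updateState(int index, int state) {
        buf.putInt(index * UNIT + STATE_POS_IN_UNIT, state);
    }

    int state(int index) {
        return buf.getInt(index * UNIT + STATE_POS_IN_UNIT);
    }

    public static void main(String[] args) {
        TranStateTableSketch t = new TranStateTableSketch(4);
        int i = t.appendPrepared(1000L, 200, 1_500_000_000, 42);
        t.updateState(i, COMMIT);
        System.out.println(i + " " + t.state(i)); // 0 8
    }
}
```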

3.1.1.3 【Transactional message】 check-back

TranStateTable
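One plausible shape of the check-back pass is a periodic scan of the TranStateTable that picks units still in PREPARED state and old enough that the producer should already have finished its local transaction. A minimal sketch under those assumptions; `CheckBackScanSketch`, `selectForCheck` and the `minAgeSec` threshold are hypothetical, and sending the actual check request to the producer group is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: each unit reduced to (clOffset, timestampSec, state).
final class CheckBackScanSketch {
    static final int PREPARED = 0x1 << 2;

    static final class Unit {
        final long clOffset;
        final int timestampSec;
        final int state;
        Unit(long clOffset, int timestampSec, int state) {
            this.clOffset = clOffset;
            this.timestampSec = timestampSec;
            this.state = state;
        }
    }

    // Returns the CommitLog offsets of half messages to check back with the producer.
    static List<Long> selectForCheck(List<Unit> table, long nowSec, long minAgeSec) {
        List<Long> toCheck = new ArrayList<>();
        for (Unit u : table) {
            boolean stillPrepared = u.state == PREPARED;              // never committed / rolled back
            boolean oldEnough = nowSec - u.timestampSec >= minAgeSec; // give the producer time first
            if (stillPrepared && oldEnough) {
                toCheck.add(u.clOffset);
            }
        }
        return toCheck;
    }

    public static void main(String[] args) {
        List<Unit> table = new ArrayList<>();
        table.add(new Unit(100, 1000, PREPARED)); // old and undecided: check
        table.add(new Unit(200, 1000, 0x2 << 2)); // committed: skip
        table.add(new Unit(300, 1055, PREPARED)); // too recent: skip
        System.out.println(selectForCheck(table, 1060, 60)); // [100]
    }
}
```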

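3.1.1.4 Initializing the 【transactional message】 state store (TranStateTable)

The initialization path depends on whether the Broker was shut down normally last time: after a normal shutdown the existing TranStateTable files are reloaded; otherwise the table is deleted and rebuilt from the TranRedoLog.

The core code is as follows: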

// ⬇️⬇️⬇️【TransactionStateService.java】
/**
 * Initialize (recover) the TranStateTable
 * @param lastExitOK whether the Broker exited normally last time
 */
public void recoverStateTable(final boolean lastExitOK) {
    if (lastExitOK) {
        this.recoverStateTableNormal();
    } else {
        // Step 1: delete the State Table
        this.tranStateTable.destroy();
        // Step 2: fully rebuild the StateTable from the RedoLog
        this.recreateStateTable();
    }
}

/**
 * Scan the TranRedoLog to rebuild the StateTable
 */
private void recreateStateTable() {
    this.tranStateTable = new MapedFileQueue(StorePathConfigHelper.getTranStateTableStorePath(defaultMessageStore
                .getMessageStoreConfig().getStorePathRootDir()), defaultMessageStore
                .getMessageStoreConfig().getTranStateTableMapedFileSize(), null);

    final TreeSet<Long> preparedItemSet = new TreeSet<Long>();

    // Step 1: scan the RedoLog from the beginning
    final long minOffset = this.tranRedoLog.getMinOffsetInQuque();
    long processOffset = minOffset;
    while (true) {
        SelectMapedBufferResult bufferConsumeQueue = this.tranRedoLog.getIndexBuffer(processOffset);
        if (bufferConsumeQueue != null) {
            try {
                long i = 0;
                for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
                    long offsetMsg = bufferConsumeQueue.getByteBuffer().getLong();
                    int sizeMsg = bufferConsumeQueue.getByteBuffer().getInt();
                    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                    if (TransactionStateService.PreparedMessageTagsCode == tagsCode) { // Prepared
                        preparedItemSet.add(offsetMsg);
                    } else { // Commit/Rollback: tagsCode holds the offset of the prepared message
                        preparedItemSet.remove(tagsCode);
                    }
                }

                processOffset += i;
            } finally { // the buffer must always be released
                bufferConsumeQueue.release();
            }
        } else {
            break;
        }
    }
    log.info("scan transaction redolog over, End offset: {},  Prepared Transaction Count: {}", processOffset, preparedItemSet.size());

    // Step 2: rebuild the StateTable from the still-prepared messages
    Iterator<Long> it = preparedItemSet.iterator();
    while (it.hasNext()) {
        Long offset = it.next();
        MessageExt msgExt = this.defaultMessageStore.lookMessageByOffset(offset);
        if (msgExt != null) {
            this.appendPreparedTransaction(msgExt.getCommitLogOffset(), msgExt.getStoreSize(),
                (int) (msgExt.getStoreTimestamp() / 1000),
                msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP).hashCode());
            this.tranStateTableOffset.incrementAndGet();
        }
    }
}

/**
 * Load (parse) the MappedFiles of the TranStateTable
 * 1. Truncate extra MappedFiles and set the write position of the last MappedFile
 * 2. Set the TranStateTable's maximum physical offset (the next writable position)
 */
private void recoverStateTableNormal() {
    final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
    if (!mapedFiles.isEmpty()) {
        // Start recovering from the third-to-last file
        int index = mapedFiles.size() - 3;
        if (index < 0) {
            index = 0;
        }

        int mapedFileSizeLogics = this.tranStateTable.getMapedFileSize();
        MapedFile mapedFile = mapedFiles.get(index);
        ByteBuffer byteBuffer = mapedFile.sliceByteBuffer();
        long processOffset = mapedFile.getFileFromOffset();
        long mapedFileOffset = 0;
        while (true) {
            for (int i = 0; i < mapedFileSizeLogics; i += TSStoreUnitSize) {

                final long clOffset_read = byteBuffer.getLong();
                final int size_read = byteBuffer.getInt();
                final int timestamp_read = byteBuffer.getInt();
                final int groupHashCode_read = byteBuffer.getInt();
                final int state_read = byteBuffer.getInt();

                boolean stateOK = false;
                switch (state_read) {
                case MessageSysFlag.TransactionPreparedType:
                case MessageSysFlag.TransactionCommitType:
                case MessageSysFlag.TransactionRollbackType:
                    stateOK = true;
                    break;
                default:
                    break;
                }

                // The current storage unit is valid
                if (clOffset_read >= 0 && size_read > 0 && stateOK) {
                    mapedFileOffset = i + TSStoreUnitSize;
                } else {
                    log.info("recover current transaction state table file over,  " + mapedFile.getFileName() + " "
                            + clOffset_read + " " + size_read + " " + timestamp_read);
                    break;
                }
            }

            // Reached the end of the file: switch to the next one
            if (mapedFileOffset == mapedFileSizeLogics) {
                index++;
                if (index >= mapedFiles.size()) { // no more files: leave the while loop
                    log.info("recover last transaction state table file over, last maped file " + mapedFile.getFileName());
                    break;
                } else { // switch to the next file
                    mapedFile = mapedFiles.get(index);
                    byteBuffer = mapedFile.sliceByteBuffer();
                    processOffset = mapedFile.getFileFromOffset();
                    mapedFileOffset = 0;
                    log.info("recover next transaction state table file, " + mapedFile.getFileName());
                }
            } else {
                log.info("recover current transaction state table queue over " + mapedFile.getFileName() + " " + (processOffset + mapedFileOffset));
                break;
            }
        }

        // Truncate extra MappedFiles and set the write position of the last MappedFile
        processOffset += mapedFileOffset;
        this.tranStateTable.truncateDirtyFiles(processOffset);

        // Set the TranStateTable's maximum physical offset (the next writable position)
        this.tranStateTableOffset.set(this.tranStateTable.getMaxOffset() / TSStoreUnitSize);
        log.info("recover normal over, transaction state table max offset: {}", this.tranStateTableOffset.get());
    }
}
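Step 1 of recreateStateTable boils down to replaying the redo log into a sorted set: a prepared entry adds its offset, and a commit/rollback entry removes the offset stored in its tagsCode. A minimal sketch of just that replay, where each redo entry is simplified to a `long[]{offsetMsg, tagsCode}` pair and `PREPARED_TAGS_CODE` is a hypothetical stand-in for `TransactionStateService.PreparedMessageTagsCode` (its real value is not shown in this article).

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Simplified replay of recreateStateTable's first step; names are hypothetical.
final class RedoReplaySketch {
    static final long PREPARED_TAGS_CODE = -1L; // hypothetical marker value

    static TreeSet<Long> pendingPrepared(List<long[]> redoLog) {
        TreeSet<Long> prepared = new TreeSet<>();
        for (long[] entry : redoLog) {
            long offsetMsg = entry[0];
            long tagsCode = entry[1];
            if (tagsCode == PREPARED_TAGS_CODE) {
                prepared.add(offsetMsg);   // a half message was stored
            } else {
                prepared.remove(tagsCode); // commit/rollback: tagsCode is the prepared offset
            }
        }
        return prepared; // still undecided
    }

    public static void main(String[] args) {
        List<long[]> redo = Arrays.asList(
            new long[] {100, PREPARED_TAGS_CODE},
            new long[] {200, PREPARED_TAGS_CODE},
            new long[] {300, 100}); // commit/rollback of the half message at offset 100
        System.out.println(pendingPrepared(redo)); // [200]
    }
}
```

Whatever is left in the set is what step 2 appends back into the TranStateTable as PREPARED.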

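3.1.1.5 Supplement

Why has the 【transaction state】 been stored in a database since V3.1.5? The following explanation from the official documentation may be part of the reason:

RocketMQ's way of implementing transactions does not use KV storage but relies on Offsets, which has a notable drawback: modifying data via Offsets leaves the system with too many dirty pages, which needs special attention.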
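A rough way to see the dirty-page problem: each commit or rollback rewrites one 4-byte state field inside a 24-byte unit, and the units being decided can be spread across the whole file. A minimal sketch that counts the distinct pages touched by a batch of such in-place state writes, assuming 4 KiB OS pages and the 24-byte unit layout; `DirtyPageSketch` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

// Assumptions: 4 KiB OS pages and 24-byte state units; values are illustrative.
final class DirtyPageSketch {
    static final int PAGE = 4096;
    static final int UNIT = 24;
    static final int STATE_POS = 20;

    // Number of distinct pages dirtied by writing the state field of the given unit indexes.
    static int pagesTouched(long[] indexes) {
        Set<Long> pages = new HashSet<>();
        for (long idx : indexes) {
            pages.add((idx * UNIT + STATE_POS) / PAGE);
        }
        return pages.size();
    }

    public static void main(String[] args) {
        long[] clustered = new long[170];
        long[] scattered = new long[170];
        for (int i = 0; i < 170; i++) {
            clustered[i] = i;         // neighbouring units: byte offsets 20 .. 4076
            scattered[i] = i * 1000L; // units far apart in the table
        }
        System.out.println(pagesTouched(clustered)); // 1
        System.out.println(pagesTouched(scattered)); // 170
    }
}
```

170 neighbouring updates dirty a single page, while 170 scattered ones dirty 170 pages that all have to be written back.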

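3.1.2 Official V4.0.0: database based

Repository: https://github.com/apache/inc…

Official V4.0.0 has not yet fully open-sourced the 【transactional message check-back】 feature, so we have to make some guesses, which may not all be correct.

Let's compare it with the 【Official V3.1.4: file-based】 implementation.

TransactionRecord: records each 【transactional message】. Similar to TranStateTable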
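Since the V4.0.0 store is not open source, the following is only a guess at the KV-style alternative the official note hints at: one row per half message keyed by its CommitLog offset, inserted on prepare and deleted on commit or rollback, instead of rewriting a state field at a file offset. `TransactionRecordSketch` and `KvTransactionStoreSketch` are hypothetical, and an in-memory sorted map stands in for a database table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Guesswork: field names and the store API are hypothetical, not the V4.0.0 code.
final class TransactionRecordSketch {
    final long offset;          // CommitLog offset of the half message (the key)
    final String producerGroup; // needed to know which group to check back with

    TransactionRecordSketch(long offset, String producerGroup) {
        this.offset = offset;
        this.producerGroup = producerGroup;
    }
}

final class KvTransactionStoreSketch {
    // Sorted by offset, like a table with a primary key on offset.
    private final Map<Long, TransactionRecordSketch> table = new ConcurrentSkipListMap<>();

    void onPrepared(long offset, String group) {
        table.put(offset, new TransactionRecordSketch(offset, group));
    }

    // Commit or rollback: the row is deleted by key, not rewritten at a file offset.
    void onEnd(long offset) {
        table.remove(offset);
    }

    // Offsets still undecided, in ascending order: candidates for check-back.
    List<Long> undecided() {
        return new ArrayList<>(table.keySet());
    }

    public static void main(String[] args) {
        KvTransactionStoreSketch store = new KvTransactionStoreSketch();
        store.onPrepared(300L, "pg");
        store.onPrepared(100L, "pg");
        store.onEnd(300L);
        System.out.println(store.undecided()); // [100]
    }
}
```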