分散式訊息佇列 RocketMQ原始碼解析:Message儲存

NO IMAGE

???關注微信公眾號:【芋艿的後端小屋】有福利:

RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文註釋原始碼 GitHub 地址

您對於原始碼的疑問每條留言將得到認真回覆。甚至不知道如何讀原始碼也可以請教噢

新的原始碼解析文章實時收到通知。每週更新一篇左右


1、概述
2、CommitLog 結構

3、CommitLog 儲存訊息

CommitLog#putMessage(…)
MappedFileQueue#getLastMappedFile(…)
MappedFile#appendMessage(…)
DefaultAppendMessageCallback#doAppend(…)

FlushCommitLogService

MappedFile#落盤
FlushRealTimeService
CommitRealTimeService
GroupCommitService

結尾

1、概述

本文接《RocketMQ 原始碼分析 —— Message 傳送與接收》
主要解析 CommitLog


CommitLog

說明 :儲存訊息,並返回儲存結果。
第 2 行 :設定儲存時間等。
第 16 至 36 行 :事務訊息相關,暫未了解。
第 45 & 97 行 :獲取鎖與釋放鎖。
第 52 行 :再次設定儲存時間。目前會有多處地方設定儲存時間。
第 55 至 62 行 :獲取 MappedFile

說明 :獲取最後一個 MappedFile

第 30 至 35 行 :設定 MappedFile

說明 :插入訊息MappedFile

說明 :插入訊息到位元組緩衝區。
第 45 行 :計算物理位置。在 CommitLog

commit相關程式碼:

考慮到寫入效能,滿足 commitLeastPages * OS_PAGE_SIZE

FlushRealTimeService

訊息插入成功時,非同步刷盤時使用。

  1: class FlushRealTimeService extends FlushCommitLogService {
2:     /**
3:      * 最後flush時間戳
4:      */
5:     private long lastFlushTimestamp = 0;
6:     /**
7:      * print計時器。
8:      * 滿足print次數時,呼叫{@link #printFlushProgress()}
9:      */
10:     private long printTimes = 0;
11: 
12:     public void run() {
13:         CommitLog.log.info(this.getServiceName()   " service started");
14: 
15:         while (!this.isStopped()) {
16:             boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
17:             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
18:             int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
19:             int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
20: 
21:             // Print flush progress
22:             // 當時間滿足flushPhysicQueueThoroughInterval時,即使寫入的數量不足flushPhysicQueueLeastPages,也進行flush
23:             boolean printFlushProgress = false;
24:             long currentTimeMillis = System.currentTimeMillis();
25:             if (currentTimeMillis >= (this.lastFlushTimestamp   flushPhysicQueueThoroughInterval)) {
26:                 this.lastFlushTimestamp = currentTimeMillis;
27:                 flushPhysicQueueLeastPages = 0;
28:                 printFlushProgress = (printTimes   % 10) == 0;
29:             }
30: 
31:             try {
32:                 // 等待執行
33:                 if (flushCommitLogTimed) {
34:                     Thread.sleep(interval);
35:                 } else {
36:                     this.waitForRunning(interval);
37:                 }
38: 
39:                 if (printFlushProgress) {
40:                     this.printFlushProgress();
41:                 }
42: 
43:                 // flush commitLog
44:                 long begin = System.currentTimeMillis();
45:                 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
46:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
47:                 if (storeTimestamp > 0) {
48:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
49:                 }
50:                 long past = System.currentTimeMillis() - begin;
51:                 if (past > 500) {
52:                     log.info("Flush data to disk costs {} ms", past);
53:                 }
54:             } catch (Throwable e) {
55:                 CommitLog.log.warn(this.getServiceName()   " service has exception. ", e);
56:                 this.printFlushProgress();
57:             }
58:         }
59: 
60:         // Normal shutdown, to ensure that all the flush before exit
61:         boolean result = false;
62:         for (int i = 0; i < RETRY_TIMES_OVER && !result; i  ) {
63:             result = CommitLog.this.mappedFileQueue.flush(0);
64:             CommitLog.log.info(this.getServiceName()   " service shutdown, retry "   (i   1)   " times "   (result ? "OK" : "Not OK"));
65:         }
66: 
67:         this.printFlushProgress();
68: 
69:         CommitLog.log.info(this.getServiceName()   " service end");
70:     }
71: 
72:     @Override
73:     public String getServiceName() {
74:         return FlushRealTimeService.class.getSimpleName();
75:     }
76: 
77:     private void printFlushProgress() {
78:         // CommitLog.log.info("how much disk fall behind memory, "
79:         //   CommitLog.this.mappedFileQueue.howMuchFallBehind());
80:     }
81: 
82:     @Override
83:     @SuppressWarnings("SpellCheckingInspection")
84:     public long getJointime() {
85:         return 1000 * 60 * 5;
86:     }
87: }

說明:實時 flush

GroupCommitService

訊息插入成功時,同步刷盤時使用。

  1: class GroupCommitService extends FlushCommitLogService {
2:     /**
3:      * 寫入請求佇列
4:      */
5:     private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
6:     /**
7:      * 讀取請求佇列
8:      */
9:     private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
10: 
11:     /**
12:      * 新增寫入請求
13:      *
14:      * @param request 寫入請求
15:      */
16:     public synchronized void putRequest(final GroupCommitRequest request) {
17:         // 新增寫入請求
18:         synchronized (this.requestsWrite) {
19:             this.requestsWrite.add(request);
20:         }
21:         // 切換讀寫佇列
22:         if (hasNotified.compareAndSet(false, true)) {
23:             waitPoint.countDown(); // notify
24:         }
25:     }
26: 
27:     /**
28:      * 切換讀寫佇列
29:      */
30:     private void swapRequests() {
31:         List<GroupCommitRequest> tmp = this.requestsWrite;
32:         this.requestsWrite = this.requestsRead;
33:         this.requestsRead = tmp;
34:     }
35: 
36:     private void doCommit() {
37:         synchronized (this.requestsRead) {
38:             if (!this.requestsRead.isEmpty()) {
39:                 for (GroupCommitRequest req : this.requestsRead) {
40:                     // There may be a message in the next file, so a maximum of
41:                     // two times the flush (可能批量提交的messages,分佈在兩個MappedFile)
42:                     boolean flushOK = false;
43:                     for (int i = 0; i < 2 && !flushOK; i  ) {
44:                         // 是否滿足需要flush條件,即請求的offset超過flush的offset
45:                         flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
46:                         if (!flushOK) {
47:                             CommitLog.this.mappedFileQueue.flush(0);
48:                         }
49:                     }
50:                     // 喚醒等待請求
51:                     req.wakeupCustomer(flushOK);
52:                 }
53: 
54:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
55:                 if (storeTimestamp > 0) {
56:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
57:                 }
58: 
59:                 // 清理讀取佇列
60:                 this.requestsRead.clear();
61:             } else {
62:                 // Because of individual messages is set to not sync flush, it
63:                 // will come to this process 不合法的請求,比如message上未設定isWaitStoreMsgOK。
64:                 // 走到此處的邏輯,相當於傳送一條訊息,落盤一條訊息,實際無批量提交的效果。
65:                 CommitLog.this.mappedFileQueue.flush(0);
66:             }
67:         }
68:     }
69: 
70:     public void run() {
71:         CommitLog.log.info(this.getServiceName()   " service started");
72: 
73:         while (!this.isStopped()) {
74:             try {
75:                 this.waitForRunning(10);
76:                 this.doCommit();
77:             } catch (Exception e) {
78:                 CommitLog.log.warn(this.getServiceName()   " service has exception. ", e);
79:             }
80:         }
81: 
82:         // Under normal circumstances shutdown, wait for the arrival of the
83:         // request, and then flush
84:         try {
85:             Thread.sleep(10);
86:         } catch (InterruptedException e) {
87:             CommitLog.log.warn("GroupCommitService Exception, ", e);
88:         }
89: 
90:         synchronized (this) {
91:             this.swapRequests();
92:         }
93: 
94:         this.doCommit();
95: 
96:         CommitLog.log.info(this.getServiceName()   " service end");
97:     }
98: 
99:     /**
100:      * 每次執行完,切換讀寫佇列
101:      */
102:     @Override
103:     protected void onWaitEnd() {
104:         this.swapRequests();
105:     }
106: 
107:     @Override
108:     public String getServiceName() {
109:         return GroupCommitService.class.getSimpleName();
110:     }
111: 
112:     @Override
113:     public long getJointime() {
114:         return 1000 * 60 * 5;
115:     }
116: }

說明:批量寫入執行緒服務。
第 16 至 25 行 :新增寫入請求。方法設定了 sync 的原因:this.requestsWrite 會和 this.requestsRead 不斷交換,無法保證穩定的同步。
第 27 至 34 行 :讀寫佇列交換。

第 38 至 60 行 :迴圈寫入佇列,進行 flush

第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個 MappedFile(寫第N個訊息時,MappedFile 已滿,建立了一個新的),所以需要有迴圈2次。
第 51 行 :喚醒等待寫入請求執行緒,通過 CountDownLatch 實現。

第 61 至 66 行 :直接刷盤。此處是由於傳送的訊息的 isWaitStoreMsgOK 未設定成 TRUE ,導致未走批量提交。
第 73 至 80 行 :每 10ms 執行一次批量提交。當然,如果 wakeup() 時,則會立即進行一次批量提交。當 Broker 設定成同步落盤 && 訊息 isWaitStoreMsgOK=true,訊息需要略大於 10ms 才能傳送成功。當然,效能相對非同步落盤較差,可靠性更高,需要我們在實際使用時去取捨。

結尾

寫的第二篇與RocketMQ原始碼相關的博文,看到有閱讀、點贊、收藏甚至訂閱,很受鼓舞。

《Message儲存》比起《Message傳送&接收》從難度上說是更大的,當然也是更有趣的,如果存在理解錯誤或者表達不清晰,還請大家多多包含。如果可以的話,還請麻煩新增 QQ:7685413 進行指出,避免自己的理解錯誤,給大家造成困擾。

推薦《Kafka設計解析(六)- Kafka高效能架構之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的說法:高一個喜馬拉雅山。?認真啃讀《Linux核心設計與實現(原書第3版)》,day day up。

再次感謝大家的閱讀、點贊、收藏。

下一篇:《RocketMQ 原始碼分析 —— Message 拉取與消費》 起航!