分散式訊息佇列-RocketMQ原始碼解析:Message傳送&接收

NO IMAGE

???關注微信公眾號:【芋艿的後端小屋】有福利:

RocketMQ / MyCAT / Sharding-JDBC 所有原始碼分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文註釋原始碼 GitHub 地址

您對於原始碼的疑問每條留言將得到認真回覆。甚至不知道如何讀原始碼也可以請教噢

新的原始碼解析文章實時收到通知。每週更新一篇左右


1、概述

2、Producer 傳送訊息

DefaultMQProducer#send(Message)

DefaultMQProducerImpl#sendDefaultImpl()

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

MQFaultStrategy

MQFaultStrategy
LatencyFaultTolerance
LatencyFaultToleranceImpl
FaultItem

DefaultMQProducerImpl#sendKernelImpl()

3、Broker 接收訊息

SendMessageProcessor#sendMessage

AbstractSendMessageProcessor#msgCheck

DefaultMessageStore#putMessage

4、某種結尾

1、概述

Producer

說明:傳送同步訊息,DefaultMQProducer#send(Message)

說明 :傳送訊息。步驟:獲取訊息路由資訊,選擇要傳送到的訊息佇列,執行訊息傳送核心方法,並對傳送結果進行封裝返回。
第 1 至 7 行:對sendsendDefaultImpl(...)

說明 :獲得 Topic釋出資訊。優先從快取topicPublishInfoTable

說明 :Producer

LatencyFaultTolerance

  1: public interface LatencyFaultTolerance<T> {
2: 
3:     /**
4:      * 更新對應的延遲和不可用時長
5:      *
6:      * @param name 物件
7:      * @param currentLatency 延遲
8:      * @param notAvailableDuration 不可用時長
9:      */
10:     void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
11: 
12:     /**
13:      * 物件是否可用
14:      *
15:      * @param name 物件
16:      * @return 是否可用
17:      */
18:     boolean isAvailable(final T name);
19: 
20:     /**
21:      * 移除物件
22:      *
23:      * @param name 物件
24:      */
25:     void remove(final T name);
26: 
27:     /**
28:      * 獲取一個物件
29:      *
30:      * @return 物件
31:      */
32:     T pickOneAtLeast();
33: }

說明 :延遲故障容錯介面

LatencyFaultToleranceImpl

  1: public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
2: 
3:     /**
4:      * 物件故障資訊Table
5:      */
6:     private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
7:     /**
8:      * 物件選擇Index
9:      * @see #pickOneAtLeast()
10:      */
11:     private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
12: 
13:     @Override
14:     public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
15:         FaultItem old = this.faultItemTable.get(name);
16:         if (null == old) {
17:             // 建立物件
18:             final FaultItem faultItem = new FaultItem(name);
19:             faultItem.setCurrentLatency(currentLatency);
20:             faultItem.setStartTimestamp(System.currentTimeMillis()   notAvailableDuration);
21:             // 更新物件
22:             old = this.faultItemTable.putIfAbsent(name, faultItem);
23:             if (old != null) {
24:                 old.setCurrentLatency(currentLatency);
25:                 old.setStartTimestamp(System.currentTimeMillis()   notAvailableDuration);
26:             }
27:         } else { // 更新物件
28:             old.setCurrentLatency(currentLatency);
29:             old.setStartTimestamp(System.currentTimeMillis()   notAvailableDuration);
30:         }
31:     }
32: 
33:     @Override
34:     public boolean isAvailable(final String name) {
35:         final FaultItem faultItem = this.faultItemTable.get(name);
36:         if (faultItem != null) {
37:             return faultItem.isAvailable();
38:         }
39:         return true;
40:     }
41: 
42:     @Override
43:     public void remove(final String name) {
44:         this.faultItemTable.remove(name);
45:     }
46: 
47:     /**
48:      * 選擇一個相對優秀的物件
49:      *
50:      * @return 物件
51:      */
52:     @Override
53:     public String pickOneAtLeast() {
54:         // 建立陣列
55:         final Enumeration<FaultItem> elements = this.faultItemTable.elements();
56:         List<FaultItem> tmpList = new LinkedList<>();
57:         while (elements.hasMoreElements()) {
58:             final FaultItem faultItem = elements.nextElement();
59:             tmpList.add(faultItem);
60:         }
61:         //
62:         if (!tmpList.isEmpty()) {
63:             // 打亂   排序。TODO 疑問:應該只能二選一。猜測Collections.shuffle(tmpList)去掉。
64:             Collections.shuffle(tmpList);
65:             Collections.sort(tmpList);
66:             // 選擇順序在前一半的物件
67:             final int half = tmpList.size() / 2;
68:             if (half <= 0) {
69:                 return tmpList.get(0).getName();
70:             } else {
71:                 final int i = this.whichItemWorst.getAndIncrement() % half;
72:                 return tmpList.get(i).getName();
73:             }
74:         }
75:         return null;
76:     }
77: }

說明 :延遲故障容錯實現。維護每個物件的資訊。

FaultItem

  1: class FaultItem implements Comparable<FaultItem> {
2:     /**
3:      * 物件名
4:      */
5:     private final String name;
6:     /**
7:      * 延遲
8:      */
9:     private volatile long currentLatency;
10:     /**
11:      * 開始可用時間
12:      */
13:     private volatile long startTimestamp;
14: 
15:     public FaultItem(final String name) {
16:         this.name = name;
17:     }
18: 
19:     /**
20:      * 比較物件
21:      * 可用性 > 延遲 > 開始可用時間
22:      *
23:      * @param other other
24:      * @return 升序
25:      */
26:     @Override
27:     public int compareTo(final FaultItem other) {
28:         if (this.isAvailable() != other.isAvailable()) {
29:             if (this.isAvailable())
30:                 return -1;
31: 
32:             if (other.isAvailable())
33:                 return 1;
34:         }
35: 
36:         if (this.currentLatency < other.currentLatency)
37:             return -1;
38:         else if (this.currentLatency > other.currentLatency) {
39:             return 1;
40:         }
41: 
42:         if (this.startTimestamp < other.startTimestamp)
43:             return -1;
44:         else if (this.startTimestamp > other.startTimestamp) {
45:             return 1;
46:         }
47: 
48:         return 0;
49:     }
50: 
51:     /**
52:      * 是否可用:當開始可用時間大於當前時間
53:      *
54:      * @return 是否可用
55:      */
56:     public boolean isAvailable() {
57:         return (System.currentTimeMillis() - startTimestamp) >= 0;
58:     }
59: 
60:     @Override
61:     public int hashCode() {
62:         int result = getName() != null ? getName().hashCode() : 0;
63:         result = 31 * result   (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
64:         result = 31 * result   (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
65:         return result;
66:     }
67: 
68:     @Override
69:     public boolean equals(final Object o) {
70:         if (this == o)
71:             return true;
72:         if (!(o instanceof FaultItem))
73:             return false;
74: 
75:         final FaultItem faultItem = (FaultItem) o;
76: 
77:         if (getCurrentLatency() != faultItem.getCurrentLatency())
78:             return false;
79:         if (getStartTimestamp() != faultItem.getStartTimestamp())
80:             return false;
81:         return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
82: 
83:     }
84: }

說明 :物件故障資訊。維護物件的名字、延遲、開始可用的時間。

DefaultMQProducerImpl#sendKernelImpl()

  1: private SendResult sendKernelImpl(final Message msg, //
2:     final MessageQueue mq, //
3:     final CommunicationMode communicationMode, //
4:     final SendCallback sendCallback, //
5:     final TopicPublishInfo topicPublishInfo, //
6:     final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
7:     // 獲取 broker地址
8:     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
9:     if (null == brokerAddr) {
10:         tryToFindTopicPublishInfo(mq.getTopic());
11:         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
12:     }
13:     //
14:     SendMessageContext context = null;
15:     if (brokerAddr != null) {
16:         // 是否使用broker vip通道。broker會開啟兩個埠對外服務。
17:         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
18:         byte[] prevBody = msg.getBody(); // 記錄訊息內容。下面邏輯可能改變訊息內容,例如訊息壓縮。
19:         try {
20:             // 設定唯一編號
21:             MessageClientIDSetter.setUniqID(msg);
22:             // 訊息壓縮
23:             int sysFlag = 0;
24:             if (this.tryToCompressMessage(msg)) {
25:                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
26:             }
27:             // 事務
28:             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
29:             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
30:                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
31:             }
32:             // hook:傳送訊息校驗
33:             if (hasCheckForbiddenHook()) {
34:                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
35:                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
36:                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
37:                 checkForbiddenContext.setCommunicationMode(communicationMode);
38:                 checkForbiddenContext.setBrokerAddr(brokerAddr);
39:                 checkForbiddenContext.setMessage(msg);
40:                 checkForbiddenContext.setMq(mq);
41:                 checkForbiddenContext.setUnitMode(this.isUnitMode());
42:                 this.executeCheckForbiddenHook(checkForbiddenContext);
43:             }
44:             // hook:傳送訊息前邏輯
45:             if (this.hasSendMessageHook()) {
46:                 context = new SendMessageContext();
47:                 context.setProducer(this);
48:                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
49:                 context.setCommunicationMode(communicationMode);
50:                 context.setBornHost(this.defaultMQProducer.getClientIP());
51:                 context.setBrokerAddr(brokerAddr);
52:                 context.setMessage(msg);
53:                 context.setMq(mq);
54:                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
55:                 if (isTrans != null && isTrans.equals("true")) {
56:                     context.setMsgType(MessageType.Trans_Msg_Half);
57:                 }
58:                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
59:                     context.setMsgType(MessageType.Delay_Msg);
60:                 }
61:                 this.executeSendMessageHookBefore(context);
62:             }
63:             // 構建傳送訊息請求
64:             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
65:             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
66:             requestHeader.setTopic(msg.getTopic());
67:             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
68:             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
69:             requestHeader.setQueueId(mq.getQueueId());
70:             requestHeader.setSysFlag(sysFlag);
71:             requestHeader.setBornTimestamp(System.currentTimeMillis());
72:             requestHeader.setFlag(msg.getFlag());
73:             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
74:             requestHeader.setReconsumeTimes(0);
75:             requestHeader.setUnitMode(this.isUnitMode());
76:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 訊息重發Topic
77:                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
78:                 if (reconsumeTimes != null) {
79:                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
80:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
81:                 }
82:                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
83:                 if (maxReconsumeTimes != null) {
84:                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
85:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
86:                 }
87:             }
88:             // 傳送訊息
89:             SendResult sendResult = null;
90:             switch (communicationMode) {
91:                 case ASYNC:
92:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
93:                         brokerAddr, // 1
94:                         mq.getBrokerName(), // 2
95:                         msg, // 3
96:                         requestHeader, // 4
97:                         timeout, // 5
98:                         communicationMode, // 6
99:                         sendCallback, // 7
100:                         topicPublishInfo, // 8
101:                         this.mQClientFactory, // 9
102:                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
103:                         context, //
104:                         this);
105:                     break;
106:                 case ONEWAY:
107:                 case SYNC:
108:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
109:                         brokerAddr,
110:                         mq.getBrokerName(),
111:                         msg,
112:                         requestHeader,
113:                         timeout,
114:                         communicationMode,
115:                         context,
116:                         this);
117:                     break;
118:                 default:
119:                     assert false;
120:                     break;
121:             }
122:             // hook:傳送訊息後邏輯
123:             if (this.hasSendMessageHook()) {
124:                 context.setSendResult(sendResult);
125:                 this.executeSendMessageHookAfter(context);
126:             }
127:             // 返回傳送結果
128:             return sendResult;
129:         } catch (RemotingException e) {
130:             if (this.hasSendMessageHook()) {
131:                 context.setException(e);
132:                 this.executeSendMessageHookAfter(context);
133:             }
134:             throw e;
135:         } catch (MQBrokerException e) {
136:             if (this.hasSendMessageHook()) {
137:                 context.setException(e);
138:                 this.executeSendMessageHookAfter(context);
139:             }
140:             throw e;
141:         } catch (InterruptedException e) {
142:             if (this.hasSendMessageHook()) {
143:                 context.setException(e);
144:                 this.executeSendMessageHookAfter(context);
145:             }
146:             throw e;
147:         } finally {
148:             msg.setBody(prevBody);
149:         }
150:     }
151:     // broker為空丟擲異常
152:     throw new MQClientException("The broker["   mq.getBrokerName()   "] not exist", null);
153: }

說明 :傳送訊息核心方法。該方法真正發起網路請求,傳送訊息給 Broker

#processRequest()

說明:校驗訊息是否正確,主要是Topic配置方面,例如:Broker

說明:儲存訊息封裝,最終儲存需要 CommitLog 實現。
第 7 至 27 行 :校驗 Broker 是否可以寫入。
第 29 至 39 行 :訊息格式與大小校驗。
第 47 行 :呼叫 CommitLong 進行儲存,詳細邏輯見:《RocketMQ 原始碼分析 —— Message 儲存》

4、某種結尾

感謝閱讀、收藏、點贊本文的工程師同學。

閱讀原始碼是件令自己很愉悅的事情,編寫原始碼解析是讓自己腦細胞死傷無數的過程,痛並快樂著。

如果有內容寫的存在錯誤,或是不清晰的地方,見笑了,?。歡迎加 QQ:7685413 我們一起探討,共進步。

再次感謝閱讀、收藏、點贊本文的工程師同學。