說說 MQ 之 RocketMQ, 順序消費和無序消費

NO IMAGE

RocketMQ 是出自 A 公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並做出了自己的一些改進,訊息可靠性上比 Kafka 更好,目前,RocketMQ 的文件仍然不夠豐富 1 2,社群仍然無法與 Kafka 比肩,但 A 公司已經推出了基於 RocketMQ 的雲產品 3,相信未來 RocketMQ 也會有不錯的發展。本文采用 RocketMQ 3.2.6 進行實驗,由於 RocketMQ 與 Kafka 很相似,本文很多地方對兩者做出了比較。

基本概念

RocketMQ 由於借鑑了 Kafka 的設計,包括元件的命名也很多與 Kafka 相似,下面摘抄一段《RocketMQ 原理簡介》中的介紹,可以與 Kafka 的命名比對一下,

  • Producer,訊息生產者,負責產生訊息,一般由業務系統負責產生訊息。
  • Consumer,訊息消費者,負責消費訊息,一般是後臺系統負責非同步消費。
  • Push Consumer,Consumer 的一種,應用通常向 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立 刻回撥 Listener 介面方法。
  • Pull Consumer,Consumer 的一種,應用通常主動呼叫 Consumer 的拉訊息方法從 Broker 拉訊息,主動權由應用控制。
  • Producer Group,一類 Producer 的集合名稱,這類 Producer 通常傳送一類訊息,且傳送邏輯一致。
  • Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費一類訊息,且消費邏輯一致。
  • Broker,訊息中轉角色,負責儲存訊息,轉發訊息,一般也稱為 Server。在 JMS 規範中稱為 Provider。

《RocketMQ 原理簡介》中還介紹了一些其他的概念,例如,廣播消費和叢集消費,廣播消費是 Consumer Group 中對於同一條訊息每個 Consumer 都消費,叢集消費是 Consumer Group 中對於同一條訊息只有一個 Consumer 消費。Kafka 採用的是叢集消費,不支援廣播消費(好吧,是我沒有找到)。再例如,普通順序訊息和嚴格順序訊息,普通順序訊息在 Broker 重啟情況下不會保證訊息順序性;嚴格順序訊息即使在異常情況下也會保證訊息的順序性。個人理解,所謂普通順序訊息,應該就是 Kafka 中的 Partition 級別有序,嚴格順序訊息,應該是 Topic 級別有序,但文中也提到,這樣的有序級別是要付出代價的,Broker 叢集中只要有一臺機器不可用,則整個叢集都不可用,降低服務可用性。使用這種模式,需要依賴同步雙寫,主備自動切換,但自動切換功能目前還未實現(我猜,自動切換僅僅是沒開源吧)。說白了,嚴格順序訊息不具備生產可用性,自己玩玩還行,其應用場景主要是資料庫 binlog 同步。

關於 RocketMQ 和 Kafka 的對比,可以參考 RocketMQ Wiki 中的文章 4,看看就行,不必較真。

關於順序和分割槽

順序性的話題,剛才已經提到了一些,RocketMQ 的實現應該不弱於 Kafka。對於分割槽,RocketMQ 似乎有意弱化了這個概念,只有在 Producer 中有一個引數 defaultTopicQueueNums,分割槽在 RocketMQ 中有時被稱為佇列。RocketMQ 的普通順序訊息模式,應該就是分割槽順序性,這點與 Kafka 一致。

關於高可用

RocketMQ 實現高可用的方式有多種,《RocketMQ 使用者指南》文件中提到的有:多主模式、多主多從非同步複製模式、多主多從同步複製模式。多主模式下,效能較好,但是在 Broker 宕機的時候,該 Broker 上未消費的交易不可消費;多主多從非同步複製模式,與 Kafka 的副本模式比較類似,主 Broker 宕機後,會自動切換到從 Broker,訊息的消費不會出現間斷;多主多從同步複製模式更進一步,採用同步刷盤的方式,避免了主 Broker 宕機帶來的訊息丟失,但是,目前不支援自動切換。

雖然 RocketMQ 提供了多種高可用方式,但是目前能生產使用的就只有多主多從非同步複製模式,即使在這個模式上,其實現也比 Kafka 要差。因為 RocketMQ 的機制中,主從關係是人為指定的,主 Broker 上承擔所有的訊息派發,而 Kafka 的主從關係是通過選舉的方式選出來的,每個分割槽的主節點都是不一樣的,可以從不同的節點派發訊息。Kafka 的模式是分散模式,有利於負載均衡,而且當一個 Broker 宕機的時候,隻影響部分 Topic,而 RocketMQ 一旦主 Broker 宕機,會影響所有的 Topic。另外,Kafka 可以支援 Broker 間同步複製(通過設定 Broker 的 acks 引數),這樣比的話,RocketMQ 就差太多了。

關於 RocketMQ 的介紹,網上的文章不算太多,也比較雜,《分散式開放訊息系統(RocketMQ)的原理與實踐》5 6 7這篇原理介紹的不錯,推薦。

RocketMQ 的工具和程式設計介面

RocketMQ 的工具

相比較 Kafka 而言,RocketMQ 提供的工具要少一些,如下,

1
2
3
4
5
6
7
8
9
bin/mqadmin
bin/mqbroker
bin/mqbroker.numanode0
bin/mqbroker.numanode1
bin/mqbroker.numanode2
bin/mqbroker.numanode3
bin/mqfiltersrv
bin/mqnamesrv
bin/mqshutdown

除了程序啟停之外,常用的運維命令都在 mqadmin 中,詳見《RocketMQ 運維指令》文件。我實驗中常用的一些命令如下,

1
2
3
4
5
6
7
8
9
10
11
sh mqnamesrv &
sh mqbroker -c async-broker-a.properties &
sh mqbroker -c async-broker-a-s.properties &
sh mqadmin topicList -n 192.168.232.23:9876
sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjj
sh mqadmin clusterList -n 192.168.232.23:9876
sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjj
sh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4
sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4
sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4

RocketMQ 使用了自己的 name server 來做排程(Kafka 用了 Zookeeper),使用 sh mqnamesrv 來啟動,預設監聽埠9876,sh mqnamesrv -m 可以檢視所有預設引數,使用 -c xxxx.properties 引數來指定自定義配置。sh mqbroker 是用於啟動 Broker 的命令,引數比較多,詳細可以通過 sh mqbroker -m 檢視預設引數,配置項細節後文再說。sh mqadmin 是運維命令入口,topicList 是列出所有 Topic;topicRoute 是列出單個 Topic 的詳細資訊;clusterList 是列出叢集的資訊;deleteTopic 是刪除 Topic。consumerProgress 是檢視消費者消費進度,deleteSubGroup 是刪除消費者的訂閱,consumerConnection 是查詢消費者訂閱的情況。

Broker 的配置是最多的,實驗中我修改到的部分如下,其他使用預設,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
brokerClusterName=DefaultCluster
brokerIP1=192.168.232.23
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.232.23:9876
listenPort=10911
deleteWhen=04
fileReservedTime=120
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

配置檔案中的多數配置看例子就可以知道意思,挑幾個說一下。brokerName 和 brokerId, 同名的 Broker,ID 是0的是主節點,其他是從節點;deleteWhen,刪除檔案時間點,預設凌晨4點;fileReservedTime,檔案保留時間,設定為120小時;brokerRole,Broker 的角色,ASYNC_MASTER 是非同步複製主節點,SYNC_MASTER 是同步雙寫主節點,SLAVE 是備節點。

其實,這些工具的寫法也基本一致,都是先做一些檢查,最後執行 Java 程式,JVM 系統上的應用應該差不多都這樣。

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  1. 廣播消費,這個在之前已經提到過;
  2. 訊息過濾,支援簡單的 Message Tag 過濾,也支援按 Message Header、body 過濾;
  3. 順序消費和亂序消費,之前也提到過,這裡的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
  4. Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
  5. 事務訊息,這個好像沒有開源,但是 example 程式碼中有示例,總之,不推薦用;
  6. Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用於表示訊息類別,可以用來過濾,但是順序性還是以 Topic 來看;

單看功能的話,即使不算事務訊息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 模式消費 順序消費這2個功能。RocketMQ 的程式碼示例在 rocketmq-example 中,注意,程式碼是不能直接執行的,因為所有的程式碼都少了設定 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先來看一下生產者的 API,比較簡單,只有一種,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”);
producer.setNamesrvAddr(“192.168.232.23:9876”);
producer.start();
for (int i = 0; i < 10; i )
try {
{
Message msg = new Message(“TopicTest1”,// topic
“TagA”,// tag
“OrderID188”,// key
(“RocketMQ ” String.format(“%05d”, i)).getBytes());// body
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, i));
System.out.println(String.format(“%05d”, i) sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}

可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和非同步兩種模式,但都是阻塞模式,send 方法返回傳送狀態的 Future,可以通過 Future 的 get 方法阻塞獲得傳送狀態。而 RocketMQ 採用的是同步非阻塞模式,傳送之後立刻返回傳送狀態(而不是 Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成並重連,最後返回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。

另外,RocketMQ 可以通過指定 MessageQueueSelector 類的實現來指定將訊息傳送到哪個分割槽去,Kafka 是通過指定生產者的 partitioner.class 引數來實現的,靈活性上 RocketMQ 略勝一籌。

再來看消費者的API,由於 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(“please_rename_unique_group_name_5”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(“TopicTest1”);
for (MessageQueue mq : mqs) {
System.out.println(“Consume from the queue: ” mq);
SINGLE_MQ: while (true) {
try {
long offset = consumer.fetchConsumeOffset(mq, true);
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (null != pullResult.getMsgFoundList()) {
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
System.out.print(new String(messageExt.getBody()));
System.out.print(pullResult);
System.out.println(messageExt);
}
}
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}

這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分割槽,而 Kafka 可以自動管理(當然也可以手動管理),並且不需要指定分割槽(分割槽是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 介面,提供了兩種管理方式,本地檔案和遠端 Broker。這部分感覺兩者差不多。

下面再看看 Push 模式順序消費,程式碼如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_3”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(“TopicTest1”, “TagA || TagC || TagD”);
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.println(Thread.currentThread().getName() ” Receive New Messages: ” msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
}
else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
}
else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
}
else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}

雖然提供了 Push 模式,RocketMQ 內部實際上還是 Pull 模式的 MQ,Push 模式的實現應該採用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,

  1. 接收訊息的監聽類要使用 MessageListenerOrderly
  2. ConsumeFromWhere 有幾個引數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;
  3. 可以控制 offset 的提交,應該就是 context.setAutoCommit(false); 的作用;

控制 offset 提交這個特性非常有用,某種程度上擴充套件一下,就可以當做事務來用了,看程式碼 ConsumeMessageOrderlyService 的實現,其實並沒有那麼複雜,在不啟用 AutoCommit 的時候,只有返回 COMMIT 才 commit offset;啟用 AutoCommit 的時候,返回 COMMITROLLBACK(這個比較扯)、SUCCESS 的時候,都 commit offset。

後來發現,commit offset 功能在 Kafka 裡面也有提供,使用新的 API,呼叫 consumer.commitSync

再看一個 Push 模式亂序消費 訊息過濾的例子,消費者的程式碼如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“ConsumerGroupNamecc4”);
consumer.setNamesrvAddr(“192.168.232.23:9876”);
consumer.subscribe(“TopicTest1”, MessageFilterImpl.class.getCanonicalName());
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() ” Receive New Messages: ” msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println(“Consumer Started.”);
}
}

這個例子與之前順序消費不同的地方在於,

  1. 接收訊息的監聽類使用的是 MessageListenerConcurrently
  2. 回撥方法中,使用的是自動 offset commit;
  3. 訂閱的時候增加了訊息過濾類 MessageFilterImpl

訊息過濾類 MessageFilterImpl 的程式碼如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg) {
String property = msg.getUserProperty(“SequenceId”);
if (property != null) {
int id = Integer.parseInt(property);
if ((id % 3) == 0 && (id > 10)) {
return true;
}
}
return false;
}
}

RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾程序;Consumer 啟動後,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照 Consumer 上傳的 Java 過濾程式做過濾,過濾完成後返回給 Consumer。這種過濾方法可以節省網路流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。

還有一種廣播消費模式,比較簡單,可以去看程式碼,不再列出。

總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。

RocketMQ 的主備模式

按之前所說,只有 RocketMQ 的多主多從非同步複製是可以生產使用的,因此只在這個場景下測試。另外,訊息採用 Push 順序模式消費。

假設叢集採用2主2備的模式,需要啟動4個 Broker,配置檔案如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerName=broker-a
brokerId=0
listenPort=10911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog
brokerRole=ASYNC_MASTER
brokerName=broker-a
brokerId=1
listenPort=10921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog
brokerRole=SLAVE
brokerName=broker-b
brokerId=0
listenPort=20911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog
brokerRole=ASYNC_MASTER
brokerRole=ASYNC_MASTER
brokerName=broker-b
brokerId=1
listenPort=20921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog
brokerRole=SLAVE

另外,每個機構共通的配置項如下,

1
2
3
4
5
6
brokerClusterName=DefaultCluster
brokerIP1=192.168.232.23
namesrvAddr=192.168.232.23:9876
deleteWhen=04
fileReservedTime=120
flushDiskType=ASYNC_FLUSH

其他設定均採用預設。啟動 NameServer 和所有 Broker,並試執行一下 Producer,然後看一下 TestTopic1 當前的情況,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
“brokerDatas”:[
{
“brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″
},
“brokerName”:”broker-b”
},
{
“brokerAddrs”:{0:”192.168.232.23:10911″,1:”192.168.232.23:10921″
},
“brokerName”:”broker-a”
}
],
“filterServerTable”:{},
“queueDatas”:[
{
“brokerName”:”broker-a”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
},
{
“brokerName”:”broker-b”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
}
]
}

可見,TestTopic1 在2個 Broker 上,且每個 Broker 備機也在執行。下面開始主備切換的實驗,分別啟動 Consumer 和 Producer 程序,訊息採用 Pull 順序模式消費。在訊息傳送接收過程中,使用 kill -9 停掉 broker-a 的主程序,模擬突然宕機。此時,TestTopic1 的狀態如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
“brokerDatas”:[
{
“brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″
},
“brokerName”:”broker-b”
},
{
“brokerAddrs”:{1:”192.168.232.23:10921″
},
“brokerName”:”broker-a”
}
],
“filterServerTable”:{},
“queueDatas”:[
{
“brokerName”:”broker-a”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
},
{
“brokerName”:”broker-b”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
}
]
}

broker-a 的節點已經減少為只有1個從節點。然後啟動broker-a 的主節點,模擬恢復,再看一下 TestTopic1 的狀態,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
“brokerDatas”:[
{
“brokerAddrs”:{0:”192.168.232.23:20911″,1:”192.168.232.23:20921″
},
“brokerName”:”broker-b”
},
{
“brokerAddrs”:{0:”192.168.232.23:10911″,1:”192.168.232.23:10921″
},
“brokerName”:”broker-a”
}
],
“filterServerTable”:{},
“queueDatas”:[
{
“brokerName”:”broker-a”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
},
{
“brokerName”:”broker-b”,
“perm”:6,
“readQueueNums”:4,
“topicSynFlag”:0,
“writeQueueNums”:4
}
]
}

此時,RocketMQ 已經恢復。

再來看看 Producer 和 Consumer 的日誌,先看 Producer 的,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
……
00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]
00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]
00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]
00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]
00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]
00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]
00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]
00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]
00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]
00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]
00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]
00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]
00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]
00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]
00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]
00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]
……
01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]
01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]
01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]
01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]
01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]
01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]
01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]
01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]
01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

日誌中顯示,在傳送完00583條訊息之後,開始發生異常 connect to <192.168.232.23:10911> failed,原因應該是 broker-a 的主節點被 kill 掉。之後,從00596條訊息開始,RocketMQ 又恢復正常,原因是 broker-b 已經開始提供服務,承擔了所有的工作。然後,又重新啟動了 broker-a 主節點,由於該節點的加入,從01392條訊息開始,broker-a 又開始恢復工作。實驗中可以驗證,RocketMQ 所謂的多主多備模式,實際上,備機被弱化到無以復加,在主節點宕機的時候,備機無法接替主機的工作,而只是將尚未傳送的資料傳送出去,由剩下的主節點接替工作。也就是說,N 主 N 備的 RocketMQ 叢集中,總共有 2N 臺機器,實際工作的只有 N 臺,如果有一臺掛了,就只有 N-1 臺工作了,機器的利用率太低了。

再來看一下 Consumer 的日誌,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)
at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)
at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)
at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)
at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
……
[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
……
RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]
RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]
……
RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

可以看到,Consumer 在 broker-a 宕機時間的附近,也出現了異常,connect to <192.168.232.23:10911> failed。雖然還能保持分割槽上的順序性,但是已經某種程度上出現了一些紊亂,例如,將我在實驗之前的資料給取了出來(Hello MetaQ的訊息)。可是,我在實驗前,明明做過刪除這個 Topic 的動作,看來 RocketMQ 所謂的刪除,並未刪除 Topic 的資料。之後,broker-a 主機重啟之後,又恢復正常。

RocketMQ Pull模式消費需要手動管理 offset 和指定分割槽,這個在呼叫的時候不覺得,實際執行的時候才會發現每次總是消費一個分割槽,消費完之後,才開始消費下一個分割槽,而下一個分割槽可能已經堆積了很多訊息了,手動做訊息分配又比較費事。或許,Push 順序模式消費才是更好的選擇。

另外還有幾個比較異常的情況,實驗中有幾次出現了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 這樣的錯誤,實際上,這時候我只是關掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明文件中說可以用來新增 Topic,而實際上不行。

補充一下:之後,我又使用 Push 順序模式消費重做了上述實驗,結論差不多。只是因為有多執行緒的原因,日誌看起來偶爾有錯位,這個問題不大,可以解決。而且,在關閉重啟 Broker 的附近,往往伴隨著多次的訊息重發,不過,RocketMQ 也不保證訊息只收到一次就是了。訊息重複的問題,Kafka 要比 RocketMQ 顯得不那麼嚴重一些。Push 順序模式消費不需要指定 offset,不需要指定分割槽,第二次啟動可以自動從前一次的 offset 後開始消費。功能上這個與 Kafka 的 Consumer 更類似,雖然 RocketMQ 採用的是非同步模式。

RocketMQ 最佳實踐

實際上,RocketMQ 自己就有一份《RocketMQ 最佳實踐》的文件,裡面提到了一些系統設計的問題,例如消費者要冪等,一個應用對應一個 Topic,如此等等。這些經驗不僅僅是對 RocketMQ 有用,對 Kafka 也頗有借鑑意義。

後記

這裡談談我對選擇 RocketMQ 還是 Kafka 的個人建議。以上已經做了多處 RocketMQ 和 Kafka 的對比,我個人覺得,Kafka 是一個不斷發展中的系統,開源社群比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經有了同步複製,訊息可靠性更有保障;還有,Kafka 的分割槽機制,幾乎實現了自動負載均衡,這絕對是個殺手級特性;RocketMQ 雖然提供了很多易用的功能,遠超出 Kafka,但這些功能並不一定都能用得上,而且多數可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復的時候,細節上要勝過 RocketMQ。當然,如果是 A 公司內部,或者所在公司使用了 A 公司的雲產品,那麼 RocketMQ 的企業級特性更多一些,或許我會選擇 RocketMQ。