ActiveMQ:點對點佇列消費者接收不到訊息

NO IMAGE

一.環境說明

  • Windows 1709
  • 阿里雲ECS
    • CentOS 7.4
    • ActiveMQ 5.15.2
    • JDK 1.8
  • IDEA 2017.3
  • Maven 3.5.0

二.問題說明

遠端訊息伺服器使用的是阿里雲ECS,在windows上編寫測試類測試訊息佇列的點對點的通訊,卻發現,無法消費生產者生產的訊息,即接收不到訊息.

三.程式碼

testProducer

  @Test
public void testProducer() throws JMSException {
//建立ConnectionFactory物件.需要制定服務端ip和埠號
//ConnectionFactory是介面,使用其實現類
ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://阿里雲伺服器公網IP地址:61616");
//開啟連線,使用Connection物件的start方法
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//使用Connection物件建立Session物件
/*
引數說明:
arg0:boolean,是否開啟事務
arg1:int,應答模式(1:自動應答,2:手動應答)
當使用事務時,第二個引數無效
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session物件建立一個Destination物件(topic,queue,此處選擇queue)
Queue queue = session.createQueue("testQueue");
//使用Session物件建立一個Producer物件
MessageProducer producer = session.createProducer(queue);
//建立一個Message物件,建立一個TextMessage物件
TextMessage textMessage = session.createTextMessage("生產者1號,為您服務");
//使用Producer物件傳送訊息
producer.send(textMessage);
System.err.println("訊息生產完成");
//關閉資源
producer.close();
session.close();
connection.close();
}

testConsumer

  @Test
public void testConsumer() throws Exception {
//建立ConnectionFactory物件.需要制定服務端ip和埠號
//ConnectionFactory是介面,使用其實現類
ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://阿里雲伺服器公網IP地址:61616");
//開啟連線,使用Connection物件的start方法
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//使用Connection物件建立Session物件
/*
引數說明:
arg0:boolean,是否開啟事務
arg1:int,應答模式(1:自動應答,2:手動應答)
當使用事務時,第二個引數無效
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session物件建立一個Destination物件(topic,queue,此處選擇queue),與傳送方保持一致
Queue queue = session.createQueue("testQueue");
//使用Session物件建立一個Consumer物件
MessageConsumer consumer = session.createConsumer(queue);
//接收訊息
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
//列印訊息
System.err.println("訊息接收完成...."   textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
//關閉資源
consumer.close();
connection.close();
session.close();
}

:ConnectionFactory屬於很重的類,不宜重複建立.這裡由於只是做演示,所以未對其進行抽取封裝成工具類,可自行完成

四.解決問題

經測試發現,這樣寫我的消費者無法消費訊息佇列中的訊息,而且setMessageListener中的onMessage方法無法進入,就結束了.後來根據別人指點,才知道onMessage是非同步接收訊息的,所以有個時間差的問題.因為阿里雲是遠端伺服器,無法做到像本機虛擬機器一樣的快速訪問,導致還沒接收訊息,consumer等就被程式close掉了,也就無法消費訊息,接收訊息佇列中的訊息.
所以,為了解決非同步問題,可從以下兩個方向著手:
1. 強制執行緒休眠.等待非同步執行完成

 consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage) message;
try {
//列印訊息
System.err.println("訊息接收完成...."   textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
//讓執行緒休眠,等待訊息接收完畢
Thread.sleep(5000);

2.使用receive(阻塞式方法,在接到訊息前會一直阻塞著)

        Message message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());

經測試,均可以達到接收訊息的目的


2017/11/20
Lucifer