NO IMAGE

使用場景介紹

1、發版時經常需要不停機發版,遇到mq消費者,消費一半停機就會出現訊息丟失(沒有使用手動確認的情況)

2、例如支付場景,準時支付、超過未支付將執行不同的方案,其中超時未支付可以看做一個延時訊息。

3、例如滴滴、淘寶的自動評價都是類似場景(不一定是用的什麼技術)

我是發版的情況遇到了

可能有人會問了,mq支援手動確認啊,為什麼不使用確認機制呢?
1、由於用的是Spring 的RabbitListener註解,無法使用手動確認機制
2、也是最主要的原因是我們使用mq的場景

(類似於假如我做一個事情需要兩步完成,每一步完成都會收錢。那麼就會出現,第一步完成後,停機了,假如使用訊息重發就會造成浪費第一步完成的錢)

實現方式

開始想到的是發版時,可以將訊息發到佇列裡,然後消費者不要馬上消費,等一定的時間再來消費這一訊息。

在網上找資料找到兩種實現方式:

1、使用外掛

在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列,而我們公司mq是單獨管理的,所以這種方式直接pass

2、使用兩個特性

AMQP和RabbitMQ本身沒有直接支援延遲佇列功能,但是可以通過以下特性模擬出延遲佇列的功能。但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲佇列。

Time To Live(TTL)

RabbitMQ可以針對Queue和Message設定 x-message-tt,來控制訊息的生存時間,如果超時,則訊息變為dead letter
RabbitMQ針對佇列中的訊息過期時間有兩種方法可以設定。

A: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。
B: 對訊息進行單獨設定,每條訊息TTL可以不同。

如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead letter

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由。

x-dead-letter-exchange:出現dead letter之後將dead letter重新傳送到指定exchange
x-dead-letter-routing-key:指定routing-key傳送
佇列出現dead letter的情況有:

訊息或者佇列的TTL過期
佇列達到最大長度
訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false

利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。這時候訊息就可以重新被消費。

示例程式碼

初始化過期佇列的程式碼

@Bean
public Queue signQueueStore() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", exchange);
arguments.put("x-dead-letter-routing-key", signKey);
arguments.put("x-message-ttl",300000);
Queue queue = new Queue(signQueueStore,true,false,false,arguments);
System.out.println("arguments :"   queue.getArguments());
return queue;
}

繫結過期佇列的程式碼

@Bean
public Binding  signStoreBinding() {
return BindingBuilder.bind(signQueueStore()).to(defaultExchange()).with(signKeyStore);
}

初始化正常佇列的程式碼和繫結普通佇列的程式碼

@Bean
public Queue signQueue() {
return new Queue(signQueue);
}
@Bean
public Binding signBinding(Queue signQueue, DirectExchange defaultExchange) {
/** 將佇列繫結到交換機 */
return BindingBuilder.bind(signQueue).to(defaultExchange).with(signKey);
}

其他傳送與消費不變

在重啟時將訊息傳送到過期佇列,在重啟完成後,訊息傳送到正常佇列,過期佇列訊息過期後會自動路由到正常佇列進行消費。(可以設定一個標誌,判斷該標誌位重啟狀態,則發到過期佇列,為正常狀態傳送到正常佇列)

參考文獻

rabbitmq 實現延遲佇列的兩種方式