Java應用beanstalkd訊息佇列

應用場景

最近做一個專案,處理每一個從佇列收到的訊息都要去獲取一個鎖(使用Redis實現的分散式鎖),如果沒有獲取到鎖,也不能把這個訊息給丟了,那可不可以把這個沒有獲取到鎖的訊息再發回佇列?

如果是用Kafka的話,訊息一發回佇列,馬上就消費,然而這時候還是獲取不到鎖,進入死迴圈了,影響效能。考慮把這些訊息給存起來,但又很繁瑣。
這裡寫圖片描述

如圖,每個消費者收到訊息後準備進行業務處理時都會去獲取分散式鎖資源,獲得鎖就正常執行業務流程,如果獲取不到鎖就設定一個delay時間,將訊息再次發回佇列,
等到達delay時間之後再次消費訊息。

beanstalkd正好滿足這一需求,而且輕量級,沒有Kafka那麼重。
參考beanstalkd的狀態機的變化,這裡不再贅述。

開始使用beanstalkd

啟動beanstalk服務:

beanstalkd -l 127.0.0.1 -p 11300

引入Java客戶端Maven依賴,還有其他的Java client包,可自行選用:

  <dependency>
<groupId>com.dinstone</groupId>
<artifactId>beanstalkc</artifactId>
<version>2.2.0</version>
</dependency>
生產者
public class BeanstalkProducer {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobProducer producer = factory.createJobProducer("beanstalkd-demo");
String msg = "hello, beanstalkd";
// 返回job id
long p = producer.putJob(100, 0, 5, msg.getBytes());
System.out.println(p);
}
}

傳送訊息的程式碼putJob:

public long putJob(int priority, int delay, int ttr, byte[] data);
  • priority:優先順序
  • delay:延遲多長時間開始執行,單位秒
  • ttr: 單位秒,為consumer操作設定的reserve超時時間,如果consumer在這個ttr時間裡沒有完成job並將job delete掉,那這個job就會重新被遷回ready狀態,再次供消費者執行

消費者

reserveJob
public class BeanstalkConsumer {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobConsumer consumer = factory.createJobConsumer("beanstalkd-demo");
while (true) {
// reserveJob 有一個超時時間引數,單位是秒,表示獲取訊息最多花費多長時間
Job job = consumer.reserveJob(3);
if (Objects.isNull(job)) {
continue;
}
System.out.println(job.getId());
System.out.println(new String(job.getData()));
//            consumer.deleteJob(job.getId());
}
}
}

consumer程式碼最後註釋掉了 consumer.deleteJob(job.getId()) ,沒有將訊息delete掉,這個job將會一直從reserve狀態到ready狀態,beanstalkd會認為consumer沒有在ttr時間之內完成job,而且這個操作的頻繁執行很耗效能。大量的這種操作會導致你的CPU使用率一下就上去了。 所以consumer完成了job之後,就將job delete掉,如果業務程式碼在完成job時出現異常,也要在try catch Exception中將job給delete掉,然後就可以開始報警了 ^~^

releaseJob
public class BeanstalkConsumerRelease {
public static void main(String[] args) {
Configuration config = new Configuration();
config.setServiceHost("127.0.0.1");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
JobConsumer consumer = factory.createJobConsumer("beanstalkd-demo");
while (true) {
Job job = consumer.reserveJob(3);
if (Objects.isNull(job)) {
continue;
}
System.out.println(job.getId());
System.out.println(new String(job.getData()));
consumer.releaseJob(job.getId(), 99, 5);
}
}
}

consumer.releaseJob(long id, int priority, int delay),將訊息從reserved狀態遷移到delay狀態,延遲(指定的延遲時間)之後job變成ready狀態供消費者繼續消費。 在我的專案中正好利用了這一特性解決了一開始描述的那個問題。