分散式事務柔性事務解決方案:可靠訊息最終一致性(非同步確保型) —— 三、生產者實戰

分散式事務柔性事務解決方案:可靠訊息最終一致性(非同步確保型) —— 三、生產者實戰

建議簡單看看上一篇文章再往下閱讀

我們的專案就基於這個模型:

這裡寫圖片描述

接下來就到了我們的實戰時刻~

專案基於spring cloud編寫,沒有spring cloud基礎看起來可能有一點點費力。

準備階段:定義可靠訊息介面

package com.anur.messageapi.api;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.Map;
/**
* Created by Anur IjuoKaruKas on 2018/5/8
*/
public interface TransactionMsgApi {
/**
* 預傳送訊息,先將訊息儲存到訊息中心
*/
@RequestMapping(value = "prepare", method = RequestMethod.GET)
int prepareMsg(
@RequestParam("id") String id,
@RequestParam("msg") String msg,
@RequestParam("routingKey") String routingKey,
@RequestParam("exchange") String exchange,
@RequestParam("paramMap") String paramMap,
@RequestParam("artist") String artist);
/**
* 生產者確認訊息可投遞
*/
@RequestMapping(value = "confirm", method = RequestMethod.GET)
int confirmMsgToSend(@RequestParam("id") String id, @RequestParam("caller") String caller);
/**
* 向佇列投遞訊息
*/
@RequestMapping(value = "send", method = RequestMethod.GET)
void sendMsg(@RequestParam("id") String id);
/**
* 消費者確認消費成功
*/
@RequestMapping(value = "ack", method = RequestMethod.GET)
int acknowledgement(@RequestParam("id") String id,
@RequestParam("artist") String artist);
}

我們先忽略後面的兩個介面,先看第一個,一共有六個引數

  • id:訊息的id,這個設計其實很自由,可以在可靠訊息服務中生成,也可以在生產者端生成,本專案選擇在生產者端生成。
  • msg:訊息的主體,可以是普通的字串,也可以是物件
  • routingKey:路由鍵,傳送訊息時用(不懂的可以去看看MQ基礎
  • exchange:交換器,傳送訊息時用(不懂的可以去看看MQ基礎
  • paramMap:可靠訊息服務回查時用,比如說我一個訊息傳送到可靠訊息服務,結果沒確認,可靠訊息服務就根據這個paramMap進行訊息的回查,向生產者查詢這個業務到底執行成功了沒。
  • artist:回撥(回查)地址,在springCloud中,其實就是serverName

具體場景解析:訂單服務

一、建立預傳送訊息,並將其儲存到資料庫

我們首先生成一條訊息,我們往paramMap中指定了,我們這個訂單的訂單id是orderId,訊息內容我瞎寫的,這條訊息要儲存到資料庫(它的作用是保證訊息一定被可靠訊息接收並持久化)

        String routingKey = "test.key.testing";
Map<String, String> map = new HashMap<>();
String orderId = UUID.randomUUID().toString()   System.currentTimeMillis();
map.put("id", orderId);
String mapStr = JSON.toJSONString(map);
TestMsg testMsg = new TestMsg();
testMsg.setContent("這是一條測試訊息");
String testMsgStr = JSON.toJSONString(testMsg);
// ===============================
// 要儲存到資料庫(它的作用是保證訊息一定被可靠訊息接收並持久化)
PrepareMsg prepareMsg = prepareMsgService.genMsg(orderId, testMsgStr, routingKey, Constant.TEST_EXCHANGE, mapStr);

二、非同步傳送這條訊息,將其標記為預傳送

非同步傳送了一條【預傳送】訊息給訊息可靠訊息服務

        Future<Integer> future = prepareMsgService.prepareMsg(prepareMsg);
// 下面是prepareMsg的實現
@Async
@Override
public Future<Integer> prepareMsg(PrepareMsg prepareMsg) {
// 呼叫我們剛才在【準備階段】定義的介面
int result = transactionMsgService.prepareMsg(prepareMsg.getId(), prepareMsg.getMsg(), prepareMsg.getRoutingKey(), prepareMsg.getExchange(), prepareMsg.getParamMap(), artistConfiguration.getArtist());
// 如果呼叫成功,刪除剛才本地儲存的資料庫
if (result == 1) {
prepareMsgMapper.deleteByPrimaryKey(prepareMsg.getId());
}
return new AsyncResult<>(result);
}

三、執行業務

你可以把下面那些想象成處理訂單狀態,上面的這個步驟是有事務的,也就是說:

  • 如果執行失敗,我們的可靠訊息服務只會收到一條預傳送的訊息,保證了操作的原子性。
  • 或者執行成功,但沒有及時向可靠訊息服務傳送,這種情況往下看,先忽略它。
        ///////////// 事務
ProviderOrder providerOrder = new ProviderOrder();
providerOrder.setId(orderId);
providerOrderService.save(providerOrder);
///////////// 事務

四、非同步告知可靠訊息服務,業務處理成功,將剛才預傳送的訊息標記為待傳送

       // 確認訊息可以被髮送
if (future.get() == 1) {
prepareMsgService.confirmMsgToSend(orderId, this.getClass().getSimpleName());
}

Extra、異常情況

1、執行成功,但沒有及時向可靠訊息服務傳送通知。

這時候我們的artist和paramMap就發揮作用了,我們的可靠訊息服務,可以拿著這兩個東西,定時向生產者查詢那些沒有被標記為【待傳送】的訊息。比如說這樣:

        // 這裡是可靠訊息服務
String url = String.format("http://%s/check?", transactionMsg.getCreater());
Map<String, String> paramMap = JSON.parseObject(transactionMsg.getParamMap(), new TypeReference<HashMap<String, String>>() {
});
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> stringStringEntry : paramMap.entrySet()) {
sb.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&");
}
sb.deleteCharAt(sb.length() - 1);
// 結果為true,代表這條訊息的業務執行成功了,可自助將訊息狀態標記為【待傳送】
// 反之執行失敗
resultBoolean = restTemplate.getForObject(url   sb, boolean.class);
2、執行失敗,也沒有及時向可靠訊息服務傳送通知。

這個情況並不影響,因為可靠訊息服務會回查,發現訊息沒有執行成功,不會將訊息投遞出去。

這裡要注意,每條訊息最好設定一個查詢次數的限制

3、預傳送失敗,業務執行成功

這時候我們在第一步事先儲存的訊息就發揮作用了,這裡只要寫一個定時任務,向可靠訊息服務定時投遞即可。這裡要注意可靠訊息服務的冪等性。

由於訊息id是由生產者指定,所以即使可靠訊息服務收到了重複的建立【預傳送】的訊息,插入資料庫也是會失敗的。

    @Scheduled(cron = "*/1 * * * * *")
public void checkPrepareMsg() {
List<PrepareMsg> prepareMsgList = prepareMsgService.getUnConfirmList();
if (prepareMsgList.size() > 0) {
System.out.println("訊息重發中");
}
for (PrepareMsg prepareMsg : prepareMsgList) {
prepareMsgService.prepareMsg(prepareMsg);
}
}

Github – > 可靠訊息服務 example