當Parallel遇上了DISpring並行數據聚合最佳實踐

NO IMAGE

分析淘寶PDP

讓我們先看個圖, Taobao的PDP(Product Detail Page)頁.

當Parallel遇上了DISpring並行數據聚合最佳實踐

打開Chrome Network面板, 讓我們來看taobao是怎麼加載這個頁面數據的. 根據經驗, 一般是異步加載的, 要麼是XHR,要麼就是js(jsonp), 你應該很快可以找到

當Parallel遇上了DISpring並行數據聚合最佳實踐

還能看到這個接口的性能

當Parallel遇上了DISpring並行數據聚合最佳實踐

神奇的是, taobao竟然在一次請求中拉下了整個PDP頁的完整數據, 而且服務端處理耗時不到125ms

首先, 這麼做有什麼好處?

  • 前後端開發對接簡單
  • 在一次網絡連接中儘可能多的傳輸數據(數據大小要不影響用戶體驗, 一般不會超過300kb), 減少建立連接的次數和請求頭浪費的流量.

然後, 這又是怎麼做到的呢?

你可能會說緩存, 但你要知道, 這樣一個對電商極為重要的頁面, 絕對涉及到了非常多的團隊, 比如:

  • 商品團隊
  • 賣家團隊
  • 評價團隊
  • 訂單團隊
  • 會員團隊
  • 優惠團隊
  • 問答團隊
  • 推薦團隊
  • 物流系統
  • etc/等等

即使每個團隊的數據全都是緩存的, 你一個個去拿, 要在125ms內拿完也不容易. 而且作為跟錢相關的頁面, 部分數據必須保證絕對實時有效, 能用緩存的地方不多. 怎麼辦, 如果是你, 你會怎麼做? 離線打標? 數據預熱? etc..

此時, 並行調用不失為一種好辦法.

分析一下這個頁面, 你會發現, 每一個模塊除了屬於同一個商品(入參相同), 其實各個模塊的數據之間, 並沒有依賴性, 完全可以並行去獲取.

並行就沒有問題了嗎?

並行獲取數據, 可以提高我們的接口性能. 但也會引入一些問題, 如:

  • 依賴的項可能很多, 怎麼使代碼簡潔清晰?
  • 依賴關係很可能是一個有向圖, 如果做到有向圖中的每個節點都可以並行執行?
  • 異步處理後, 超時怎麼處理? 業務代碼拋出異常了怎麼處理?
  • 依賴關係如果有死循環怎麼辦?
  • 異步之後, ThreadLocal中的內容怎麼處理? 一些基於ThreadLocal實現的Context不work怎麼辦?
  • 事務被線程隔離了怎麼辦?
  • 如何監控每一次異步執行, 每個節點的性能?

下面, 我們來討論下如何簡單\易用\高效的並行獲取數據; 如何解決上述異步問題.

常見的並行方式

假如你現在需要用戶的基礎信息\博客列表\粉絲列表 3份數據. 哪麼你有哪些方式可以並行獲取呢?

Java ThreadPool並行

最簡單原始的辦法, 直接使用Java提供了的線程池和Future機制.

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(3);
Future<User> userFuture = executorService.submit(() -> {
try{
return userService.get(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<Post>> postsFuture = executorService.submit(() -> {
try{
return postService.getPosts(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<User>> followersFuture = executorService.submit(() -> {
try{
return followService.getFollowers(userId);
}finally {
countDownLatch.countDown();
}
});
countDownLatch.await();
User user = userFuture.get();
user.setFollowers(followersFuture.get());
user.setPosts(postsFuture.get());
return user;
}

Spring的異步並行

我們知道, Spring支持@Async註解, 可以方便的實現異步, 並且支持獲取返回值. 參考: www.baeldung.com/spring-asyn…

@Async實現的原理實際是在Bean的代理類的方法中, 攔截方法調用, 向taskExecutor Bean中提交Callable任務. 原理跟自己用Java ThreadPool寫其實區別不大.

那麼要用Spring Async實現上述功能. 首先需要修改下面3個方法的返回值, 並且修改返回值類型, 併為方法添加 @Async註解

class UserServiceImpl implements UserService {
@Async
public Future<User> get(Long userId) {
// ... something
}
}
class PostServiceImpl implements PostService {
@Async
public Future<List<Post> getPosts(Long userId) {
// ... something
}
}
class FollowServiceImpl implements FollowService {
@Async
public Future<List<User> getFollowers(Long userId) {
// ... something
}
}

並行獲取3份用戶數據然後聚合, 代碼如下:

public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
Future<User> userFuture = userService.get(userId);
Future<List<Post>> postsFuture = postService.getPosts(userId);
Future<List<User>> followersFuture = followService.getFollowers(userId);
User user = whileGet(userFuture);
user.setFollowers(whileGet(followersFuture));
user.setPosts(whileGet(postsFuture));
return user;
}
private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
while(true) {
if (future.isDone()) {
break;
}
}
return future.get();
}

這裡使用自旋去獲取異步數據. 當然你也可以像前面那樣, 傳遞一個閉鎖(CountDownLatch)到Service中去, 然後讓主調線程在一個閉鎖上面等待.

並行結合DI(依賴注入)

上面2種方式的確能實現功能, 但首先, 它們都很不直觀, 而且沒有處理前面講到的異步問題, 一旦出現超時\異常\ThreadLocal, 代碼可能不會按照你預期的方式工作. 那有沒有更簡單方便可靠的方法呢?

試想這樣一種方式, 如果你需要的數據, 都可以通過方法入參自動並行獲取, 然後傳遞給你, 那是不是很方便? 就像這樣:

@Component
public class UserAggregate {
@DataProvider("userWithPosts")
public User userWithPosts(
@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
}

這裡的@DataConsumer聲明瞭你要異步獲取的數據id. @DataProvider聲明瞭這個方法提供數據, 並且id為userWithPosts.

或者你不想寫這樣一個Aggregate類, 你不需要複用, 你想直接創建一個”匿名Provider”. 那麼你可以直接在任何地方像下面這樣調用拿結果

User user = dataBeanAggregateQueryFacade.get(
Collections.singletonMap("userId", 1L), 
new Function3<User, List<Post>,List<User>, User>() {
@Override
public User apply(@DataConsumer("user") User user, 
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
});
Assert.notNull(user,"user not null");
Assert.notNull(user.getPosts(),"user posts not null");

這裡的Function3接收4個泛型參數, 最後一個User表示返回值類型, 前3個參數依次對應apply方法的3個入參類型. 項目預定義了Function2-Function5, 支持不超過5個參數, 如果你需要更多參數, 可以編寫一個接口(FunctionInterface), 繼承MultipleArgumentsFunction接口即可.

很顯然

  • 每一個 @DataConsumer 只會對應一個 @DataProvider .
  • 一個 @DataProvider 可能被多個 @DataConsumer 消費 .
  • 一個 @DataProvider 通過多個 @DataConsumer 依賴上多個 @DataProvider.

現在, 就有這樣一個項目, 實現了上述功能. 只需要在你的方法上, 添加一些註解. 就可以迅速地讓你的調用樹轉為並行.

項目地址: github.com/lvyahui8/sp…

你不用care底層如何實現. 只有在你有定製化的需求時, 才去關心一些配置參數. 去擴展一些能力.

實現原理

當Parallel遇上了DISpring並行數據聚合最佳實踐

  1. 在Spring啟動之時, 掃描應用中的 @DataProvider@DataConsumer 註解. 分析記錄下依賴關係(有向非連通圖), 並且記錄好@DataProvider和Spring Bean的映射關係.
  2. 當進行查詢時, 從已經記錄好的依賴關係中拿出依賴樹, 使用線程池和閉鎖(CountLatchDown), 遞歸異步調用孩子節點對應的Bean方法, 拿到結果後作為入參注入當前節點 (近似廣度優先, 但因為並行的原因, 節點的訪問順序是不確定的).
  3. 在發起遞歸調用前, 傳入進一個map, 用來存放查詢參數, 方法中沒有@DataConsumer註解的入參, 將從此map中取值.
  4. @DataProvider@DataConsumer 註解可以支持一些參數, 用來控制超時時間\異常處理方式\是否冪等緩存等等.

怎麼解決並行/異步後引入的新問題

超時怎麼控制 ?

@DataProvider 註解支持 timeout 參數, 用來控制超時. 實現原理是通過閉鎖的超時等待方法.

java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)

異常怎麼處理 ?

對異常提供兩種處理方式: 吞沒或者向上層拋出.

@DataConsumer 註解支持exceptionProcessingMethod 參數, 用來表示這個Consumer想怎麼處理Provider拋出的異常.

當然, 也支持在全局維度配置. 全局配置的優先級低於(<)Consumer配置的優先級.

依賴關係有死循環怎麼辦 ?

Spring Bean初始化, 因為Bean創建和Bean屬性賦值分了兩步走, 因此可以用所謂的”早期引用”解決循環依賴的問題.

但如果你循環依賴的Bean, 依賴關係定義在構造函數入參上, 那麼是沒法解決循環依賴的問題的.

同理, 我們通過方法入參, 異步注入依賴數據, 在方法入參沒有變化的情況下, 也是無法結束死循環的. 因此必須禁止循環依賴.

那麼問題變為了怎麼禁止循環依賴. 或者說, 怎麼檢測有向非聯通圖中的循環依賴, 兩個辦法:

  • 帶染色的DFS遍歷: 節點入棧訪問前, 先標記節點狀態為”訪問中”, 之後遞歸訪問孩子節點, 遞歸完成後, 將節點標記為”訪問完成”. **如果在DFS遞歸過程中, 再次訪問到”訪問中”的節點, 說明有環. **
  • 拓撲排序: 把有向圖的節點排成一個序列, 不存在索引號較高的節點指向索引號較低的節點, 表示圖存在拓撲排序. 拓撲排序的實現方法是, 先刪除入度為0的節點, 並將領接節點的入度 – 1, 直到所有節點都被刪除. 很顯然, 如果有向圖中有環, 那麼環裡節點的入度不可能為0 , 那麼節點不可能刪完. 因此, 只要滿足節點未刪完 && 不存在入度為0的節點, 那麼一定有環.

這裡我們用領接表+DFS染色搜索, 來實現環的檢查

private void checkCycle(Map<String,Set<String>> graphAdjMap) {
Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
if (visitStatusMap.containsKey(item.getKey())) {
continue;
}
dfs(graphAdjMap,visitStatusMap,item.getKey());
}
}
private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
if (visitStatusMap.containsKey(node)) {
if(visitStatusMap.get(node) == 1) {
List<String> relatedNodes = new ArrayList<>();
for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
if (item.getValue() == 1) {
relatedNodes.add(item.getKey());
}
}
throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
}
return ;
}
visitStatusMap.put(node,1);
log.info("visited:{}", node);
for (String relateNode : graphAdjMap.get(node)) {
dfs(graphAdjMap,visitStatusMap,relateNode);
}
visitStatusMap.put(node,2);
}

ThreadLocal怎麼處理?

許多的框架都使用了ThreadLocal來實現Context來保存單次請求中的一些共享數據, Spring也不例外.

眾所周知, ThreadLocal實際是訪問Thread中一個特殊Map的入口. ThreadLocal只能訪問當前Thread的數據(副本), 如果跨越了線程, 是拿不到到其他ThreadLocalMap的數據的.

解決方法

當Parallel遇上了DISpring並行數據聚合最佳實踐

如圖

  1. 在當前線程提交異步任務前, 將當前線程ThreadLocal執行的數據”捆綁”到任務實例中
  2. 當任務開始執行時, 從任務實例中取出數據, 恢復到當前異步線程的ThreadLocal中
  3. 當任務結束後, 清理當前異步線程的ThreadLocal.

這裡, 我們先定義一個接口, 來描述這3個動作

public interface AsyncQueryTaskWrapper {
/**
* 任務提交之前執行. 此方法在提交任務的那個線程中執行
*/
void beforeSubmit();
/**
* 任務開始執行前執行. 此方法在異步線程中執行
* @param taskFrom 提交任務的那個線程
*/
void beforeExecute(Thread taskFrom);
/**
* 任務執行結束後執行. 此方法在異步線程中執行
* 注意, 不管用戶的方法拋出何種異常, 此方法都會執行.
* @param taskFrom 提交任務的那個線程
*/
void afterExecute(Thread taskFrom);
}

為了讓我們定義的3個動作起作用. 我們需要重寫一下 java.util.concurrent.Callable#call方法.

public abstract class AsyncQueryTask<T> implements Callable<T> {
Thread      taskFromThread;
AsyncQueryTaskWrapper asyncQueryTaskWrapper;
public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
this.taskFromThread = taskFromThread;
this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
}
@Override
public T call() throws Exception {
try {
if(asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.beforeExecute(taskFromThread);
}
return execute();
} finally {
if (asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.afterExecute(taskFromThread);
}
}
}
/**
* 提交任務時, 業務方實現這個替代方法
*
* @return
* @throws Exception
*/
public abstract T  execute() throws Exception;
}

接下來, 向線程池提交任務時, 不再直接提交Callable匿名類實例, 而是提交AsyncQueryTask實例. 並且在提交前觸發 taskWrapper.beforeSubmit();

AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
// 任務提交前執行動作.
taskWrapper.beforeSubmit();
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
@Override
public Object execute() throws Exception {
try {
// something to do
} finally {
stopDownLatch.countDown();
}
}
});
你要做什麼?

你只需要定義一個類, 實現這個接口, 並將這個類加到配置文件中去.

@Slf4j
public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
/**
* "捆綁" 在任務實例中的數據
*/
private Long tenantId;
private User user;
@Override
public void beforeSubmit() {
/* 提交任務前, 先從當前線程拷貝出ThreadLocal中的數據到任務中 */
log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
this.tenantId = RequestContext.getTenantId();
this.user = ExampleAppContext.getUser();
}
@Override
public void beforeExecute(Thread taskFrom) {
/* 任務提交後, 執行前, 在異步線程中用數據恢復ThreadLocal(Context) */
log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.setTenantId(tenantId);
ExampleAppContext.setLoggedUser(user);
}
@Override
public void afterExecute(Thread taskFrom) {
/* 任務執行完成後, 清理異步線程中的ThreadLocal(Context) */
log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.removeTenantId();
ExampleAppContext.remove();
}
}

添加配置使TaskWapper生效.

io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper

怎麼監控每一次的異步調用?

解決辦法

我們先把一次查詢, 分為以下幾個生命週期

  • 查詢任務初次提交 (querySubmitted)
  • 某一個Provider節點開始執行前 (queryBefore)
  • 某一個Provider節點執行完成後 (queryAfter)
  • 查詢全部完成 (queryFinished)
  • 查詢異常 (exceptionHandle)

轉換成接口如下

public interface AggregateQueryInterceptor {
/**
* 查詢正常提交, Context已經創建
*
* @param aggregationContext 查詢上下文
* @return 返回為true才繼續執行
*/
boolean querySubmitted(AggregationContext aggregationContext) ;
/**
* 每個Provider方法執行前, 將調用此方法. 存在併發調用
*
* @param aggregationContext 查詢上下文
* @param provideDefinition 將被執行的Provider
*/
void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);
/**
* 每個Provider方法執行成功之後, 調用此方法. 存在併發調用
*
* @param aggregationContext 查詢上下文
* @param provideDefinition 被執行的Provider
* @param result 查詢結果
* @return 返回結果, 如不修改不, 請直接返回參數中的result
*/
Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);
/**
* 每個Provider執行時, 如果拋出異常, 將調用此方法. 存在併發調用
*
* @param aggregationContext  查詢上下文
* @param provideDefinition 被執行的Provider
* @param e Provider拋出的異常
*/
void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);
/**
* 一次查詢全部完成.
*
* @param aggregationContext 查詢上下文
*/
void queryFinished(AggregationContext aggregationContext);
}

在Spring應用啟動之初, 獲取所有實現了AggregateQueryInterceptor接口的Bean, 並按照Order註解排序, 作為攔截器鏈.

至於攔截器如何執行. 很簡單, 在遞歸提交查詢任務時, 插入執行一些鉤子(hook)函數即可. 涉及到的代碼很多, 就不貼在這裡, 感興趣的可以去github clone代碼查看.

你要做什麼?

你可以實現一個攔截器, 在攔截器中輸出日誌, 監控節點執行狀態(耗時, 出入參), 如下:

@Component
@Order(2)
@Slf4j
public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
@Override
public boolean querySubmitted(AggregationContext aggregationContext) {
log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
return true;
}
@Override
public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
log.info("query before. provider:{}",provideDefinition.getMethod().getName());
}
@Override
public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
return result;
}
@Override
public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
log.error(e.getMessage());
}
@Override
public void queryFinished(AggregationContext aggregationContext) {
log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
}
}

項目地址

最後, 再次貼一下項目地址: github.com/lvyahui8/sp… .

歡迎拍磚, 歡迎star, 歡迎使用

相關文章

ObjectiveC基礎之五(Runtime之Class結構解析)

徹底理解IaaS、PaaS、SaaS

【一分鐘系列】一分鐘瞭解git常用操作

架構師,怎樣才能搞定上下游客戶?