GraphLab:新的面向機器學習的並行框架

NO IMAGE

歡迎訪問我的部落格閱讀:http://androidfuture.com/blog/?p=215 

1.1 GraphLab簡介

在海量資料盛行的今天,大規模平行計算已經隨處可見,尤其是MapReduce框架的出現,促進了平行計算在網際網路海量資料處理中的廣泛應用。而針對海量資料的機器學習對平行計算的效能、開發複雜度等提出了新的挑戰。

機器學習的演算法具有下面兩個特點:資料依賴性強,運算過程各個機器之間要進行頻繁的資料交換;流處理複雜,整個處理過程需要多次迭代,資料的處理條件分支多。

而MapReduce是典型的SIMD模型,Map階段叢集的各臺機器各自完成負載較重的計算過程,資料並行度高,適合完成類似矩陣運算、資料統計等資料獨立性強的計算,而對於機器學習類演算法並行效能不高。

另一個並行實現方案就是採用純MPI(Native MPI)的方式。純MPI實現通過精細的設計將並行任務按照MPI協議分配到叢集機器上,並根據具體應用,在計算過程中進行機器間的資料通訊和同步。純MPI的優點是,可以針對具體的應用,進行深度優化,從而達到很高的並行效能。但純MPI存在的問題是,針對不同的機器學習演算法,需要重寫其資料分配、通訊等實現細節,程式碼重用率低,機器拓展效能差,對程式設計開發人員的要求高,而且優化和除錯成本高。因而,純MPI不適合敏捷的網際網路應用。

為解決機器學習的流處理,Google提出了Pregel框架,Pregel是嚴格的BSP模型,採用“計算-通訊-同步”的模式完成機器學習的資料同步和演算法迭代。Goolge曾稱其80%的程式使用MapReduce完成,20%的程式使用Pregel實現。因而,Pregel是很成熟的機器學習流處理框架,但Google一直沒有將Pregel的具體實現開源,外界對Pregel的模仿實現在效能和穩定性方面都未能達到工業級應用的標準。

2010年,CMU的Select實驗室提出了GraphLab框架,GraphLab是面向機器學習的流處理並行框架[1]。同年, GraphLab基於最初的並行概念實現了1.0版本,在機器學習的流處理並行效能方面得到很大的提升,並引起業界的廣泛關注,在2012年GraphLab升級到2.1版本,進一步優化了其並行模型,尤其對自然圖的並行效能得到顯著改進。

在本章的餘下章節,將詳細介紹GraphLab的並行框架和具體的原始碼實現。

1.2 GraphLab並行框架

GraphLab將資料抽象成Graph結構,將演算法的執行過程抽象成Gather、Apply、Scatter三個步驟。其並行的核心思想是對頂點的切分,以下面的例子作為一個說明。

圖1. Graph對並行思想

示例中,需要完成對V0鄰接頂點的求和計算,序列實現中,V0對其所有的鄰接點進行遍歷,累加求和。而GraphLab中,將頂點V0進行切分,將V0的邊關係以及對應的鄰接點部署在兩臺處理器上,各臺機器上並行進行部分求和運算,然後通過master頂點和mirror頂點的通訊完成最終的計算。

1.2.1 資料模型:Graph

頂點是其最小並行粒度和通訊粒度,邊是機器學習演算法中資料依賴性的表現方式。

對於某個頂點,其被部署到多臺機器,一臺機器作為master頂點,其餘機器上作為mirror。Master作為所有mirror的管理者,負責給mirror安排具體計算任務;mirror作為該頂點在各臺機器上的代理執行者,與master資料的保持同步。

對於某條邊,GraphLab將其唯一部署在某一臺機器上,而對邊關聯的頂點進行多份儲存,解了邊資料量大的問題。

同一臺機器上的所有edge和vertex構成local graph,在每臺機器上,存在本地id到全域性id的對映表。vertex是一個程序上所有執行緒共享的,在平行計算過程中,各個執行緒分攤程序中所有頂點的gather->apply->scatter操作。

下面這個例子說明,GraphLab是怎麼構建Graph的。

圖2 Graph的構建形式

1.2.2 執行模型:Gather-Apply-Scatter

每個頂點每一輪迭代經過gather->apple->scatter三個階段。

1)       Gather階段

工作頂點的邊 (可能是所有邊,也有可能是入邊或者出邊)從領接頂點和自身收集資料,記為gather_data_i,各個邊的資料graphlab會求和,記為sum_data。這一階段對工作頂點、邊都是隻讀的。

2)       Apply階段

Mirror將gather計算的結果sum_data傳送給master頂點,master進行彙總為total。Master利用total和上一步的頂點資料,按照業務需求進行進一步的計算,然後更新master的頂點資料,並同步mirror。Apply階段中,工作頂點可修改,邊不可修改。

3)       Scatter階段

工作頂點更新完成之後,更新邊上的資料,並通知對其有依賴的鄰結頂點更新狀態。這scatter過程中,工作頂點只讀,邊上資料可寫。

在執行模型中,graphlab通過控制三個階段的讀寫許可權來達到互斥的目的。在gather階段只讀,apply對頂點只寫,scatter對邊只寫。平行計算的同步通過master和mirror來實現,mirror相當於每個頂點對外的一個介面人,將複雜的資料通訊抽象成頂點的行為。

下面這個例子說明GraphLab的執行模型:

                                                                           圖3. Gather-Apply-Scatter

1.3 GraphLab的原始碼實現

Graphlab的實現可以分為四層:基礎元件層,抽象層,引擎層,應用層。

 

圖4. GraphLab原始碼結構

1.3.1 基礎元件層

提供Graphlab資料傳輸、多執行緒管理等基礎並行結構的元件模組,下面將主要介紹其通訊、資料序列化、資料交換、多執行緒管理四個功能模組。

1)       通訊(dc_tcp_comm.cpp)

Graphlab基於TCP協議的長連線在機器之間進行資料通訊。在Graphlab初始化階段,所有機器建立連線,將socket資料儲存在std::vector<socket_info> sock 結構中。

Graphlab使用單獨的執行緒來接收和傳送資料,其中接收或傳送都可以配置多個執行緒,預設每個執行緒中負責與64臺機器進行通訊。在接收連線中,tcp_comm基於libevent採用epoll的方式獲取連線到達的通知,效率高。將這部分抽象成以下虛擬碼:

listen();

for(size_t i = 0;i < nprocs; i)

connect(i);

while{

wait_for_connect();

}

in_thread_num=machine_num / proc_per_thread;

  • out_thread_num= machine_num / proc_per_thread;

 

for(每一個執行緒)

{

event_add();

}

for(每一個執行緒)

{

event_add();

}

 

for(每一個執行緒)

{

In_thread.launch(receive_loop);

}

for(每一個執行緒)

{

In_thread.launch(send_loop)

}

 

需要補充的是,Graphlab在資料通訊中,並沒有採用MPI的介面,但在原始碼中封裝了MPI_tools,其用途是在distributed_control::init時,獲取系統引數(包括機器IP和埠)提供兩種方式,一種是系統配置中初始化,一種是通過MPI介面實現(dc_init_from_mpi::init_param_from_mpi)。

2)       資料序列化(oarchive & iarchive)

Oarchive通過過載操作符>>將物件序列化後寫入ostream中,在Graphlab中對於POD( Plain Old Data)和非POD資料區分對待, POD型別的資料直接轉為為char*寫入ostream, 而非POD資料需要使用者實現save方法,否則將丟擲異常。iarchive的過程與oarchive的過程相反。

所有通過rpc傳輸的資料都通過oarchive和iarchive轉化為stream,比如vertex_program, vertex_data。

 

圖5. 資料序列化

3)       資料傳輸流(buffered_stream_send2.cpp)

Oarchive,iarchive是資料序列化的工具, 在實際的傳輸過程中,資料並沒有立即傳送出去,而是快取在buffered_stream_send。

4)       Pthread_tools:

Thread類封裝了lpthread的方法

提供thread_group管理執行緒佇列

封裝了鎖、訊號量、條件變數等同步方法。

1.3.2 抽象層

1)      dc_dist_object是GraphLab對所有分散式物件的一個抽象,其目標是將分散式處理的資料物件對使用者抽象成普通物件,以希望在使用的時候不需要關心其分散式細節。

2)      buffer_exchange是基於dc_dist_object對需要在頂點間交換的資料提供一個容器。

3)      distribute_controller是基於dc_dist_object實現的一個整個分散式系統的控制器,提供了機器資料、頂點關係等全域性資訊。

1.3.3引擎層

1.3.3.1同步引擎

                         圖6. 同步引擎

1) Excange message階段,master接受來⾃自mirror的訊息;

2) Receive Message階段,master接收上一輪Scatter傳送的訊息和mirror傳送的訊息,將有message的master啟用, 對於啟用的頂點,master通知mirror啟用,並將vectex_program同步到mirrors;

3) Gather階段,多執行緒並行gather, 誰先完成,多執行緒並行localgraph中的頂點,mirror將gather的結果到master;

4) Apply階段,master執行apply(apply()),並將apply的結果同步到mirror (sync_vertex_data()).

5)Scatter階段,master和mirror基於新的頂點資料,更新邊上資料,並以signal的形式通知相鄰頂點。

下面這個例子形象地說明了同步引擎的工作過程:

 

圖7. 頂點2的GraphLab執行過程

1.3.3.2非同步引擎

圖8. master和mirror狀態轉移過程

非同步引擎中,每個頂點是訊息驅動的狀態機。

1) 在每一輪執行開始時,Master從全域性的排程器(Sceduler)獲取訊息,獲取訊息後,master獲得鎖,並進入Locking狀態。同時,master通知mirror獲取鎖,進入Locking狀態。

2) master和mirror分別進行Gathering操作,mirror將gathering結果彙報給master,由master完成彙總。

3) master完成applying之後,將結果同步到mirror上。

4) master和mirror獨立的執行scattering,執行完成之後釋放鎖進入None狀態,等待新的任務到來。

5) mirror在scattering狀態時,可能再次接收到來自master的locking請求,這種情況下,mirror在完成scattering之後將不會釋放鎖,而直接進入下一輪任務中。