實戰hadoop海量資料處理系列05 : 實現點選流日誌的資料清洗模組

NO IMAGE

實戰hadoop海量資料處理系列05 : 實現點選流日誌的資料清洗模組

之前已經實現結構化資料的清洗,下一步我們將實現半結構化(非結構化)資料的清洗。

本文假設讀者已搭建好了eclipse環境,並且已經匯入ClickStreamETL資料夾下面的子工程。
如果遇到環境相關的問題,可以在專門的帖子下面留言。

在閱讀本文前,強烈建議閱讀原書“實現點選流日誌的資料清洗模組”章節。

本文的程式碼同步於github,相關地址如下:
github地址
本系列部落格專欄地址

overview

經典重現,引入原書流程圖和map-reduce排序圖說明原理。
為了還原案例,本專案增強了日誌中的cookie,獨有動態圖演示執行過程,並在最後尾部給出開發中的若干思考。
全文脈絡如下:
– 流程圖
– 日誌格式解析
– 排序流程圖
– 重要程式碼解釋
– 執行結果
– 工程實踐思考

1 流程圖

從原書摘取了經典的流程圖,在於說明流程框架的重要性,因為流程貫穿全章節。

2日誌格式解析

先列出一條日誌,樣式如下:

120.196.145.58 - - [11/Dec/2013:10:00:32  0800] "GET /__utm.gif" 200 35 "http://easternmiles.ceair.com/flight/index.html" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" "BIGipServermu_122.119.122.14=192575354.20480.0000;Webtrends=120.196.145.58.1386724976245806;uuid=sid1;userId=099;st=1" 1482 352 - easternmiles.ceair.com 794

這裡展示的日誌,屬於apache伺服器日誌,保留了原書欄位,本專案在原有cookie內部加入uuid,user id和sesstion time三個欄位,這些欄位將在map階段起重要作用。下面把裡面cookie提取在此,其中uuid ,user id, session time分別是sid1,099,1;

"BIGipServermu_122.119.122.14=192575354.20480.0000;Webtrends=120.196.145.58.1386724976245806;uuid=sid1;userId=099;st=1" 

完整的日誌請見github中 log_3_2_1.txt檔案,本檔案將作為map reduce作業的輸入。

2.1 日誌中獲取的相關欄位

原書提供了非常詳細欄位列表資訊,這裡僅對影響map reduce任務的重要欄位進行說明。

欄位說明
ipaddressip地址, 可從點選流日誌中獲取
receive Time伺服器接收時間, 可從點選流日誌中獲取
url由流日誌中主機地址和請求的一行合成
unique id非重複的id,可從點選流的cookie中提取
session time會話發生的時間,可從點選流的cookie中提取
session id由unique id和session time合成

3 排序流程圖

為了避免資料傾斜,作者對map的key進行了重構,利用unqiue id和session time合成了session id ,其中經典的流程圖如下,一圖勝過萬千程式碼喔 ,已經熟悉本圖的忠實讀者可以略過。

4重要程式碼解釋

程式碼主要分map/reduce/partion/二次排序等,
這裡只貼出reducer,因為本專案對其進行了增強,其他程式碼請讀者檢視原有專案程式碼

4.1 Reducer部分

亮點在程式碼裡,原先作者已經有很好的註釋,改進的地方都用註釋放在該行的尾部。

public class ClickStreamReducer extends Reducer<Text, Text, NullWritable, Text>{
///表示前一個sessionId
public static String preSessionId = "-";//66:改進版把他變為靜態變數
static int csvp = 0;//66:改進版把csvp變為靜態變數,並移到成員函式外面
protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {
String sessionId = key.toString().split("&")[0];        
//如果是第一條資料
if(preSessionId.equals("-")){
csvp = 1;
preSessionId = sessionId;//66:改進版,新加入這句,不然邏輯不完整
}else{
//如果與前一個sessionId相同,說明是同一個session
if(preSessionId.equals(sessionId)){
//累加csvp
csvp  ;
//如果不同,說明是新的session,重置preSessionId 和 csvp
}else{
preSessionId = sessionId;
csvp = 1;
}
}
//按照clickstream_log的格式再末尾加上csvp
String reduceOutValue = values.iterator().next().toString()   "\t"   csvp;
context.write(NullWritable.get(), new Text(reduceOutValue));
};
}

5 執行結果

5.1 map reduce結果

ip          uniqueId    sessionId   SessionTimesReceiveTime  UserId csvp          URL                               ReferUrl                                    
120.196.145.58  sid1        sid1|1      1       1386727232000   099 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
120.196.145.58  sid1        sid1|1      1       1386727292000   099 2       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
120.196.145.58  sid1        sid1|1      1       1386727352000   099 3       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
120.196.145.58  sid2        sid2|10     10      1386727412000   199 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
120.196.145.58  sid2        sid2|10     10      1386727472000   199 2       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  
120.196.145.58  sid3        sid3|100    100     1386727472000   299 1       easternmiles.ceair.com/__utm.gif   http://easternmiles.ceair.com/flight/index.html  

從結果中可見,對於同一組seesion,按照接收時間(receive time)先後, 可見訪問順序(csvp)按照需要給予標記。

5.2 動態執行圖

時長2分鐘左右,請耐心觀看。

6 小結

實踐了作者的ClickStream作業,還原了過程例子演示的過程。
下一步計劃,結合python呼叫jar檔案完善開發過程。

7 其他 開發中的思考

7.1 處理正規表示式匹配過程的堆疊溢位

預設的堆疊大小在正規表示式匹配的時候不能滿足記憶體需求,報以下錯誤

...
java.lang.Exception: java.lang.StackOverflowError at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.StackOverflowError at java.util.regex.Pattern$CharProperty.match(Pattern.java:3692)
...

解決方法是在啟動jvm時提大stack 的大小(這裡本工程是使用-Xss40m)

7.2 處理DateFormat匹配的細節

在原始碼(05-16)版本中,直接執行有以下問題:

17/06/10 09:36:18 INFO mapred.MapTask: record buffer = 262144/327680
java.text.ParseException: Unparseable date: "[11/Dec/2013:10:00:32  0800]"
at java.text.DateFormat.parse(DateFormat.java:357)
at com.etl.mapreduce.ClickStreamMapper.map(ClickStreamMapper.java:84)

分析觸發問題的程式碼,發現有兩處,一是被處理的日期串還有額外的字元如[ ] ,第二是匹配串裡面沒有處理時區的問題,詳細的解決方案見github相關程式碼ClickStreamMapper.java

7.3 除錯工程經驗

我的開發機是在windows, 部署機是在cent os ,因此存在遠端除錯的問題,該細節將會詳細的總結為一章,請見後續更新