第一個MapReduce程式——WordCount

通常我們在學習一門語言的時候,寫的第一個程式就是Hello World。而在學習Hadoop時,我們要寫的第一個程式就是詞頻統計WordCount程式。

一、MapReduce簡介

1.1 MapReduce程式設計模型

MapReduce採用”分而治之”的思想,把對大規模資料集的操作,分發給一個主節點管理下的各個分節點共同完成,然後通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是”任務的分解與結果的彙總”。

在Hadoop中,用於執行MapReduce任務的機器角色有兩個:

  • JobTracker用於排程工作的,一個Hadoop叢集中只有一個JobTracker,位於master。
  • TaskTracker用於執行工作,位於各slave上。

在分散式計算中,MapReduce框架負責處理了並行程式設計中分散式儲存、工作排程、負載均衡、容錯均衡、容錯處理以及網路通訊等複雜問題,把處理過程高度抽象為兩個函式:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多工處理的結果彙總起來。

需要注意的是,用MapReduce來處理的資料集(或任務)必須具備這樣的特點:待處理的資料集可以分解成許多小的資料集,而且每一個小資料集都可以完全並行地進行處理。

1.2 MapReduce工作過程

對於一個MR任務,它的輸入、輸出以及中間結果都是<key, value>鍵值對:

  • Map:<k1, v1> ——> list(<k2, v2>)
  • Reduce:<k2, list(v2)> ——> list(<k3, v3>)

MR程式的執行過程主要分為三步:Map階段、Shuffle階段、Reduce階段,如下圖:

  1. Map階段

    • 分片(Split):map階段的輸入通常是HDFS上檔案,在執行Mapper前,FileInputFormat會將輸入檔案分割成多個split ——1個split至少包含1個HDFS的Block(預設為64M);然後每一個分片執行一個map進行處理。

    • 執行(Map):對輸入分片中的每個鍵值對呼叫map()函式進行運算,然後輸出一個結果鍵值對。

      • Partitioner:對map()的輸出進行partition,即根據key或value及reduce的數量來決定當前的這對鍵值對最終應該交由哪個reduce處理。預設是對key雜湊後再以reduce task數量取模,預設的取模方式只是為了避免資料傾斜。然後該key/value對以及partitionIdx的結果都會被寫入環形緩衝區。
    • 溢寫(Spill):map輸出寫在記憶體中的環形緩衝區,預設當緩衝區滿80%,啟動溢寫執行緒,將緩衝的資料寫出到磁碟。

      • Sort:在溢寫到磁碟之前,使用快排對緩衝區資料按照partitionIdx, key排序。(每個partitionIdx表示一個分割槽,一個分割槽對應一個reduce)
      • Combiner:如果設定了Combiner,那麼在Sort之後,還會對具有相同key的鍵值對進行合併,減少溢寫到磁碟的資料量。
    • 合併(Merge):溢寫可能會生成多個檔案,這時需要將多個檔案合併成一個檔案。合併的過程中會不斷地進行 sort & combine 操作,最後合併成了一個已分割槽且已排序的檔案。

  2. Shuffle階段:廣義上Shuffle階段橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和merge/sort過程。通常認為Shuffle階段就是將map的輸出作為reduce的輸入的過程

    • Copy過程:Reduce端啟動一些copy執行緒,通過HTTP方式將map端輸出檔案中屬於自己的部分拉取到本地。Reduce會從多個map端拉取資料,並且每個map的資料都是有序的。

    • Merge過程:Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區比較大;當緩衝區資料量達到一定閾值時,將資料溢寫到磁碟(與map端類似,溢寫過程會執行 sort & combine)。如果生成了多個溢寫檔案,它們會被merge成一個有序的最終檔案。這個過程也會不停地執行 sort & combine 操作。

  3. Reduce階段:Shuffle階段最終生成了一個有序的檔案作為Reduce的輸入,對於該檔案中的每一個鍵值對呼叫reduce()方法,並將結果寫到HDFS。

二、執行WordCount程式

在執行程式之前,需要先搭建好Hadoop叢集環境,參考《Hadoop HBase ZooKeeper分散式叢集環境搭建》。

2.1 原始碼

WordCount可以說是最簡單的MapReduce程式了,只包含三個檔案:一個 Map 的 Java 檔案,一個 Reduce 的 Java 檔案,一個負責呼叫的主程式 Java 檔案。

我們在當前使用者的主資料夾下建立wordcount_01/目錄,在該目錄下再建立src/classes/。 src 目錄存放 Java 的原始碼,classes 目錄存放編譯結果。

TokenizerMapper.java

package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

IntSumReducer.java

package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for(IntWritable val:values) {
sum  = val.get();
}
result.set(sum);
context.write(key,result);
}
}

WordCount.java

package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
} 
}

以上三個.java原始檔均置於 src 目錄下。

2.2 編譯

Hadoop 2.x 版本中jar不再集中在一個 hadoop-core-*.jar 中,而是分成多個 jar。編譯WordCount程式需要如下三個 jar:

$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

使用javac命令進行編譯:

$ cd wordcount_01
$ javac -classpath /home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar -d classes/ src/*.java
  • -classpath,設定原始碼裡使用的各種類庫所在的路徑,多個路徑用":"隔開。
  • -d,設定編譯後的 class 檔案儲存的路徑。
  • src/*.java,待編譯的原始檔。

2.3 打包

將編譯好的 class 檔案打包成 Jar 包,jar 命令是 JDK 的打包命令列工具。

$ jar -cvf wordcount.jar classes

打包結果是 wordcount.jar 檔案,放在當前目錄下。

2.4 執行

執行hadoop程式的時候,輸入檔案必須先放入hdfs檔案系統中,不能是本地檔案。

1 . 先檢視hdfs檔案系統的根目錄:

$ hadoop/bin/hadoop fs -ls /
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2015-07-28 14:38 /hbase

可以看出,hdfs的根目錄是一個叫/hbase的目錄。

2 . 然後利用put將輸入檔案(多個輸入檔案位於input資料夾下)複製到hdfs檔案系統中:

$ hadoop/bin/hadoop fs -put input /hbase

3 . 執行wordcount程式

$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar WordCount /hbase/input /hbase/output

提示找不到 WordCount 類:Exception in thread "main" java.lang.NoClassDefFoundError: WordCount

因為程式中宣告瞭 package ,所以在命令中也要 com.lisong.hadoop.WordCount 寫完整:

$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar com.lisong.hadoop.WordCount /hbase/input /hbase/output

其中 “jar” 引數是指定 jar 包的位置,com.lisong.hadoop.WordCount 是主類。執行程式處理 input 目錄下的多個檔案,將結果寫入 /hbase/output 目錄。

4 . 檢視執行結果

$ hadoop/bin/hadoop fs -ls /hbase/output
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2015-07-28 18:05 /hbase/output/_SUCCESS
-rw-r--r--   3 hadoop supergroup         33 2015-07-28 18:05 /hbase/output/part-r-00000

可以看到/hbase/output/目錄下有兩個檔案,結果就存在part-r-00000中:

$ hadoop/bin/hadoop fs -cat /hbase/output/part-r-00000
Google  6
Java    2
baidu   3
hadoop  4

三、WordCount程式分析

3.1 Hadoop資料型別

Hadoop MapReduce操作的是鍵值對,但這些鍵值對並不是Integer、String等標準的Java型別。為了讓鍵值對可以在叢集上移動,Hadoop提供了一些實現了WritableComparable介面的基本資料型別,以便用這些型別定義的資料可以被序列化進行網路傳輸、檔案儲存與大小比較。

  • 值:僅會被簡單的傳遞,必須實現WritableWritableComparable介面。
  • 鍵:在Reduce階段排序時需要進行比較,故只能實現WritableComparable介面。

下面是8個預定義的Hadoop基本資料型別,它們均實現了WritableComparable介面:

描述
BooleanWritable標準布林型數值
ByteWritable單位元組數值
DoubleWritable雙位元組數
FloatWritable浮點數
IntWritable整型數
LongWritable長整型數
Text使用UTF8格式儲存的文字
NullWritable<key,value>中的key或value為空時使用

3.2 原始碼分析

3.2.1 Map過程

package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

Map過程需要繼承org.apache.hadoop.mapreduce包中 Mapper 類,並重寫其map方法。

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>

其中的模板引數:第一個Object表示輸入key的型別;第二個Text表示輸入value的型別;第三個Text表示表示輸出鍵的型別;第四個IntWritable表示輸出值的型別。

作為map方法輸入的鍵值對,其value值儲存的是文字檔案中的一行(以回車符為行結束標記),而key值為該行的首字母相對於文字檔案的首地址的偏移量。然後StringTokenizer類將每一行拆分成為一個個的單詞,並將<word,1>作為map方法的結果輸出,其餘的工作都交有 MapReduce框架 處理。

注:StringTokenizer是Java工具包中的一個類,用於將字串進行拆分——預設情況下使用空格作為分隔符進行分割。

3.2.2 Reduce過程

package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for(IntWritable val:values) {
sum  = val.get();
}
result.set(sum);
context.write(key,result);
}
}

Reduce過程需要繼承org.apache.hadoop.mapreduce包中 Reducer 類,並 重寫 reduce方法。

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>

其中模板引數同Map一樣,依次表示是輸入鍵型別,輸入值型別,輸出鍵型別,輸出值型別。

public void reduce(Text key, Iterable<IntWritable> values, Context context)

reduce 方法的輸入引數 key 為單個單詞,而 values 是由各Mapper上對應單詞的計數值所組成的列表(一個實現了 Iterable 介面的變數,可以理解成 values 裡包含若干個 IntWritable 整數,可以通過迭代的方式遍歷所有的值),所以只要遍歷 values 並求和,即可得到某個單詞出現的總次數。

3.2.3 執行作業

package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
} 
}

在MapReduce中,由Job物件負責管理和執行一個計算任務,並通過Job的一些方法對任務的引數進行相關的設定,此處:

  • 設定了使用TokenizerMapper.class完成Map過程中的處理,使用IntSumReducer.class完成Combine和Reduce過程中的處理。
  • 還設定了Map過程和Reduce過程的輸出型別:key的型別為Text,value的型別為IntWritable。
  • 任務的輸出和輸入路徑則由命令列引數指定,並由FileInputFormat和FileOutputFormat分別設定。

    1. FileInputFormat類的很重要的作用就是將檔案進行切分 split,並將 split 進一步拆分成key/value對
    2. FileOutputFormat類的作用是將處理結果寫入輸出檔案。
  • 完成相應任務的引數設定後,即可呼叫 job.waitForCompletion() 方法執行任務。

3.2.4 WordCount流程

1)將檔案拆分成splits,由於測試用的檔案較小,所以每個檔案為一個split,並將檔案按行分割形成<key,value>對,key為偏移量(包括了回車符),value為文字行。這一步由MapReduce框架自動完成,如下圖:

2)將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<key,value>對,如下圖所示:

3)得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key值相同的value值累加,得到Mapper的最終輸出結果。如下圖:

4)Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<key,value>對,並作為WordCount的輸出結果,如下圖:

個人站點:http://songlee24.github.com


參考:
[1] 實戰Hadoop:開啟通向雲端計算的捷徑
[2]《瞭解MapReduce核心Shuffle