Spark入門(三)Spark經典的單詞統計

NO IMAGE

spark經典之單詞統計

準備數據

既然要統計單詞我們就需要一個包含一定數量的文本,我們這裡選擇了英文原著《GoneWithTheWind》(《飄》)的文本來做一個數據統計,看看文章中各個單詞出現頻次如何。為了便於大家下載文本。可以到GitHub上下載文本以及對應的代碼。我將文本放在項目的目錄下。

Spark入門(三)Spark經典的單詞統計

首先我們要讀取該文件,就要用到SparkContext中的textFile的方法,我們嘗試先讀取第一行。

scala實現

import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
println(sc.textFile("./GoneWithTheWind").first())
}
}

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
System.out.println(sc.textFile("./GoneWithTheWind").first());
}
}

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
print(sc.textFile("./GoneWithTheWind").first())

得到輸出

Spark入門(三)Spark經典的單詞統計

Chapter 1

以scala為例,其餘兩種語言也差不多。第一步我們創建了一個SparkConf

val conf = new SparkConf().setMaster("local").setAppName("WordCount")

這裡我們設置Master為local,該程序名稱為WordCount,當然程序名稱可以任意取,和類名不同也無妨。但是這個Master則不能亂寫,當我們在集群上運行,用spark-submit的時候,則要注意。我們現在只討論本地的寫法,因此,這裡只寫local。

接著一句我們創建了一個SparkContext,這是spark的核心,我們將conf配置傳入初始化

 val sc = new SparkContext(conf)

最後我們將文本路徑告訴SparkContext,然後輸出第一行內容

println(sc.textFile("./GoneWithTheWind").first())

開始統計

接著我們就可以開始統計文本的單詞數了,因為單詞是以空格劃分,所以我們可以把空格作為單詞的標記。

scala實現

import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
//設置數據路徑
val text = sc.textFile("./GoneWithTheWind")
//將文本數據按行處理,每行按空格拆成一個數組
// flatMap會將各個數組中元素合成一個大的集合
val textSplit = text.flatMap(line =>line.split(" "))
//處理合並後的集合中的元素,每個元素的值為1,返回一個元組(key,value)
//其中key為單詞,value這裡是1,即該單詞出現一次
val textSplitFlag = textSplit.map(word => (word,1))
//reduceByKey會將textSplitFlag中的key相同的放在一起處理
//傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
val countWord = textSplitFlag.reduceByKey((x,y)=>x+y)
//將計算後的結果存在項目目錄下的result目錄中
countWord.saveAsTextFile("./result")
}
}

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
//設置數據的路徑
JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
//將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
//這裡需要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
//與Hadoop中的map-reduce非常相似
JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//處理合並後的集合中的元素,每個元素的值為1,返回一個Tuple2,Tuple2表示兩個元素的元組
//值得注意的是上面是JavaRDD,這裡是JavaPairRDD,在返回的是元組時需要注意這個區別
//PairFunction中<String, String, Integer>,第一個String是輸入值類型
//第二第三個,String, Integer是返回值類型
//這裡返回的是一個word和一個數值1,表示這個單詞出現一次
JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
//reduceByKey會將splitFlagRDD中的key相同的放在一起處理
//傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
//將計算後的結果存在項目目錄下的result目錄中
countRDD.saveAsTextFile("./resultJava");
}
}

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")
# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))
# 處理合並後的集合中的元素,每個元素的值為1,返回一個元組(key,value)
# 其中key為單詞,value這裡是1,即該單詞出現一次
flagData = splitData.map(lambda word:(word,1))
# reduceByKey會將textSplitFlag中的key相同的放在一起處理
# 傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
countData = flagData.reduceByKey(lambda x,y:x+y)
#輸出文件
countData.saveAsTextFile("./result")

運行後在住目錄下得到一個名為result的目錄,該目錄如下圖,SUCCESS表示生成文件成功,文件內容存儲在part-00000中

Spark入門(三)Spark經典的單詞統計

我們可以查看文件的部分內容:

('Chapter', 1)
('1', 1)
('SCARLETT', 1)
('O’HARA', 1)
('was', 74)
('not', 33)
('beautiful,', 1)
('but', 32)
('men', 4)
('seldom', 3)
('realized', 2)
('it', 37)
('when', 19)
('caught', 1)
('by', 20)
('her', 65)
('charmas', 1)
('the', 336)
('Tarleton', 7)
('twins', 16)
('were.', 1)
('In', 1)
('face', 6)
('were', 49)
...
...
...
...

Spark入門(三)Spark經典的單詞統計

這樣就完成了一個spark的真正HelloWorld程序–單詞計數。對比三個語言版本的程序,發現一個事實那就是,用scala和python寫的代碼非常簡潔而且易懂,而Java實現的則相對複雜,難懂。當然這個易懂和難懂是相對而言的。如果你只會Java無論如何你都應該從中能看懂java的程序,而簡潔的scala和python對你來說根本看不懂。這也無妨,語言只是工具,重點看你怎麼用。況且,我們使用java8的特性也可以寫出簡潔的代碼。

java8實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
countJava8(sc);
}
public static void countJava8(JavaSparkContext sc){
sc.textFile("./GoneWithTheWind")
.flatMap(s->Arrays.asList(s.split(" ")).iterator())
.mapToPair(s->new Tuple2<>(s,1))
.reduceByKey((x,y)->x+y)
.saveAsTextFile("./resultJava8");
}
}

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。

對程序進行優化

程序是否還能再簡單高效呢?當然是可以的,我們可以用countByValue這個函數,這個函數正是常用的計數的方法。

scala實現


import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
//設置數據路徑
val text = sc.textFile("./GoneWithTheWind")
//將文本數據按行處理,每行按空格拆成一個數組
// flatMap會將各個數組中元素合成一個大的集合
val textSplit = text.flatMap(line =>line.split(" "))
println(textSplit.countByValue())
}
}

運行得到結果

Map(Heknew -> 1, &emsp;&emsp;“Ashley -> 1, “Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,.......

Spark入門(三)Spark經典的單詞統計

java實現

public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
countJava(sc);
}
public static void countJava(JavaSparkContext sc){
//設置數據的路徑
JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
//將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
//這裡需要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
//與Hadoop中的map-reduce非常相似
JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
System.out.println(splitRDD.countByValue());
}
}

運行得到結果

{Heknew=1, &emsp;&emsp;“Ashley=1, “Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1,

Spark入門(三)Spark經典的單詞統計

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")
# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))
print(splitData.countByValue())

運行得到結果:

defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4, 

Spark入門(三)Spark經典的單詞統計

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。如果想要獲取源代碼以及數據內容,可以前往我的github下載。

相關文章

Spark入門(七)Spark的intersection、subtract、union和distinct

Spark入門(六)Spark的combineByKey、sortBykey

Spark入門(五)Spark的reduce和reduceByKey

Spark入門(四)Spark的map、flatMap、mapToPair