Spark入門(六)Spark的combineByKey、sortBykey

NO IMAGE

spark的combineByKey

combineByKey的特點

combineByKey的強大之處,在於提供了三個函數操作來操作一個函數。第一個函數,是對元數據處理,從而獲得一個鍵值對。第二個函數,是對鍵值鍵值對進行一對一的操作,即一個鍵值對對應一個輸出,且這裡是根據key進行整合。第三個函數是對key相同的鍵值對進行操作,有點像reduceByKey,但真正實現又有著很大的不同。

Spark入門(五)–Spark的reduce和reduceByKey中,我們用reduce進行求平均值。用combineByKey我們則可以求比平均值更為豐富的事情。現在有一個數據集,每一行數據包括一個a-z字母和一個整數,其中字母和整數之間以空格分隔。現在要求得每個字母的平均數。這個場景有點像多個學生,每個學生多門成績,求得學生的平均分。但這裡將問題簡化,其中數據集放在grades中。數據集以及下面的代碼都可以在github上下載。

combineByKey求多個平均值

scala實現


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => (value,1),
(x:(Int,Int),y)=>(x._1+y,x._2+1),
(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}

scala運行結果

(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)

Spark入門(六)Spark的combineByKey、sortBykey

java實現:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.sources.In;
import scala.Tuple2;
public class SparkCombineByKeyJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");
JavaSparkContext sc = new JavaSparkContext(conf);
combineByKeyJava(sc);
combineByKeyJava8(sc);
}
public static void combineByKeyJava(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String[] splits = s.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
}
});
splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, 1);
}
}, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2);
}
}).map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Tuple2<String,Double>>() {
@Override
public Tuple2<String,Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2);
}
}).foreach(new VoidFunction<Tuple2<String, Double>>() {
@Override
public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
}
});
}
public static void combineByKeyJava8(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> {
String[] splits = line.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
});
splitData.combineByKey(
x->new Tuple2<>(x,1),
(x,y)->new Tuple2<>(x._1+y,x._2+1),
(x,y)->new Tuple2<>(x._1+y._1,x._2+y._2)
).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2));
}
}

java運行結果

d 338451.6
e 335306.7480769231
a 336184.95321637427
i 346279.497029703
b 333069.8589473684
h 334343.75
f 341380.94444444444
j 320145.7618069815
g 334042.37605042016
c 325022.4183673469

Spark入門(六)Spark的combineByKey、sortBykey

分析

在開始python之前,我們先觀察java和scala兩個程序。我們發現java7的代碼非常冗餘,而java8和scala則相比起來非常乾淨利落。當然,我們難說好壞,但是這也表現出當代語言開始從繁就簡的一個轉變。到了python這一特點就體現的更加淋漓盡致。

但我們不光說語言,我們分析這個求平均的實現方式,由於java中對數值做了一個處理,因此有保留小數,而scala則沒有,但至少可以判斷兩者的結果是一致的。當然,這不是重點,重點是,這個combinByKey非常複雜,有三個函數。我們很難觀察到每個過程做了什麼。因此我們在這裡,對scala程序進行進一步的輸出,從而觀察combineByKey到底做了什麼。

scala修改


import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => {
println("這是第一個函數")
println("將所有的值遍歷,並放在元組中,標記1")
println(value)
(value,1)
},
(x:(Int,Int),y)=>{
println("這是第二個函數")
println("將x中的第一個值進行累加求和,第二個值加一,求得元素總個數")
println("x:"+x.toString())
println("y:"+y)
(x._1+y,x._2+1)
},
(x:(Int,Int),y:(Int,Int))=>{
(x._1+y._1,x._2+y._2)
}
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}

得到結果

這是第一個函數
將所有的值遍歷,並放在元組中,標記1
222783
這是第一個函數
將所有的值遍歷,並放在元組中,標記1
48364
這是第一個函數
將所有的值遍歷,並放在元組中,標記1
204950
這是第一個函數
將所有的值遍歷,並放在元組中,標記1
261777
...
...
...
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(554875,2)
y:357748
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(912623,3)
y:202407
這是第一個函數
將所有的值遍歷,並放在元組中,標記1
48608
這是第二個函數
將x中的第一個值進行累加求和,第二個值加一,求得元素總個數
x:(1115030,4)
y:69003
這是第一個函數
將所有的值遍歷,並放在元組中,標記1
476893
...
...
...
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)

這裡我們發現了,函數的順序並不先全部執行完第一個函數,再執行第二個函數。而是分區並行,即第一個分區執行完第一個函數,並不等待其他分區執行完第一個函數,而是緊接著執行第二個函數,最後在第三個函數進行處理。在本地單機下,該並行特點並不能充分發揮,但在集群環境中,各個分區在不同節點計算,然後處理完結果彙總處理。這樣,當數據量十分龐大時,集群節點數越多,該優勢就表現地越明顯。

此外還有一個非常值得關注的特點,當我們把foreach(println)這句話去掉時

foreach(println)

我們運行程序,發現程序沒有任何輸出。這是由於spark的懶加載特點,spark只用在對數據執行具體操作時,如輸出、保存等才會執行計算。這看起來有點不合理,但實際上這樣做在很多場景下能大幅度提升效率,但如果沒有處理好,可能會導致spark每次執行操作都會從頭開始計算該過程。因此當一個操作結果需要被頻繁或者多次調用的時候,我們應該將結果存下來。

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)

得到結果

('b', 333069.8589473684)
('f', 341380.94444444444)
('j', 320145.7618069815)
('h', 334343.75)
('a', 336184.95321637427)
('g', 334042.37605042016)
('d', 338451.6)
('e', 335306.7480769231)
('c', 325022.4183673469)

Spark入門(六)Spark的combineByKey、sortBykey

spark的sortByKey

sortByKey進行排序

sortByKey非常簡單,也非常常用。這裡依然採用上述文本,將處理後的結果,進行排序,得到平均值最大的字母。在實際運用中我們這裡可以看成求得按照成績排序,或者按照姓名排序。

scala實現

import org.apache.spark.{SparkConf, SparkContext}
object SparkSortByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
val result = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2))
//按照名字排序,順序
result.sortByKey(true).foreach(println)
//按照名字排序,倒序
result.sortByKey(false).foreach(println)
val result1 = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._2._1/x._2._2,x._1))
//按照成績排序,順序
result1.sortByKey(true).foreach(println)
//按照成績排序,倒序
result1.sortByKey(false).foreach(println)
}
}

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
result = sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1]))
result.sortByKey(True).foreach(print)
result.sortByKey(False).foreach(print)
result1 = sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[1][0]/x[1][1],x[0]))
result1.sortByKey(True).foreach(print)
result1.sortByKey(False).foreach(print)

得到結果

(a,336184)
(b,333069)
(c,325022)
(d,338451)
(e,335306)
(f,341380)
(g,334042)
(h,334343)
(i,346279)
(j,320145)
(j,320145)
(i,346279)
(h,334343)
(g,334042)
(f,341380)
(e,335306)
(d,338451)
(c,325022)
(b,333069)
(a,336184)
(320145,j)
(325022,c)
(333069,b)
(334042,g)
(334343,h)
(335306,e)
(336184,a)
(338451,d)
(341380,f)
(346279,i)
(346279,i)
(341380,f)
(338451,d)
(336184,a)
(335306,e)
(334343,h)
(334042,g)
(333069,b)
(325022,c)
(320145,j)

數據集以及代碼都可以在github上下載。

相關文章

【機器學習】深度學習開發環境搭建

Spark實戰搭建我們的Spark分佈式架構

Spark實戰尋找5億次訪問中,訪問次數最多的人

Spark入門(七)Spark的intersection、subtract、union和distinct