Spark入門(七)Spark的intersection、subtract、union和distinct

NO IMAGE

Spark的intersection

intersection顧名思義,他是指交叉的。當兩個RDD進行intersection後,將保留兩者共有的。因此對於RDD1.intersection(RDD2) 和RDD2.intersection(RDD1) 。應該是一致的。

比如對於,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7},對於包含這兩個List的RDD來說,他們進行一次intersection應該得到result={3,4,5}

Spark的subtract

subtract則和intersection不同,他是找出兩者之間不一致的內容。

比如對於,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7}他們進行一次subtract得到的結果是跟順序有關的。

list1.subtract(list2) 

結果應該為

1 2

而對於

list2.subtract(list1) 

結果應該為

6 7

Spark的union

union最好理解,他是把兩個RDD進行整合,但不考慮其中重複的情況。比如對於,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7}他們進行一次union得到的結果是跟順序無關的。結果應該為

result = {1,2,3,4,5,3,4,5,6,7}

Spark的distinct

distinc 是將RDD中重複的內容剔除,注意,這個剔除的過程並不會把重複的元素都去掉,而是重複的元素只保留一份。這當然很好理解,比如result = {1,2,3,4,5,3,4,5,6,7},進行一次distinct,則得到{1,2,3,4,5,6,7}

一個綜合的例子

考慮到intersection、subtract、union和distinct比較常用,且在一個案例中能夠很好體現其特點。因此我們這次獲取的數據集是兩個課程,lesson1和lesson2。lesson1中有十位同學,每個同學都有著許多個能力的估值,該估值是一個Int類型數據。lesson2中也是如此。對於這兩個數據集我將其分別放在lesson1中和lesson2中。數據集和下面的代碼均可以在github上找到並下載。

數據集分析

對於lesson1,裡面有很多同學,每個同學又有很多次能力估值。在Spark入門(六)–Spark的combineByKey、sortBykey中已經提到過給每個人的成績求平均分,因此這裡不做這個處理。

這兩個數據集我們解決如下的問題:

  • 0、計算lesson1和lesson2中每個同學的能力總估值
  • 1、找出lesson1中所有的同學(不重複)
  • 2、找出lesson2中所有同學(不重複)
  • 3、找出選了兩門課程的同學
  • 4、找出只在lesson1而不在lesson2中的同學
  • 5、找出只在lesson2而不在lesson1中的同學

數據的部分內容展示

Spark入門(七)Spark的intersection、subtract、union和distinct

對於第0個問題,因為用到的並非本節的內容,因此標註為0。要求每個課程中的每個同學能力的總估值,首先要對數據進行處理,按空格拆分。拆分後的數據應該是(姓名,分數)的元組集合,然後根據姓名對分數進行累加。

  • 第一個問題中找出lesson1中所有同學,只要得到了每個同學能力的總估值,去掉分數,即可知道lesson1中的所有同學。

  • 第二題同理。

  • 第三題要找出選了兩門課的同學,則要對兩門課所有的同學進行一次整合,然後剔除重複的數據,即先union再distinc

  • 第四題要找到lesson1中而不在lesson二中的同學,則只要對lesson1的同學和lesson2中的同學進行一次substract即可

  • 第五題同理

scala實現


import org.apache.spark.{SparkConf, SparkContext}
object SparkIntersectionAndSubtract {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")
val sc = new SparkContext(conf)
//課程一中的數據
val lesson1Data = sc.textFile("./lesson1").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))
//將課程一中每個人的分數相加
val lesson1Grade = lesson1Data.reduceByKey(_+_)
val lesson1Student = lesson1Grade.map(x=>x._1)
//課程二中的數據處理
val lesson2Data = sc.textFile("./lesson2").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))
//將課程二中每個人的分數相加
val lesson2Grade = lesson2Data.reduceByKey((x,y)=>x+y)
val lesson2Student = lesson2Grade.map(x=>x._1)
//在課程一中的人且在課程二中的人的集合
println("Students On Lesson1 And On Lesson2")
lesson1Student.intersection(lesson2Student).foreach(println)
//在課程二中的人且在課程一中的人的集合,與上面的結果一致
println("Students On Lesson1 And On Lesson2")
lesson2Student.intersection(lesson1Student).foreach(println)
//在課程一中的人但不在課程二中的人的集合
println("Students Only In Lesson1")
val onlyInLesson1 = lesson1Student.subtract(lesson2Student)
onlyInLesson1.foreach(println)
//在課程二中的人但不在課程二中的人的集合
println("Students Only In Lesson2")
val onlyInLesson2 = lesson2Student.subtract(lesson1Student)
onlyInLesson2.foreach(println)
//只選了一門課的同學
println("Students Only Choose One Lesson")
lesson1Student.union(lesson2Student).foreach(println)
//兩門課所有學生(不重複)
println("All the students")
lesson1Student.union(lesson2Student).distinct().foreach(print)
}
}

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class SparkIntersectionAndSubtractJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtractJava");
JavaSparkContext sc = new JavaSparkContext(conf);
//java7實現
intersectionAndSubtractJava(sc);
//java8實現
intersectionAndSubtractJava8(sc);
}
public static void intersectionAndSubtractJava(JavaSparkContext sc){
JavaRDD<String> lesson1Data = sc.textFile("./lesson1");
JavaRDD<String> lesson2Data = sc.textFile("./lesson2");
JavaPairRDD<String,Integer> lesson1InfoData = lesson1Data.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
}
});
JavaPairRDD<String,Integer> lesson2InfoData = lesson2Data.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
}
});
JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
JavaRDD<String> lesson1Students = lesson1Grades.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2._1;
}
});
JavaRDD<String> lesson2Students = lesson2Grades.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2._1;
}
});
//既在lesson1中又在lesson2中的學生
System.out.println("Students On Lesson1 And On Lesson2");
lesson1Students.intersection(lesson2Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
//既在lesson2中又在lesson1中的學生,與上面的結果一致
System.out.println("Students On Lesson1 And On Lesson2");
lesson2Students.intersection(lesson1Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
//只在lesson1中而不在lesson2中的學生
JavaRDD<String> studensOnlyInLesson1 = lesson1Students.subtract(lesson2Students);
System.out.println("Students Only In Lesson1");
lesson1Students.subtract(lesson2Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
//只在lesson2中而不在lesson1中的學生
JavaRDD<String> studensOnlyInLesson2 = lesson2Students.subtract(lesson1Students);
System.out.println("Students Only In Lesson2");
studensOnlyInLesson2.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
//只選了一門課的學生
JavaRDD<String> onlyOneLesson = studensOnlyInLesson1.union(studensOnlyInLesson2);
System.out.println("Students Only Choose One Lesson");
onlyOneLesson.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
System.out.println("All the students");
lesson1Students.union(lesson2Students).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
}
public static void intersectionAndSubtractJava8(JavaSparkContext sc){
JavaRDD<String> lesson1Data = sc.textFile("./lesson1");
JavaRDD<String> lesson2Data = sc.textFile("./lesson2");
JavaPairRDD<String,Integer> lesson1InfoData =
lesson1Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));
JavaPairRDD<String,Integer> lesson2InfoData =
lesson2Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));
JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey((x,y) -> x+y);
JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey((x,y) -> x+y);
JavaRDD<String> studentsInLesson1 = lesson1Grades.map(x->x._1);
JavaRDD<String> studentsInLesson2 = lesson2Grades.map(x->x._1);
//既在lesson1中又在lesson2中的學生
studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name));
//既在lesson2中又在lesson1中的學生,與上面的結果一致
studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name));
//只在lesson1中的學生
JavaRDD<String> studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2);
studentsOnlyInLesson1.foreach(name -> System.out.println(name));
//只在lesson2中的學生
JavaRDD<String> studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1);
studentsOnlyInLesson2.foreach(name -> System.out.println(name));
//只選了一門課的學生
JavaRDD<String> studentsOnlyOneLesson = studentsOnlyInLesson1.union(studentsInLesson2);
studentsOnlyOneLesson.foreach(name -> System.out.println(name));
studentsInLesson1.union(studentsInLesson2).distinct().foreach(name -> System.out.println(name));
}
}

python實現

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
#lesson1數據
lesson1Data = sc.textFile("./lesson1").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))
#lesson2數據
lesson2Data = sc.textFile("./lesson2").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))
#lesson1中每個人的總分
lesson1InfoData = lesson1Data.reduceByKey(lambda x,y:x+y)
#lesson2中每個人的總分
lesson2InfoData = lesson2Data.reduceByKey(lambda x,y:x+y)
#lesson1中的學生
studentsInLesson1 = lesson1InfoData.map(lambda x:x[0])
#lesson2中的學生
studentsInLesson2 = lesson2InfoData.map(lambda x:x[0])
#在lesson1中且在lesson2中的學生
print("Students On Lesson1 And On Lesson2")
studentsInLesson1.intersection(studentsInLesson2).foreach(print)
#在lesson2中且在lesson1中的學生,與上面的結果一致
print("Students On Lesson1 And On Lesson2")
studentsInLesson2.intersection(studentsInLesson1).foreach(print)
#只在lesson1中的學生
print("Students Only In Lesson1")
studensOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2)
studensOnlyInLesson1.foreach(print)
#只在lesson2中的學生
print("Students Only In Lesson2")
studensOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1)
studensOnlyInLesson2.foreach(print)
#只選了一門課的學生
print("Students Only Choose One Lesson")
studensOnlyInLesson1.union(studensOnlyInLesson2).foreach(print)
#兩門課所有學生(不重複)
print("All the students")
studentsInLesson1.union(studentsInLesson2).distinct().foreach(print)

運行得到結果

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Bob
Coco
Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Coco
Bob
Students Only In Lesson1
Bill
David
Mike
Nancy
Lucy
Students Only In Lesson2
White
Jimmy
Jason
John
Frank
Students Only Choose One Lesson
Bill
David
Mike
Nancy
Lucy
White
Jimmy
Jason
John
Frank
All the students
Vicky
Bill
Amy
White
Jimmy
Jason
Lili
David
Bob
Mike
Coco
Nancy
Lucy
John
Frank

Spark入門(七)Spark的intersection、subtract、union和distinct

通過上面的例子,非常具體地應用了intersection、subtract、union和distinct來解決具體的問題。並且利用好這幾個方法能夠很快速地進行一些數據集之間的關係操作。事實上,直接利用這幾種方法比我們自己動手實現要好很多,因為spark中對這幾種方法進行了優化。

數據集和代碼均可以在github上找到並下載

相關文章

【機器學習】機器學習簡介

【機器學習】深度學習開發環境搭建

Spark實戰搭建我們的Spark分佈式架構

Spark實戰尋找5億次訪問中,訪問次數最多的人