RDD

NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

RDD概念

  • RDD原始碼中的描述:
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as groupByKey and join[[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.

  • RDD是Spark 中一種抽象的基本單元,從原始碼中可以總結為,RDD是一種彈性的分散式資料集(Resilient Distributed Dataset),彈性,體現在它的計算上,即RDD在計算時提供了高容錯機制,使得一個RDD在損壞或丟失的時候,可以從父RDD進行重算;分散式,RDD內部是分割槽的,對RDD做計算就是對RDD中每個分割槽做計算,RDD的分割槽並行地執行在多個節點之上,提高計算速度;資料集,可以把RDD理解為一個資料集合,這個資料集合是不可變的,如果從一個RDD通過計算得到一個新RDD,那麼這倆個RDD是不同的。

RDD的五大特性

  • 原始碼中的描述:
    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
      an HDFS file)
  • 解釋:
    • 1.A list of partitions
      RDD是一個由多個partition(某個節點裡的某一片連續的資料)組成的的list;將資料載入為RDD時,一般會遵循資料的本地性(一般一個hdfs裡的block會載入為一個partition)。
    • 2.A function for computing each split
      RDD的每個partition上面都會有function,也就是函式應用,其作用是實現RDD之間partition的轉換。
    • 3.A list of dependencies on other RDDs
      RDD會記錄它的依賴 ,為了容錯(重算,cache,checkpoint),也就是說在記憶體中的RDD操作時出錯或丟失會進行重算。
    • 4.Optionally,a Partitioner for Key-value RDDs
      可選項,如果RDD裡面存的資料是key-value形式,則可以傳遞一個自定義的Partitioner進行重新分割槽,例如這裡自定義的Partitioner是基於key進行分割槽,那則會將不同RDD裡面的相同key的資料放到同一個partition裡面
    • 5.Optionally, a list of preferred locations to compute each split on
      最優的位置去計算,也就是資料的本地性。

RDD建立方式

SparkContext’s parallelize(並行化集合)

  • 並行化集合通過呼叫SparkContext的並行化方法在驅動程式(Scala Seq)中的現有集合上建立的。複製集合的元素以形成可並行操作的分散式資料集。

    1. val data = Array(1, 2, 3, 4, 5)
    2. val distData = sc.parallelize(data)
    

    parallelize()方法可以傳遞第二引數來設定RDD的分割槽數。

SparkContext’s textFile(外部資料集)

  • Spark可以從Hadoop支援的任何儲存源建立分散式資料集,包括本地檔案系統、HDFS、Cassandra、HBase、Amazon S3等。SCAP支援 text files, SequenceFiles,和任何其他Hadoop輸入格式。

    1. 本地檔案
    2. val distFile = sc.textFile("file:///home/hadoop/data/input.txt") 
    3. hdfs檔案
    4. val distFile = sc.textFile("hdfs://192.168.137.120/input.txt") 
    

    textFile()方法可以傳遞一個資料夾也可以使用萬用字元來傳遞多個指定的格式的檔案,當傳遞多個檔案時,每一個檔案為一個process,載入為一個分割槽。

RDD的常用運算元介紹

  • RDD的運算元有Transformation和Action倆類,Transformation是將一個RDD通過函式轉換成一個新的RDD,Transfortmation操作被定義為lazy (懶載入)即,當Transformation一個RDD時,並不會馬上執行,只有當遇到一個Action 時才會執行;Action是一個應用程式job的產生。

Transformation

  • map(func):返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成
  • filter(func):返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成,可以理解為一種過濾操作,即,將符合條件(true)的元素返回。
  • flatMap(func): 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素),可以理解為元素壓扁到一行進行操作。
  • groupByKey([numTasks]):在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD,可以理解為按key進行分組,將同一key的值放入一個陣列中。
  • reduceByKey(func, [numTasks]):在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定,可以理解為按key將value進行reduce。
  • sortByKey([ascending], [numTasks]):在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
  • join(otherDataset, [numTasks]):在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD,可以使用多種join,如inner join(就是普通的join),outer join(left,right,full),cross join等
  • subtract(otherDataset):取引數RDD在源RDD的補集(做減法),返回一個新RDD
  • intersection(otherDataset):對源RDD和引數RDD求交集後返回一個新的RDD
  • cartesian(otherDataset): 對源RDD和引數RDD每個元素做笛卡爾積,返回一個新RDD

Action

  • reduce(func): 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
  • collect(): 在驅動程式中,以陣列的形式返回資料集的所有元素
  • count(): 返回RDD的元素個數
  • first(): 返回RDD的第一個元素(類似於take(1))
  • take(n): 返回一個由資料集的前n個元素組成的陣列
  • saveAsTextFile(path): 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
  • saveAsSequenceFile(path): 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下
  • saveAsObjectFile(path): 將資料集的元素,以 Java 序列化的方式儲存到指定的目錄下
  • foreach(func): 遍歷元素,對每個元素呼叫函式

相關文章

程式語言 最新文章