NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

RDD

RDD彈性分散式資料集,spark最基本的資料抽象,代表一個不可變,可分割槽,裡面元素可平行計算的集合。
具有資料流模型的特點:自動容錯,位置感知性排程和可伸縮性。
RDD允許使用者在執行多個查詢時,顯示地將工作集快取在記憶體中,後續的查詢能重用工作集,這極大提高查詢速度
特點:一系列的分割槽,每一個函式作用於每個分割槽,RDD之間是一系列依賴,如果是k-v型別的RDD,會有一個分割槽器,分割槽器就是決定把資料放到哪個分割槽
一般Worker需要和DataNode節點部署在一起,這樣可以有效的避免大量網路IO
一個分割槽一定在一個節點上,一個節點可以有多個分割槽
RDD由多個分割槽組成的分散式資料集
一個block塊對應一個分割槽

啟動spark-shell

./spark-shell 
--master spark://hadoop01:7077  master地址
--executor-memory 512m    記憶體
--total-executor-cores 2    cpu核數

提交任務

./spark-submit
.
--class org.apache... 類名
--master spark://hadoop01:7077  master地址
--executor-memory 512m    記憶體
--total-executor-cores 2    cpu核數
jar包路徑
args 其它引數(需要傳才寫)

spark叢集的啟動流程

1、首先啟動master程序
2、master開始解析slaves配置檔案,找到worker的host,然後啟動相應的worker
3、worker開始與master進行註冊,把註冊資訊傳送給master
4、master收到註冊資訊後,把註冊資訊儲存到記憶體與硬碟中,然後master給worker傳送註冊成功的資訊(masterurl)
5、worker收到master的url資訊,開始與master建立心跳

spark叢集提交流程

1、driver 端 的sparksubmit 程序與master進行通訊,建立一個重要的物件(sparkcontext)
2、master收到任務資訊後,開始資源排程,和所有的worker進行通訊,找到比較空閒的worker,並通知worker啟動excutor程序
3、executor程序啟動後,開始與driver(client) 進行通訊(反向註冊),driver開始將任務提交到相應的executor,executor開始計算任務

RDD和它依賴的父RDD的依賴關係,窄依賴narrow dependency,寬依賴wide dependency

主要說的是partition  分割槽
窄依賴:一父分割槽只能對應一子分割槽        分組後join
寬依賴:一父分割槽可以對應多子分割槽,寬依賴與shuffle密不可分  沒分組join

Lineage

RDD轉換為RDD(只支援粗粒度轉換),將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復分割槽
Lineage會記錄後設資料資訊和轉換行為,當RDD部分分割槽資料丟失時,通過Lineage儲存的資訊來重新運算和恢復丟失的資料分割槽
一系列一對一關係(窄依賴),會形成一個Pipeline。可以通過父分割槽來再執行計算丟失分割槽的資料,其它的Pipeline不受影響 
Lineage在開發中不如cache 然後checkpoint(快取放到hdfs)
丟失分割槽資料後找回資料順序:cache(記憶體中找),checkpoint(hdfs中恢復),Lineage(從父分割槽再執行計算)
它與叢集容錯機制有點像。
叢集容錯機制:當一個節點(Worker)突然宕機,Master(重新排程任務)會將宕機的節點Worker未完成的資料交給其它節點。其它節點再在自己上面啟動一個Worker,然後Worker啟動Executor執行

RDD 快取 checkpoint

cache:重要RDD的快取起來,裡面呼叫的是persist,預設是記憶體快取(可以選擇快取級別)
checkpoint:持久化到本地或者hdfs
Spark速度快的原因:在不同操作中可以在記憶體中持久化或者快取多個資料集(RDD),當持久化後,每個分割槽都把計算的資料分片儲存在記憶體中,在此RDD或者之後的計算中可以重用,使後續動作更加迅速。快取是spark構建迭代式演算法和快速互動式查詢的關鍵。
為什麼要checkpoint?
執行處的中間結果,往往很重要,所以為了保證資料的安全性,要把資料做檢查點
最好把checkpoint 到RDFS,便於該叢集所有節點訪問到
在checkpoint之前,最好先cache一下,就是先把資料快取到記憶體,這樣便於執行程式時呼叫,也便於checkpoint時直接重快取中獲取資料
什麼時候做checkpoint?
在發生shuffle之後做checkpoint,(shuffle速度慢,專案中shuffle越少越好)
checkpoint的步驟?
1、建立checkpoint儲存目錄  sc.checkpointDir("hdfs://...")
2、把資料cache起來  rdd.cache()
3、checkpoint   rdd.checkpoint()
### 注意:當資料很大時,慎用cache,也不要不用,(關鍵地方,資料集不是非常大就用)
rdd.unpersist() 清除cache(cache的rdd呼叫unpersist())

DAG 有向無環圖 stage

stage劃分:找到最後的RDD,向前找,以寬依賴劃分(寬依賴前的)為一個stage,整體劃為一個stage,直到所有RDD劃分完。(每個寬依賴劃)
Stage:根據RDD之間的依賴關係的不同將DAG劃分為不同的Stage,對於窄依賴,partition的轉換處理在stage中完成計算,對於寬依賴,由於有shuffle的存在,只能在parentRDD中處理完成後才開始接下來的計算,因此寬依賴是劃分stage的依據。
劃分stage是為了把RDD來生成一個個task提交到Executor中執行,所以需要把RDD 先劃分stage再生成task。
一個Stage 生成n個分割槽個task
task的生成是依據stage,在stage中先劃分pipeline(與分割槽個數相同),然後根據pipeline生成task
shuffle read :發生在shuffle後,把父RDD 的資料讀取到子RDD中
shuffle write: 把中間結果資料寫到磁碟,為了保證資料安全性。
shuffle write 為什麼不寫記憶體裡?
1、避免結果集太大而佔用太多的記憶體資源,造成記憶體溢位
2、儲存到磁碟可以保證資料的安全性

任務執行流程 4個階段

RDD的生成  :  RDD 依賴關係,
stage的劃分  : DAG 劃分stage(在sparkcontext中 呼叫了 DAGScheduler(劃分stage的))
任務的生成  :在sparkcontext中 呼叫了 TaskSetScheduler
任務的提交  :多節點提交
在sparkcontext中  呼叫 actorSystem,DAGScheduler,TaskSetScheduler詳情看sparkcontext原始碼

yarn提交任務和spark提交任務的對比

resoucemanager 相當於master,負責任務排程,nodemanage相當於worker,負責建立容器和啟動自己的子程序。
client端相當於driver,提交任務
yarnchild 相當於executor,直接參與計算程序

JdbcRDD RDD與mysql的互動

//這是向mysql中資料庫 bigdata下的location_info表中查詢資料
object JdbcRDD {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("jdbcRDD").setMaster("local[*]")
val sc =new SparkContext(conf)
val conn=()=>{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://192.168.216.53:3306/bigdata?useUnicode=true&characterEncoding=utf8","root","root")
}
val sql="select id,location,counts,access_date from location_info where id >= ? and id <= ? order by counts"
val jdbcRDD =new JdbcRDD(
sc,conn,sql,0,100,2,
res=>{
val id=res.getInt("id")
val location =res.getString("location")
val counts=res.getInt("counts")
val access_date=res.getDate("access_date")
(id,location,counts,access_date)
}
)
println(jdbcRDD.collect().toBuffer)
sc.stop()
}
}

自定義排序 比較兩個或多個欄位時

第一種有隱式轉換,第二種就直接繼承

object Mysort {
implicit val girlOrdering = new Ordering[Girl] {
override def compare(x: Girl, y: Girl) = {
if (x.faceValue != y.faceValue) {
x.faceValue - y.faceValue
} else {
y.age - x.age
}
}
}
}
//case class Girl(val faceValue: Int, val age: Int) {}//第一種
case class Girl(val faceValue: Int, val age: Int) extends Ordered[Girl] {
override def compare(that: Girl) = {
if (this.faceValue != that.faceValue) {
this.faceValue - that.faceValue
} else {
this.age - that.age
}
}
}
object CustimSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("jdbcRDD").setMaster("local[*]")
val sc = new SparkContext(conf)
val girlInfo = sc.parallelize(Array(("a", 80, 25), ("b", 84, 24), ("c", 85, 26)))
//第一種排序方式
//    import Mysort.girlOrdering
//    val res: RDD[(String, Int, Int)] = girlInfo.sortBy(x=>Girl(x._2,x._3),false)
//第二種
val res: RDD[(String, Int, Int)] = girlInfo.sortBy(x => Girl(x._2, x._3), false)
println(res.collect().toBuffer)
sc.stop()
}
}

累加器 Accumulator

task 只能對accumulator進行累加

spark提供的accumulator,主要用於多個節點對一個變數進行共享操作。
它提供了多個task對一個變數並行的操作功能,task只能對Accumutor進行累加的操作,不能讀取其值
只有driver才能讀取。
/**
* 累加器,就是累加操作
*/
object Accumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("jdbcRDD").setMaster("local[*]")
val sc = new SparkContext(conf)
val sum: Accumulator[Int] = sc.accumulator(0)
val numbers = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0), 2)
numbers.foreach(num => {
sum  = num
})
println(sum)
sc.stop()
}
}

相關文章

程式語言 最新文章