Cris帶你快速入門Flink

NO IMAGE

一 概述

1.1 流處理技術的演變

在開源世界裡,Apache Storm項目是流處理的先鋒。Storm最早由Nathan Marz和創業公司BackType的一個團隊開發,後來才被Apache基金會接納。Storm提供了低延遲的流處理,但是它為實時性付出了一些代價:很難實現高吞吐,並且其正確性沒能達到通常所需的水平,換句話說,它並不能保證exactly-once,即便是它能夠保證的正確性級別,其開銷也相當大。

在低延遲和高吞吐的流處理系統中維持良好的容錯性是非常困難的,但是為了得到有保障的準確狀態,人們想到了一種替代方法:將連續時間中的流數據分割成一系列微小的批量作業。如果分割得足夠小(即所謂的微批處理作業),計算就幾乎可以實現真正的流處理。因為存在延遲,所以不可能做到完全實時,但是每個簡單的應用程序都可以實現僅有幾秒甚至幾亞秒的延遲。這就是在Spark批處理引擎上運行的Spark Streaming所使用的方法。

更重要的是,使用微批處理方法,可以實現exactly-once語義,從而保障狀態的一致性。如果一個微批處理失敗了,它可以重新運行,這比連續的流處理方法更容易。Storm Trident是對Storm的延伸,它的底層流處理引擎就是基於微批處理方法來進行計算的,從而實現了exactly-once語義,但是在延遲性方面付出了很大的代價。

對於Storm Trident以及Spark Streaming等微批處理策略,只能根據批量作業時間的倍數進行分割,無法根據實際情況分割事件數據,並且,對於一些對延遲比較敏感的作業,往往需要開發者在寫業務代碼時花費大量精力來提升性能。這些靈活性和表現力方面的缺陷,使得這些微批處理策略開發速度變慢,運維成本變高。

於是,Flink出現了,這一技術框架可以避免上述弊端,並且擁有所需的諸多功能,還能按照連續事件高效地處理數據,Flink的部分特性如下圖所示:

Cris帶你快速入門Flink

1.2 初識Flink

Flink起源於Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目,2014年4月Stratosphere的代碼被複制並捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成為Apache軟件基金會的頂級項目。

在德語中,Flink一詞表示快速和靈巧,項目採用一隻松鼠的彩色圖案作為logo,這不僅是因為松鼠具有快速和靈巧的特點,還因為柏林的松鼠有一種迷人的紅棕色,而Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一隻Apache風格的松鼠。

官網鏈接

Cris帶你快速入門Flink

Flink主頁在其頂部展示了該項目的理念:“Apache Flink是為分佈式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架”。

Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。

1.3 Flink核心計算框架

Flink的核心計算架構是下圖中的Flink Runtime執行引擎,它是一個分佈式系統,能夠接受數據流程序並在一臺或多臺機器上以容錯方式執行。

Flink Runtime執行引擎可以作為YARN(Yet Another Resource Negotiator)的應用程序在集群上運行,也可以在Mesos集群上運行,還可以在單機上運行(這對於調試Flink應用程序來說非常有用)。

Cris帶你快速入門Flink

上圖為Flink技術棧的核心組成部分,值得一提的是,Flink分別提供了面向流式處理的接口(DataStream API)和麵向批處理的接口(DataSet API)。因此,Flink既可以完成流處理,也可以完成批處理。Flink支持的拓展庫涉及機器學習(FlinkML)、複雜事件處理(CEP)、以及圖計算(Gelly),還有分別針對流處理和批處理的Table API。

Flink 是一個真正的批流結合的大數據計算框架,將大數據背景下的計算統一整合在一起,不僅降低了學習和操作難度,也有效實現了離線計算和實時計算的大一統

能被Flink Runtime執行引擎接受的程序很強大,但是這樣的程序有著冗長的代碼,編寫起來也很費力,基於這個原因,Flink提供了封裝在Runtime執行引擎之上的API,以幫助用戶方便地生成流式計算程序。Flink 提供了用於流處理的DataStream API和用於批處理的DataSet API。值得注意的是,儘管Flink Runtime執行引擎是基於流處理的,但是DataSet API先於DataStream API被開發出來,這是因為工業界對無限流處理的需求在Flink誕生之初並不大。

DataStream API可以流暢地分析無限數據流,並且可以用Java或者Scala來實現。開發人員需要基於一個叫DataStream的數據結構來開發,這個數據結構用於表示永不停止的分佈式數據流。

Flink的分佈式特點體現在它能夠在成百上千臺機器上運行,它將大型的計算任務分成許多小的部分,每個機器執行一部分。Flink能夠自動地確保發生機器故障或者其他錯誤時計算能夠持續進行,或者在修復bug或進行版本升級後有計劃地再執行一次。這種能力使得開發人員不需要擔心運行失敗。Flink本質上使用容錯性數據流,這使得開發人員可以分析持續生成且永遠不結束的數據(即流處理)。

二 Flink基本架構

2.1 JobManager與TaskManager

Flink運行時包含了兩種類型的處理器:

**JobManager處理器:**也稱之為Master,用於協調分佈式執行,它們用來調度task,協調檢查點,協調失敗時恢復等。Flink運行時至少存在一個master處理器,如果配置高可用模式則會存在多個master處理器,它們其中有一個是leader,而其他的都是standby。

TaskManager處理器:也稱之為Worker,用於執行一個dataflow的task(或者特殊的subtask)、數據緩衝和data stream的交換,Flink運行時至少會存在一個worker處理器。

簡單圖示如下

Cris帶你快速入門Flink

Master和Worker處理器可以直接在物理機上啟動,或者通過像YARN這樣的資源調度框架啟動。

Worker連接到Master,告知自身的可用性進而獲得任務分配。

2.2 無界數據流與有界數據流

Flink用於處理有界和無界數據:

無界數據流無界數據流有一個開始但是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取後立即處理event。對於無界數據流我們無法等待所有數據都到達,因為輸入是無界的,並且在任何時間點都不會完成。處理無界數據通常要求以特定順序(例如事件發生的順序)獲取event,以便能夠推斷結果完整性,無界流的處理稱為流處理

有界數據流有界數據流有明確定義的開始和結束,可以在執行任何計算之前通過獲取所有數據來處理有界流,處理有界流不需要有序獲取,因為可以始終對有界數據集進行排序,有界流的處理也稱為批處理

Cris帶你快速入門Flink

在無界數據流和有界數據流中我們提到了批處理和流處理,這是大數據處理系統中常見的兩種數據處理方式。

批處理的特點是有界、持久、大量,批處理非常適合需要訪問全套記錄才能完成的計算工作,一般用於離線統計流處理的特點是無界、實時,流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用於實時統計

在Spark生態體系中,對於批處理和流處理採用了不同的技術框架,批處理由SparkSQL實現,流處理由Spark Streaming實現,這也是大部分框架採用的策略,使用獨立的處理器分別實現批處理和流處理,而Flink可以同時實現批處理和流處理。

Flink是如何同時實現批處理與流處理的呢?答案是,Flink將批處理(即處理有限的靜態數據)視作一種特殊的流處理

Apache Flink是一個面向分佈式數據流處理和批量數據處理的開源計算平臺,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們要實現的目標是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapReduce、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。

Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。

2.3 數據流編程模型

Flink提供了不同級別的抽象,以開發流或批處理作業,如下圖所示:

Cris帶你快速入門Flink

最底層級的抽象僅僅提供了有狀態流,它將通過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其可以對某些特定的操作進行底層的抽象,它允許用戶可以自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此之外,用戶可以註冊事件時間並處理時間回調,從而使程序可以處理複雜的計算。

實際上,大多數應用並不需要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,比如DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API為數據處理提供了通用的構建模塊,比如由用戶定義的多種形式的轉換(transformations),連接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 為有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。

Table API 以表為中心,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關係模型:表有二維數據結構(schema)(類似於關係數據庫中的表),同時API提供可比較的操作,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什麼邏輯操作應該執行,而不是準確地確定這些操作代碼的看上去如何 。 儘管Table API可以通過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,但是使用起來卻更加簡潔(代碼量更少)。除此之外,Table API程序在執行之前會經過內置優化器進行優化。

你可以在表與 DataStream/DataSet 之間無縫切換,以允許程序將 Table API 與 DataStream 以及 DataSet 混合使用

Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 類似,但是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢可以直接在Table API定義的表上執行。

三 Flink集群搭建

Flink可以選擇的部署方式有:

Local、Standalone(資源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。

我們主要對Standalone模式和Yarn模式下的Flink集群部署進行分析。

3.1 Standalone模式安裝

我們對standalone模式的Flink集群進行安裝,準備三臺虛擬機,其中一臺作為JobManager(hadoop101),另外兩臺作為TaskManager(hadoop102、hadoop103)。

  1. 首先官網下載

  2. 然後將下載的壓縮包發送到虛擬機上,解壓到指定位置

    Cris帶你快速入門Flink

  3. 然後修改配置文件

    [[email protected] conf]$ vim flink-conf.yaml
    

    Cris帶你快速入門Flink

    然後修改Worker 節點配置

    [[email protected] conf]$ vim slaves
    

    Cris帶你快速入門Flink

  4. 最後將 Flink 同步到其他兩臺 Worker 節點即可

    [[email protected] module]$ xsync flink-1.6.1/
    
  5. 啟動命令如下

    [[email protected] bin]$ ./start-cluster.sh
    

    非常簡單~

    通過 jps 查看進程情況

    [[email protected] bin]$ jpsall 
    ----------jps of hadoop101---------
    2491 StandaloneSessionClusterEntrypoint
    2555 Jps
    ----------jps of hadoop102---------
    2338 Jps
    2285 TaskManagerRunner
    ----------jps of hadoop103---------
    2212 Jps
    2159 TaskManagerRunner
    
  6. 訪問集群web界面(8081端口)

    出現如下界面表示 Flink 集群啟動成功

    Cris帶你快速入門Flink

  7. 簡單跑個 WC 任務

    Cris帶你快速入門Flink

    Cris帶你快速入門Flink

  8. 關閉集群

    [[email protected] bin]$ ./stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2285) on host hadoop102.
    Stopping taskexecutor daemon (pid: 2159) on host hadoop103.
    Stopping standalonesession daemon (pid: 2491) on host hadoop101.
    [[email protected] bin]$ jpsall 
    ----------jps of hadoop101---------
    3249 Jps
    ----------jps of hadoop102---------
    2842 Jps
    ----------jps of hadoop103---------
    2706 Jps
    

3.2 Yarn模式安裝

前四步同 Standalone 模式

  1. 明確虛擬機中已經設置好了環境變量HADOOP_HOME

  2. 啟動Hadoop集群(HDFS和Yarn)

  3. 在hadoop101節點提交Yarn-Session,使用安裝目錄下bin目錄中的yarn-session.sh腳本進行提交:

    [[email protected] ~]$ /opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
    

    其中:

    -n(–container):TaskManager的數量。

    -s(–slots): 每個TaskManager的slot數量,默認一個slot一個core,默認每個taskmanager的slot的個數為1。

    -jm:JobManager的內存(單位MB)。

    -tm:每個taskmanager的內存(單位MB)。

    -nm:yarn 的appName(現在yarn的ui上的名字)。

    -d:後臺執行。

  4. 啟動後查看Yarn的Web頁面,可以看到剛才提交的會話:

    Cris帶你快速入門Flink

    查看進程信息

    Cris帶你快速入門Flink

  5. 簡單的跑個任務

    [[email protected] flink-1.6.1]$ ./bin/flink run -m yarn-cluster examples/batch/WordCount.jar
    

    終端直接打印結果

    Cris帶你快速入門Flink

    在看看web 界面

    Cris帶你快速入門Flink

四 Flink運行架構

4.1 任務提交流程

Cris帶你快速入門Flink

Flink任務提交後,Client向HDFS上傳Flink的Jar包和配置,之後向Yarn ResourceManager提交任務,ResourceManager分配Container資源並通知對應的NodeManager啟動ApplicationMaster

ApplicationMaster啟動後加載Flink的Jar包和配置構建環境,然後啟動JobManager,之後ApplicationMaster向ResourceManager申請資源啟動TaskManager,ResourceManager分配Container資源後,由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager

NodeManager加載Flink的Jar包和配置構建環境並啟動TaskManager,TaskManager啟動後向JobManager發送心跳包,並等待JobManager向其分配任務

4.2 TaskManager與Slots

每一個TaskManager是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過task slot來進行控制(一個worker至少有一個task slot)。·

每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那麼它會將其管理的內存分成三份給各個slot。資源slot化意味著一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這裡不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。

通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味著每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味著更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路複用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。

TaskSlot是靜態的概念,是指TaskManager具有的併發執行能力**,可以通過參數taskmanager.numberOfTaskSlots進行配置,而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的併發能力,可以通過參數parallelism.default進行配置。

也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設置parallelism.default=1,即運行程序默認的並行度為1,9個TaskSlot只用了1個,有8個空閒,因此,設置合適的並行度才能提高效率。

4.3 Dataflow

Flink程序由Source、Transformation、Sink這三個核心組件組成,Source主要負責數據的讀取,Transformation主要負責對屬於的轉換操作,Sink負責最終數據的輸出,在各個組件之間流轉的數據稱為流(streams)。

Cris帶你快速入門Flink

Flink程序的基礎構建模塊是 (streams) 與 轉換(transformations)(需要注意的是,Flink的DataSet API所使用的DataSets其內部也是stream)。一個stream可以看成一箇中間結果,而一個transformations是以一個或多個stream作為輸入的某種operation,該operation利用這些stream進行計算從而產生一個或多個result stream。

在運行時,Flink上運行的程序會被映射成streaming dataflows,它包含了streams和transformations operators。每一個dataflow以一個或多個sources開始以一個或多個sinks結束,dataflow類似於任意的有向無環圖(DAG)。

Cris帶你快速入門Flink

4.4 並行數據流

Flink程序的執行具有並行、分佈式的特性。在執行過程中,一個 stream 包含一個或多個 stream partition ,而每一個 operator 包含一個或多個 operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。

一個特定operator的subtask的個數被稱之為其parallelism(並行度)。一個stream的並行度總是等同於其producing operator的並行度。一個程序中,不同的operator可能具有不同的並行度。

Cris帶你快速入門Flink

Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決於operator的種類。

One-to-onestream(比如在source和map operator之間)維護著分區以及元素的順序。那意味著map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關係。

Redistributing這種操作會改變數據的分區個數。每一個operator subtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy() 基於hashCode重分區、broadcast和rebalance會隨機重新分區,這些算子都會引起redistribute過程,而redistribute過程就類似於Spark中的shuffle過程。

4.5 task與operatorchains

出於分佈式執行的目的,Flink將operator的subtask鏈接在一起形成task,每個task在一個線程中執行。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基於緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。

下面這幅圖,展示了5個subtask以5個並行的線程來執行:

Cris帶你快速入門Flink

4.6 任務調度流程

Cris帶你快速入門Flink

客戶端不是運行時和程序執行的一部分,但它用於準備併發送dataflow給Master,然後,客戶端斷開連接或者維持連接以等待接收計算結果,客戶端可以以兩種方式運行:要麼作為Java/Scala程序的一部分被程序觸發執行,要麼以命令行./bin/flink run的方式執行。

五 Flink DataStream API

5.1 Flink運行模型

Cris帶你快速入門Flink

以上為Flink的運行模型,Flink的程序主要由三部分構成,分別為Source、Transformation、Sink。DataSource主要負責數據的讀取,Transformation主要負責對屬於的轉換操作,Sink負責最終數據的輸出。

5.2 Flink程序架構

每個Flink程序都包含以下的若干流程:

  • 獲得一個執行環境;(Execution Environment)

  • 加載/創建初始數據;(Source)

  • 指定轉換這些數據;(Transformation)

  • 指定放置計算結果的位置;(Sink)

  • 觸發程序執行

5.3 Environment

執行環境StreamExecutionEnvironment是所有Flink程序的基礎

創建執行環境有三種方式,分別為:

StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment

StreamExecutionEnvironment.getExecutionEnvironment

創建一個執行環境,表示當前執行程序的上下文。 如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什麼樣的運行環境,是最常用的一種創建執行環境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment

5.4 Source

I 基於File的數據源

  1. readTextFile(path)

    一列一列的讀取遵循TextInputFormat規範的文本文件,並將結果作為String返回。

    object Test {
    def main(args: Array[String]): Unit = {
    // 1. 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 讀取指定路徑的文本文件
    val stream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 3. action 算子對 DataStream 中的數據打印
    stream.print()
    // 4. 啟動 Flink 應用
    executionEnvironment.execute("test")
    }
    }
    

    Terminal 打印結果

    1> apache spark hadoop flume
    1> kafka hbase hive flink
    4> apache spark hadoop flink
    5> kafka hbase hive flink
    6> sqoop hue oozie zookeeper
    8> apache spark hadoop flume
    3> kafka hbase oozie zookeeper
    2> sqoop hue oozie zookeeper
    7> flink oozie azakaban spark
    

    注意stream.print():每一行前面的數字代表這一行是哪一個並行線程輸出的。

    還可以根據指定的 fileInputFormat 來讀取文件

    readFile(fileInputFormat, path)

  2. 基於Socket的數據源

    從Socket中讀取信息

    object Test {
    def main(args: Array[String]): Unit = {
    // 1. 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
    // 3. action 算子對 DataStream 中的數據打印
    stream.print()
    // 4. 啟動 Flink 應用
    executionEnvironment.execute("test")
    }
    }
    

    Cris帶你快速入門Flink

  3. 基於集合(Collection)的數據源

    1. fromCollection(seq):從集合中創建一個數據流,集合中所有元素的類型是一致的

      val stream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2,3,4))
      
    2. fromCollection(Iterator):從迭代(Iterator)中創建一個數據流,指定元素數據類型的類由iterator返回

      val stream: DataStream[Int] = executionEnvironment.fromCollection(Iterator(3,1,2))
      
    3. fromElements(elements:_*):從一個給定的對象序列中創建一個數據流,所有的對象必須是相同類型

      val list = List(1,2,3)
      val stream: DataStream[List[Int]] = executionEnvironment.fromElements(list)
      
    4. generateSequence(from, to):從給定的間隔中並行地產生一個數字序列

      val stream: DataStream[Long] = executionEnvironment.generateSequence(1,10)
      

5.5 Sink

Data Sink 消費DataStream中的數據,並將它們轉發到文件、套接字、外部系統或者打印出。

Flink有許多封裝在DataStream操作裡的內置輸出格式。

1. writeAsText

將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串通過調用每個元素的toString()方法來獲取。

2. WriteAsCsv

將元素以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每個字段的值來自對象的toString()方法。

3. print/printToErr

打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也可以在輸出流中添加一個前綴,這個可以幫助區分不同的打印調用,如果並行度大於1,那麼輸出也會有一個標識由哪個任務產生的標誌。

4. writeUsingOutputFormat

自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。

5. writeToSocket

將元素寫入到socket中.

5.6 Transformation

1. map

DataStream → DataStream:輸入一個參數產生一個參數。

    // 初始化 Flink 執行環境
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
// 針對每一行數據前面添加指定字符串
val mapDataStream: DataStream[String] = dataStream.map("Apache:" + _)
mapDataStream.print()
// 啟動 Flink 應用
executionEnvironment.execute("test")

2. flatMap

DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
// 將每行數據按照空格分割成集合,最終 "壓平"
val mapDataStream: DataStream[String] = dataStream.flatMap(_.split(" "))
mapDataStream.print()

3. filter

DataStream → DataStream:結算每個元素的布爾值,並返回布爾值為true的元素。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val mapDataStream: DataStream[String] = dataStream.filter(_.contains("kafka"))

4. Connect

Cris帶你快速入門Flink

DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

  // 初始化 Flink 執行環境
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val listDataStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3))
val connStreams: ConnectedStreams[String, Int] = dataStream.connect(listDataStream)
// map函數中的第一個函數作用於 ConnectedStreams 的第一個 DataStream;第二個函數作用於第二個 DataStream
connStreams.map(e => println(e + "-----"), println(_))
// 啟動 Flink 應用
executionEnvironment.execute("test")

測試效果如下:

Cris帶你快速入門Flink

針對 ConnectedStreams 的map 和 flatMap 操作稱之為 CoMap,CoFlatMap

作用於ConnectedStreams上,功能與map和flatMap一樣,對ConnectedStreams中的每一個Stream分別進行map和flatMap處理。

5. split

DataStream → SplitStream:根據某些特徵把一個DataStream拆分成兩個或者多個DataStream。

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val flatMapDStream: DataStream[String] = dataStream.flatMap(_.split(" "))
val splitDStream: SplitStream[String] = flatMapDStream.split(e => "hadoop".equals(e) match {
case true => List("hadoop")
case false => List("other")
})
splitDStream.select("hadoop").print()

Cris帶你快速入門Flink

通常配合 select 算子使用

6. Union

Cris帶你快速入門Flink

DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。

    val listDStream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2))
val unionDStream: DataStream[Int] = listDStream.union(listDStream)
unionDStream.print()

Cris帶你快速入門Flink

7. KeyBy

DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
result.print()

通常結合 reduce 等聚合算子使用

8. Reduce,Fold,Aggregations

KeyedStream → DataStream:一個分組數據流的聚合操作,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
val reduceDStream: DataStream[(String, Int)] = result.reduce((iter1, iter2) => (iter1._1, iter1._2 + iter2._2))
reduceDStream.print()

Cris帶你快速入門Flink

可以發現,Flink 並不是像 Spark 那樣將最後的總的統計結果返回,而是每次聚合統計都將結果返回,所以需要藉助 Flink 的Window 來進行數據的聚合統計(fold 和 aggregation同理)

其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的結果.

fold

KeyedStream → DataStream:一個有初始值的分組數據流的滾動摺疊操作,合併當前元素和前一次摺疊操作的結果,併產生一個新的值,返回的流中包含每一次摺疊的結果,而不是隻返回最後一次摺疊的最終結果。

Aggregations

KeyedStream → DataStream:分組數據流上的滾動聚合操作。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(同樣原理適用於max和maxBy),返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

六 Time 和 Window(重點)

6.1 Time

在Flink的流式處理中,會涉及到時間的不同概念,如下圖所示:

Cris帶你快速入門Flink

Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。

Ingestion Time:是數據進入Flink的時間。

Processing Time:是每一個執行基於時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。

例如,一條日誌進入Flink的時間為2017-11-12 10:00:00.123,到達Window的系統時間為2017-11-12 10:00:01.234,日誌的內容如下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

對於業務來說,要統計1min內的故障日誌個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日誌的生成時間進行統計。

通常我們需要指定日誌中的哪條數據是 eventTime

6.2 Window

Window可以分成兩類:

  • CountWindow:按照指定的數據條數生成一個Window,與時間無關。

  • TimeWindow:按照時間生成Window。

對於TimeWindow,可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。

對於CountWindow 可以分為滾動窗口和滑動窗口

1. 滾動窗口(Tumbling Windows)

將數據依據固定的窗口長度對數據進行切片

特點時間對齊,窗口長度固定,沒有重疊

滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創建如下圖所示:

Cris帶你快速入門Flink

適用場景:適合做BI統計等(做每個時間段的聚合計算)。

2. 滑動窗口(Sliding Windows)

滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成

特點時間對齊,窗口長度固定,有重疊

滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。

例如,你有10分鐘的窗口和5分鐘的滑動,那麼每個窗口中5分鐘的窗口裡包含著上個10分鐘產生的部分數據,如下圖所示:

Cris帶你快速入門Flink

適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。

3. 會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口

特點時間無對齊

session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session窗口中去。

Cris帶你快速入門Flink

4. Window API

CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數。

  • 滾動窗口

    默認的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。

      def main(args: Array[String]): Unit = {
    // 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
    val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
    })
    val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
    // 只有等相同key 的元素個數達到3的時候才會進行 reduce 和 print 操作
    val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(3)
    val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
    reduceDStream.print()
    // 啟動 Flink 應用
    executionEnvironment.execute("test")
    }
    

    測試效果如下:

    Cris帶你快速入門Flink

  • 滑動窗口

    滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。

    下面代碼中的sliding_size設置為了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是該 key 的前4個元素。

      def main(args: Array[String]): Unit = {
    // 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
    val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
    })
    val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
    // 只有等相同key 的元素個數達到2的時候才會對該 key 的前4條數據進行 reduce 和 print 操作
    val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(4,2)
    val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
    reduceDStream.print()
    // 啟動 Flink 應用
    executionEnvironment.execute("test")
    }
    }
    

    Cris帶你快速入門Flink

TimeWindow

TimeWindow是將指定時間範圍內的所有數據組成一個window,一次對一個window裡面的所有數據進行計算。

  • 滾動窗口

    Flink默認的時間窗口根據Processing Time 進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不同的窗口中。

        // 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
    val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
    })
    val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
    // 每3 秒對進入該窗口的所有相同key 的數據進行reduce 和 print 操作
    val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(3))
    val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
    reduceDStream.print()
    // 啟動 Flink 應用
    executionEnvironment.execute("test")
    

    Cris帶你快速入門Flink

  • 滑動窗口

    滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。

    下面代碼中的sliding_size設置為了2s,也就是說,窗口每2s就計算一次,每一次計算的window範圍是4s內的所有元素。

        // 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
    val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
    val strings: Array[String] = e.split(" ")
    (strings(0), strings(1).toInt)
    })
    val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
    // 每2 秒對進入該窗口的所有數據進行前 4 秒數據的 reduce 和 print 操作
    val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(4),Time
    .seconds(2))
    val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
    reduceDStream.print()
    // 啟動 Flink 應用
    executionEnvironment.execute("test")
    

    Cris帶你快速入門Flink

Window Fold

WindowedStream → DataStream:給窗口賦一個fold功能的函數,並返回一個fold後的結果。

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創建SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行fold操作
val streamFold = streamWindow.fold(100){
(begin, item) =>
begin + item._2
}
// 將聚合數據寫入文件
streamFold.print()
// 執行程序
env.execute("TumblingWindow")
Aggregation on Window

WindowedStream → DataStream:對一個window內的所有元素做聚合操作。min和 minBy的區別是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同樣的原理適用於 max 和 maxBy)。

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創建SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行聚合操作
val streamMax = streamWindow.max(1)
// 將聚合數據寫入文件
streamMax.print()
// 執行程序
env.execute("TumblingWindow")

七 EventTime與waterMark

7.1 EventTime的引入

在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime

如果要使用EventTime,那麼需要引入EventTime的時間屬性,引入方式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env創建的每一個stream追加時間特徵
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Cris帶你快速入門Flink

這裡日誌的時間是 Flink 根據我們的規則去解析生成的eventTime,而不是默認的 processingTime

而window 的時間區間是左閉右開的,及 2019-01-25 00:00:06 時間的日誌會進入第二個window

7.2 Watermark的引入

我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先後順序不是嚴格按照事件的Event Time順序排列的。

Cris帶你快速入門Flink

那麼此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。

Watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶著對應的Watermark。

Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。

數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。

Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然後認定eventTime小於maxEventTime – t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime – t,那麼這個窗口被觸發執行。

個人總結一下:針對進入窗口的每條數據,計算當前所有達到窗口的數據的最大eventTime,將這個eventTime和延遲時間(watermark)做減法,差值如果大於某一個窗口的的結束時間,那麼該窗口就進行算子操作

有序流的Watermarker如下圖所示:(Watermark設置為0)

Cris帶你快速入門Flink

亂序流的Watermarker如下圖所示:(Watermark設置為2)

Cris帶你快速入門Flink

當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前所有到達數據中的maxEventTime 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的停止時間要晚,那麼就會觸發相應窗口的執行。由於Watermark是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發

上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那麼時間戳為7s的事件到達時的Watermarker恰好觸發窗口1,時間戳為12s的事件到達時的Watermark恰好觸發窗口2。

7.3 測試代碼

    // 初始化 Flink 執行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 將 Flink 時間由默認的processingTime 設置為 eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[String] = env.socketTextStream("localhost", 1234)
// 設置watermark 以及如何解析每條日誌數據中的eventTime
val stream: DataStream[String] = source.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
override def extractTimestamp(element: String): Long = {
val time: Long = element.split(" ")(0).toLong
println(time)
time
}
}
)
val keyStream: KeyedStream[(String, Int), Tuple] = stream.map(e => (e.split(" ")(1), 1)).keyBy(0)
// 設置滾動窗口的長度為5秒,及每5秒的eventTime 間隔計算一次
val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
val reduceStream: DataStream[(String, Int)] = windowStream.reduce(
(e1, e2) => (e1._1, e1._2 + e2._2)
)
reduceStream.print()
env.execute("test")
}

測試如下

Cris帶你快速入門Flink

Cris帶你快速入門Flink

如果watermark 設置為2,那麼等到7000(毫秒)以及大於這個時間的日誌進入window 的時候,才會進行第一個窗口的計算

如果窗口類型設置為 SlidingEventTimeWindows ,那麼watermark 影響的就是滑動窗口的計算時間,感興趣的可以自己試試

如果窗口類型設置為 EventTimeSessionWindows.withGap(Time.seconds(10)),那麼影響的就是相鄰兩條數據的時間間隔必須大於指定時間才會觸發計算

八 總結

Flink是一個真正意義上的流計算引擎,在滿足低延遲和低容錯開銷的基礎之上,完美的解決了exactly-once的目標,真是由於Flink具有諸多優點,越來越多的企業開始使用Flink作為流處理框架,逐步替換掉了原本的Storm和Spark技術框架。

相關文章

Cris的Docker學習筆記

關於VMwareWorkstation14Pro宿主機無法聯網問題解決方案

最新IDEA和Maven集成問題和解決

Anaconda(miniconda)和JupyterNotebook使用大揭祕