Spark Streaming支持實時數據流的可擴展(scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。
架構圖
特性如下:
-
可線性伸縮至超過數百個節點;
-
實現亞秒級延遲處理;
-
可與Spark批處理和交互式處理無縫集成;
-
提供簡單的API實現複雜算法;
-
更多的流方式支持,包括Kafka、Flume、Kinesis、Twitter、ZeroMQ等。
原理
Spark在接收到實時輸入數據流后,將數據劃分成批次(divides the data into batches),然後轉給Spark Engine處理,按批次生成最後的結果流(generate the final stream of results in batches)。
API
DStream
DStream(Discretized Stream,離散流)是Spark Stream提供的高級抽象連續數據流。
-
組成:一個DStream可看作一個RDDs序列。
-
核心思想:將計算作為一系列較小時間間隔的、狀態無關的、確定批次的任務,每個時間間隔內接收的輸入數據被可靠存儲在集群中,作為一個輸入數據集。
-
特性:一個高層次的函數式編程API、強一致性以及高校的故障恢復。
-
應用程序模板:
-
模板1
-
模板2
WordCount示例
Input DStream
Input DStream是一種從流式數據源獲取原始數據流的DStream,分為基本輸入源(文件系統、Socket、Akka Actor、自定義數據源)和高級輸入源(Kafka、Flume等)。
- Receiver:
-
每個Input DStream(文件流除外)都會對應一個單一的Receiver對象,負責從數據源接收數據並存入Spark內存進行處理。應用程序中可創建多個Input DStream并行接收多個數據流。
-
每個Receiver是一個長期運行在Worker或者Executor上的Task,所以會佔用該應用程序的一個核(core)。如果分配給Spark Streaming應用程序的核數小於或等於Input DStream個數(即Receiver個數),則只能接收數據,卻沒有能力全部處理(文件流除外,因為無需Receiver)。
-
Spark Streaming已封裝各種數據源,需要時參考官方文檔。
Transformation Operation
-
常用Transformation
* map(func) :對源DStream的每個元素,採用func函數進行轉換,得到一個新的DStream;
* flatMap(func):與map相似,但是每個輸入項可用被映射為0個或者多個輸出項;
* filter(func):返回一個新的DStream,僅包含源DStream中滿足函數func的項;
* repartition(numPartitions):通過創建更多或者更少的分區改變DStream的并行程度;
* union(otherStream):返回一個新的DStream,包含源DStream和其他DStream的元素;
* count():統計源DStream中每個RDD的元素數量;
* reduce(func):利用函數func聚集源DStream中每個RDD的元素,返回一個包含單元素RDDs的新DStream;
* countByValue():應用於元素類型為K的DStream上,返回一個(K,V)鍵值對類型的新DStream,每個鍵的值是在原DStream的每個RDD中的出現次數;
* reduceByKey(func, [numTasks]):當在一個由(K,V)鍵值對組成的DStream上執行該操作時,返回一個新的由(K,V)鍵值對組成的DStream,每一個key的值均由給定的recuce函數(func)聚集起來;
* join(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, (V, W))鍵值對的新DStream;
* cogroup(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, Seq[V], Seq[W])的元組;
* transform(func):通過對源DStream的每個RDD應用RDD-to-RDD函數,創建一個新的DStream。支持在新的DStream中做任何RDD操作。
-
updateStateByKey(func)
-
updateStateByKey可對DStream中的數據按key做reduce,然後對各批次數據累加
-
WordCount的updateStateByKey版本
-
transform(func)
-
通過對原DStream的每個RDD應用轉換函數,創建一個新的DStream。
-
官方文檔代碼舉例
-
Window operations
-
窗口操作:基於window對數據transformation(個人認為與Storm的tick相似,但功能更強大)。
-
參數:窗口長度(window length)和滑動時間間隔(slide interval)必須是源DStream批次間隔的倍數。
-
舉例說明:窗口長度為3,滑動時間間隔為2;上一行是原始DStream,下一行是窗口化的DStream。
-
常見window operation
有狀態轉換包括基於滑動窗口的轉換和追蹤狀態變化(updateStateByKey)的轉換。
基於滑動窗口的轉換
* window(windowLength, slideInterval) 基於源DStream產生的窗口化的批數據,計算得到一個新的DStream;
* countByWindow(windowLength, slideInterval) 返迴流中元素的一個滑動窗口數;
* reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流。利用函數func聚集滑動時間間隔的流的元素創建這個單元素流。函數func必須滿足結合律,從而可以支持并行計算;
* reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應用到一個(K,V)鍵值對組成的DStream上時,會返回一個由(K,V)鍵值對組成的新的DStream。每一個key的值均由給定的reduce函數(func函數)進行聚合計算。注意:在默認情況下,這個算子利用了Spark默認的併發任務數去分組。可以通過numTasks參數的設置來指定不同的任務數;
* reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每個窗口的reduce值,是基於先前窗口的reduce值進行增量計算得到的;它會對進入滑動窗口的新數據進行reduce操作,並對離開窗口的老數據進行“逆向reduce”操作。但是,只能用於“可逆reduce函數”,即那些reduce函數都有一個對應的“逆向reduce函數”(以InvFunc參數傳入);
* countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當應用到一個(K,V)鍵值對組成的DStream上,返回一個由(K,V)鍵值對組成的新的DStream。每個key的值都是它們在滑動窗口中出現的頻率。
-
官方文檔代碼舉例
-
join(otherStream, [numTasks])
-
連接數據流
-
官方文檔代碼舉例1
-
官方文檔代碼舉例2
Output Operation
緩存與持久化
-
通過persist()將DStream中每個RDD存儲在內存。
-
Window operations會自動持久化在內存,無需显示調用persist()。
-
通過網絡接收的數據流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)執行persist()時,默認在兩個節點上持久化序列化后的數據,實現容錯。
Checkpoint
-
用途:Spark基於容錯存儲系統(如HDFS、S3)進行故障恢復。
-
分類:
-
元數據檢查點:保存流式計算信息用於Driver運行節點的故障恢復,包括創建應用程序的配置、應用程序定義的DStream operations、已入隊但未完成的批次。
-
數據檢查點:保存生成的RDD。由於stateful transformation需要合併多個批次的數據,即生成的RDD依賴於前幾個批次RDD的數據(dependency chain),為縮短dependency chain從而減少故障恢復時間,需將中間RDD定期保存至可靠存儲(如HDFS)。
-
使用時機:
-
Stateful transformation:updateStateByKey()以及window operations。
-
需要Driver故障恢復的應用程序。
-
使用方法
-
Stateful transformation
streamingContext.checkpoint(checkpointDirectory)
-
需要Driver故障恢復的應用程序(以WordCount舉例):如果checkpoint目錄存在,則根據checkpoint數據創建新StreamingContext;否則(如首次運行)新建StreamingContext。
-
checkpoint時間間隔
-
方法:
dstream.checkpoint(checkpointInterval)
-
原則:一般設置為滑動時間間隔的5-10倍。
-
分析:checkpoint會增加存儲開銷、增加批次處理時間。當批次間隔較小(如1秒)時,checkpoint可能會減小operation吞吐量;反之,checkpoint時間間隔較大會導致lineage和task數量增長。
性能調優
降低批次處理時間
-
數據接收并行度
-
增加DStream:接收網絡數據(如Kafka、Flume、Socket等)時會對數據反序列化再存儲在Spark,由於一個DStream只有Receiver對象,如果成為瓶頸可考慮增加DStream。
-
設置“spark.streaming.blockInterval”參數:接收的數據被存儲在Spark內存前,會被合併成block,而block數量決定了Task數量;舉例,當批次時間間隔為2秒且block時間間隔為200毫秒時,Task數量約為10;如果Task數量過低,則浪費了CPU資源;推薦的最小block時間間隔為50毫秒。
-
顯式對Input DStream重新分區:在進行更深層次處理前,先對輸入數據重新分區。
inputStream.repartition(<number of partitions>)
-
數據處理并行度:reduceByKey、reduceByKeyAndWindow等operation可通過設置“spark.default.parallelism”參數或顯式設置并行度方法參數控制。
-
數據序列化:可配置更高效的Kryo序列化。
設置合理批次時間間隔
-
原則:處理數據的速度應大於或等於數據輸入的速度,即批次處理時間大於或等於批次時間間隔。
-
方法:
-
先設置批次時間間隔為5-10秒以降低數據輸入速度;
-
再通過查看log4j日誌中的“Total delay”,逐步調整批次時間間隔,保證“Total delay”小於批次時間間隔。
內存調優
-
持久化級別:開啟壓縮,設置參數“spark.rdd.compress”。
-
GC策略:在Driver和Executor上開啟CMS。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※網頁設計公司推薦不同的風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※南投搬家公司費用需注意的眉眉角角,別等搬了再說!
※新北清潔公司,居家、辦公、裝潢細清專業服務
※教你寫出一流的銷售文案?