分類
發燒車訊

Spark文檔閱讀之二:Programming Guides – Quick Start

Quick Start: https://spark.apache.org/docs/latest/quick-start.html

 

在Spark 2.0之前,Spark的編程接口為RDD (Resilient Distributed Dataset)。而在2.0之後,RDDs被Dataset替代。Dataset很像RDD,但是有更多優化。RDD仍然支持,不過強烈建議切換到Dataset,以獲得更好的性能。 RDD文檔: https://spark.apache.org/docs/latest/rdd-programming-guide.html Dataset文檔: https://spark.apache.org/docs/latest/sql-programming-guide.html  

一、最簡單的Spark Shell交互分析

scala> val textFile = spark.read.textFile("README.md")   # 構建一個Dataset
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count()  # Dataset的簡單計算
res0: Long = 104 

scala> val linesWithSpark = textFile.filter(line => line.contain("Spark"))  # 由現有Dataset生成新Dataset
res1: org.apache.spark.sql.Dataset[String] = [value: string]
# 等價於:
# res1 = new Dataset()
# for line in textFile:
#     if line.contain("Spark"):
#         res1.append(line)
# linesWithSpark = res1

scala> linesWithSpark.count()
res2: Long = 19

# 可以將多個操作串行起來
scala> textFile.filter(line => line.contain("Spark")).count()
res3: Long = 19

 

進一步的Dataset分析:

scala> textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)
res12: Int = 16
# 其實map和reduce就是兩個普通的算子,不要被MapReduce中一個map配一個reduce、先map后reduce的思想所束縛
# map算子就是對Dataset的元素X計算fun(X),並且將所有f(X)作為新的Dataset返回
# reduce算子其實就是通過兩兩計算fun(X,Y)=Z,將Dataset中的所有元素歸約為1個值

# 也可以引入庫進行計算
scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res14: Int = 16

# 還可以使用其他算子
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

# flatMap算子也是對Dataset的每個元素X執行fun(X)=Y,只不過map的res是
#     res.append(Y),如[[Y11, Y12], [Y21, Y22]],結果按元素區分
# 而flatMap是
#     res += Y,如[Y11, Y12, Y21, Y22],各元素結果合在一起

# groupByKey算子將Dataset的元素X作為參數傳入進行計算f(X),並以f(X)作為key進行分組,返回值為KeyValueGroupedDataset類型
# 形式類似於(key: k; value: X1, X2, ...),不過KeyValueGroupedDataset不是一個Dataset,value列表也不是一個array
# 注意:這裏的textFile和textFile.flatMap都是Dataset,不是RDD,groupByKey()中可以傳func;如果以sc.textFile()方法讀文件,得到的是RDD,groupByKey()中間不能傳func

# identity就是函數 x => x,即返回自身的函數

# KeyValueGroupedDataset的count()方法返回(key, len(value))列表,結果是Dataset類型

scala> wordCounts.collect()
res37: Array[(String, Long)] = Array((online,1), (graphs,1), ...
# collect操作:將分佈式存儲在集群上的RDD/Dataset中的所有數據都獲取到driver端

 

數據的cache:

scala> linesWithSpark.cache()  # in-memory cache,讓數據在分佈式內存中緩存
res38: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res41: Long = 19

 

二、最簡單的獨立Spark任務(spark-submit提交)

需提前安裝sbt,sbt是scala的編譯工具(Scala Build Tool),類似java的maven。 brew install sbt   1)編寫SimpleApp.scala

import org.apache.spark.sql.SparkSession

object SimpleApp {
    def main(args: Array[String]) {
        val logFile = "/Users/dxm/work-space/spark-2.4.5-bin-hadoop2.7/README.md"
        val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()  # 包含字母a的行數
        val numBs = logData.filter(line => line.contains("b")).count()  # 包含字母b的行數
        println(s"Lines with a: $numAs, Lines with b: $numBs")
        spark.stop()
    }
}

 

2)編寫sbt依賴文件build.sbt

name := "Simple Application"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

 

其中,”org.apache.spark” %% “spark-sql” % “2.4.5”這類庫名可以在網上查到,例如https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.0.0

 

3)使用sbt打包 目錄格式如下,如果SimpleApp.scala和build.sbt放在一個目錄下會編不出來

$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

 

sbt目錄格式要求見官方文檔 https://www.scala-sbt.org/1.x/docs/Directories.html

src/
  main/
    resources/
       <files to include in main jar here>
    scala/
       <main Scala sources>
    scala-2.12/
       <main Scala 2.12 specific sources>
    java/
       <main Java sources>
  test/
    resources
       <files to include in test jar here>
    scala/
       <test Scala sources>
    scala-2.12/
       <test Scala 2.12 specific sources>
    java/
       <test Java sources>

 

使用sbt打包

# 打包
$ sbt package
...
[success] Total time: 97 s (01:37), completed 2020-6-10 10:28:24
# jar包位於 target/scala-2.12/simple-application_2.12-1.0.jar

 

4)提交並執行Spark任務

$ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.12/simple-application_2.12-1.0.jar
# 報錯:Caused by: java.lang.ClassNotFoundException: scala.runtime.LambdaDeserialize
# 參考:https://stackoverflow.com/questions/47172122/classnotfoundexception-scala-runtime-lambdadeserialize-when-spark-submit
# 這是spark版本和scala版本不匹配導致的

 

查詢spark所使用的scala的版本

$ bin/spark-shell --master spark://xxx:7077

scala> util.Properties.versionString
res0: String = version 2.11.12

 

修改build.sbt: scalaVersion := “2.11.12” 從下載頁也可驗證,下載的spark 2.4.5使用的是scala 2.11  

 

重新sbt package,產出位置變更為target/scala-2.11/simple-application_2.11-1.0.jar 再次spark-submit,成功

 

$ bin/spark-submit --class "SimpleApp" --master spark://xxx:7077 ../scala-tests/SimpleApp/target/scala-2.11/simple-application_2.11-1.0.jar 
Lines with a: 61, Lines with b: 30

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準