要使用 Spark 之前,一般會先遇到 scala 這個語言的熟悉度的問題,當有了一定的語言程度後,再來就是 scala IDE 的選擇,目前的狀況,還是IDEA 會比 scala IDE for Eclipse 好用。接下來就是下載跟安裝 spark,然後進行 WordCount 的範例練習,以下記錄怎麼安裝與設定 stand alone 的 spark 開發環境。
要下載哪一個 spark 套件
當我們連結到 Download Apache Spark 時,首先遇到的問題,就是要下載哪一個 spark release 套件。
基本的原則如下:
如果要直接下載已經編譯好的 binary 套件,我們可以根據 Hadoop 的版本決定要下載哪一個,但如果像我們一樣,不打算安裝 Hadoop 就直接測試,就直接選最新版的 spark-1.6.1-bin-hadoop2.6.tgz 就好了,下載後解壓縮,馬上就可以使用 spark-shell,或直接取得 all-in-one 的 spark-assembly-1.6.1-hadoop2.6.0.jar 套件。
如果我們要編譯 source code,就下載預設的 1.6.1(Mar 09 2016) spark release,Package type 選擇 Source Code:spark-1.6.1.tgz。
由於目前 spark 預設是使用 scala 2.10 版,使用預先編譯的 spark 就必須要使用 scala 2.10 版,如果像要改成 2.11,就一定要自己重新編譯 spark,目前 spark 的 JDBC component 還不支援 scala 2.11。
Building for Scala 2.11 有兩行指令說明如何將 spark 由 2.10 調整為 2.11,我們同時把 hadoop 版本改為 2.6。
./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package
編譯 spark 要花的時間很久,以我現在的環境花了 40 分鐘。
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39:33 min
[INFO] Finished at: 2016-04-29T09:23:03+08:00
[INFO] Final Memory: 452M/2703M
也可以使用 sbt 來編譯 spark,編譯後會得到 spark-assembly 的 jar。
sbt/sbt assembly
如果要修改 spark souce code,可以啟用增量編譯模式,避免每一次修改都要花很久的時間重新編譯。
export SPARK_PREPEND_CLASSES=true
sbt/sbt compile
unset SPARK_PREPEND_CLASSES
在 compile 前面加上 ~ 可以避免每一次都重開一次新的 sbt console
sbt/sbt ~ compile
可以用 sbt 或是 mvn 指令查閱 dependency map
sbt/sbt dependency-tree
mvn -DskipTests install
mvb dependency:tree
如果要設定 spark source 的開發環境,可以用以下的指令產生 IDEA project file
git clone https://github.com/apache/spark
sbt/sbt gen-idea
Spark 開發環境 in IDEA
在 IDEA 建立新的 scala project: sparktest
在 project 中建立一個 lib 目錄,把 spark-assembly-1.6.1-hadoop2.6.0.jar 放在那個目錄中
在 File -> Project Structure -> Libraries 點 "+",然後把 lib 目錄加入 project 中
取得一個文字檔的測試資料 pg5000.txt ,將檔案放在新建立的 data 目錄中
將 RunWordCount.scala 放在 src 目錄中,程式會計算 pg5000.txt 裡面每一個字出現的數量
import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.{ SparkConf, SparkContext } import org.apache.spark.rdd.RDD object RunWordCount { def main(args: Array[String]): Unit = { // 以這兩行設定不顯示 spark 內部的訊息 Logger.getLogger("org").setLevel(Level.OFF) System.setProperty("spark.ui.showConsoleProgress", "false") // 清除 output folder FileUtils.deleteDirectory(new File("data/output")) println("執行RunWordCount") // 設定 application 提交到 MASTER 指向的 cluster 或是 local 執行的模式 // local[4] 代表是在本地以 四核心的 CPU 執行 val sc = new SparkContext(new SparkConf().setAppName("wordCount").setMaster("local[4]")) println("讀取文字檔...") val textFile = sc.textFile("data/pg5000.txt") println("開始建立RDD...") // flapMap 是取出文字檔的每一行資料,並以 " " 進行 split,分成一個一個的 word // map 是將每一個 word 轉換成 (word, 1) 的 tuple // reduceByKey 會根據 word 這個 key,將後面的 1 加總起來,就會得到 (word, 數量) 的結果 val countsRDD = textFile.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) println("儲存結果至文字檔...") try { countsRDD.saveAsTextFile("data/output") println("存檔成功") } catch { case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄"); } } }
我們可以直接在 IDEA 就執行這個測試程式
執行RunWordCount Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/04/29 16:28:50 INFO Slf4jLogger: Slf4jLogger started 16/04/29 16:28:50 INFO Remoting: Starting remoting 16/04/29 16:28:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.151:56205] 讀取文字檔... 開始建立RDD... 儲存結果至文字檔... 存檔成功 Process finished with exit code 0
最後產生的結果有三個檔案,其中 part-00000 及 part-00001 裡面存了每一個 word 的發生次數
_SUCCESS part-00000 part-00001
會產生兩個檔案的原因是因為,spark 本身是平行運算的工具,所以會自動產生多個 partitions。
如果需要將結果整合成一個檔案,就必須使用 coalesce,在程式的最後面,用 countsRDD.coalesce(1).saveAsTextFile 將結果輸出到新目錄,也會得到一個檔案的結果。
try {
countsRDD.coalesce(1).saveAsTextFile("data/output2")
println("存檔成功")
} catch {
case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");
}
匯出程式
在 IDEA 選擇 "File" -> "Project Structure" -> "Artifact"
點擊 "+" -> "JAR" -> "From modules with dependencies"
Main Class 填成 "RunWordCount",輸出目錄的最後面改為 "out"
選擇 "Build" -> "Build Artifacts",就能在 out 目錄取得 sparktest.jar 檔
這樣就能在另一台機器執行 sparktest
java -jar sparktest.jar
References
HADOOP+SPARK大數據巨量分析與機器學習整合開發實戰
沒有留言:
張貼留言