2017/2/20

Spark tutorial


要使用 Spark 之前,一般會先遇到 scala 這個語言的熟悉度的問題,當有了一定的語言程度後,再來就是 scala IDE 的選擇,目前的狀況,還是IDEA 會比 scala IDE for Eclipse 好用。接下來就是下載跟安裝 spark,然後進行 WordCount 的範例練習,以下記錄怎麼安裝與設定 stand alone 的 spark 開發環境。


要下載哪一個 spark 套件


當我們連結到 Download Apache Spark 時,首先遇到的問題,就是要下載哪一個 spark release 套件。


基本的原則如下:


如果要直接下載已經編譯好的 binary 套件,我們可以根據 Hadoop 的版本決定要下載哪一個,但如果像我們一樣,不打算安裝 Hadoop 就直接測試,就直接選最新版的 spark-1.6.1-bin-hadoop2.6.tgz 就好了,下載後解壓縮,馬上就可以使用 spark-shell,或直接取得 all-in-one 的 spark-assembly-1.6.1-hadoop2.6.0.jar 套件。


如果我們要編譯 source code,就下載預設的 1.6.1(Mar 09 2016) spark release,Package type 選擇 Source Code:spark-1.6.1.tgz


由於目前 spark 預設是使用 scala 2.10 版,使用預先編譯的 spark 就必須要使用 scala 2.10 版,如果像要改成 2.11,就一定要自己重新編譯 spark,目前 spark 的 JDBC component 還不支援 scala 2.11。


Building for Scala 2.11 有兩行指令說明如何將 spark 由 2.10 調整為 2.11,我們同時把 hadoop 版本改為 2.6。


./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

編譯 spark 要花的時間很久,以我現在的環境花了 40 分鐘。


[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39:33 min
[INFO] Finished at: 2016-04-29T09:23:03+08:00
[INFO] Final Memory: 452M/2703M

也可以使用 sbt 來編譯 spark,編譯後會得到 spark-assembly 的 jar。


sbt/sbt assembly

如果要修改 spark souce code,可以啟用增量編譯模式,避免每一次修改都要花很久的時間重新編譯。


export SPARK_PREPEND_CLASSES=true
sbt/sbt compile

unset SPARK_PREPEND_CLASSES

在 compile 前面加上 ~ 可以避免每一次都重開一次新的 sbt console


sbt/sbt ~ compile

可以用 sbt 或是 mvn 指令查閱 dependency map


sbt/sbt dependency-tree

mvn -DskipTests install
mvb dependency:tree

如果要設定 spark source 的開發環境,可以用以下的指令產生 IDEA project file


git clone https://github.com/apache/spark
sbt/sbt gen-idea

Spark 開發環境 in IDEA


  1. 在 IDEA 建立新的 scala project: sparktest

  2. 在 project 中建立一個 lib 目錄,把 spark-assembly-1.6.1-hadoop2.6.0.jar 放在那個目錄中

  3. 在 File -> Project Structure -> Libraries 點 "+",然後把 lib 目錄加入 project 中

  4. 取得一個文字檔的測試資料 pg5000.txt ,將檔案放在新建立的 data 目錄中

  5. 將 RunWordCount.scala 放在 src 目錄中,程式會計算 pg5000.txt 裡面每一個字出現的數量


    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.rdd.RDD
    
    object RunWordCount {
      def main(args: Array[String]): Unit = {
    
        // 以這兩行設定不顯示 spark 內部的訊息
        Logger.getLogger("org").setLevel(Level.OFF)
        System.setProperty("spark.ui.showConsoleProgress", "false")
    
        // 清除 output folder
        FileUtils.deleteDirectory(new File("data/output"))
    
        println("執行RunWordCount")
    
        // 設定 application 提交到 MASTER 指向的 cluster 或是 local 執行的模式
        // local[4] 代表是在本地以 四核心的 CPU 執行
        val sc = new SparkContext(new SparkConf().setAppName("wordCount").setMaster("local[4]"))
    
        println("讀取文字檔...")
        val textFile = sc.textFile("data/pg5000.txt") 
    
        println("開始建立RDD...")
        // flapMap 是取出文字檔的每一行資料,並以 " " 進行 split,分成一個一個的 word
        // map 是將每一個 word 轉換成 (word, 1) 的 tuple
        // reduceByKey 會根據 word 這個 key,將後面的 1 加總起來,就會得到 (word, 數量) 的結果
        val countsRDD = textFile.flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _) 
    
        println("儲存結果至文字檔...")
        try {
          countsRDD.saveAsTextFile("data/output") 
          println("存檔成功")
        } catch {
          case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");
        }
    
      }
    }
  6. 我們可以直接在 IDEA 就執行這個測試程式


    執行RunWordCount
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/04/29 16:28:50 INFO Slf4jLogger: Slf4jLogger started
    16/04/29 16:28:50 INFO Remoting: Starting remoting
    16/04/29 16:28:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.151:56205]
    讀取文字檔...
    開始建立RDD...
    儲存結果至文字檔...
    存檔成功
    
    Process finished with exit code 0
  7. 最後產生的結果有三個檔案,其中 part-00000 及 part-00001 裡面存了每一個 word 的發生次數


    _SUCCESS
    part-00000
    part-00001

會產生兩個檔案的原因是因為,spark 本身是平行運算的工具,所以會自動產生多個 partitions。


如果需要將結果整合成一個檔案,就必須使用 coalesce,在程式的最後面,用 countsRDD.coalesce(1).saveAsTextFile 將結果輸出到新目錄,也會得到一個檔案的結果。


try {
      countsRDD.coalesce(1).saveAsTextFile("data/output2")
      println("存檔成功")
    } catch {
      case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");
    }

匯出程式


  1. 在 IDEA 選擇 "File" -> "Project Structure" -> "Artifact"

  2. 點擊 "+" -> "JAR" -> "From modules with dependencies"

  3. Main Class 填成 "RunWordCount",輸出目錄的最後面改為 "out"

  4. 選擇 "Build" -> "Build Artifacts",就能在 out 目錄取得 sparktest.jar 檔

  5. 這樣就能在另一台機器執行 sparktest


java -jar sparktest.jar

References


HADOOP+SPARK大數據巨量分析與機器學習整合開發實戰


Spark大資料分析實戰

2017/2/13

scopt: command line parsing library in Scala


在 Java 要製作一個 command line 工具可以使用 Apache Commons cli,不過在 scala,有另一個更簡潔的 library: scopt,可以幫助我們製作 cli 程式。


libraryDependencies


根據 scopt github 的說明,我們應該在 build.sbt 中加上這樣的 libraryDependencies 宣告設定


libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"

但我們使用起來覺得有點問題,搜尋了 maven Group: com.github.scopt,看起來這個 library 有針對 scala 的版本提供不同的 library,因為我們是使用 scala 2.11.8,所以就將 libraryDependencies 改成以下這樣


"com.github.scopt" % "scopt_2.11" % "3.5.0",

Config


使用 scopt 之前,要先定義一個用來存放 cli parsing 結果的 case class: Config,我們是要做一個 License File 的產生工具,所以 Config 裡面存的都是 license 需要的資料。


  case class Config(mode: String = "",
                    ver: Boolean = false,
                    getmid: Boolean = false,
                    keyfile: String ="",

                    lictype:String ="",
                    product:String ="",
                    version:String ="",
                    name:String ="",
                    company:String="",
                    copies:Int=1,
                    mid:String="",
                    validfrom:String="",
                    goodthru:String=""
                   ) {
    def copy(mode: String = mode, ver: Boolean = ver,
             getmid:Boolean = getmid,
             keyfile: String = keyfile,

             lictype: String = lictype,
             product: String = product,
             version: String = version,
             name: String = name,
             company: String = company,
             copies: Int = copies,
             mid: String = mid,
             validfrom: String = validfrom,
             goodthru: String = goodthru
            ) =
      new Config(mode, ver, getmid, keyfile, lictype, product, version, name, company,
        copies, mid, validfrom, goodthru)
  }

Parser


接下來是使用 Config 產生 OptionParser,Parser 中是以第一個參數 "mode" 作為不同指令的判斷,我們提供了四個指令:key, lic, dec, --getmid, --ver,另外還有一個基本的 --help,每一個指令都有一個縮寫。


我們可以先看 help 列印出來的結果,最前面的 Usage 是這個程式的使用方式,然後有兩個基本的 --ver 及 --getmid 方法。


接下來是 key, lic, dec 這三個獨立指令的說明,每一個指令都有相關的參數,最後一行是 --help 列印 help 頁面的部分。


[info] Running license.LicenseBuilder -h
[info] License Builder 0.1
[info] Usage: license.LicenseBuilder [key|lic|dec] [options] <args>...
[info]
[info]   -v, --ver                Prints the version number.
[info]   -i, --getmid             Prints the machine id.
[info] Command: key keyfile
[info]   generate RSA key file
[info]   keyfile                  gen key files with key filename prefix
[info] Command: lic [options]
[info]   generate license file
[info]   -k, --prikeyfile <value>
[info]                            private key file prefix
[info]   -l, --lictype <value>    Evaluation/Standard/Enterprise
[info]   -p, --product <value>    product name, ex: kokome
[info]   -e, --version <value>    product version number, ex: 3.0.0
[info]   -n, --name <value>       licensed name, ex: kokome
[info]   -o, --company <value>    licensed company name, ex: maxkit
[info]   -c, --copies <value>     licensed number of users, ex: 5
[info]   -m, --mid <value>        machine id
[info]   -v, --validfrom <value>  licensed valid from date ex: 2016/01/01
[info]   -g, --goodthru <value>   licensed good thru date ex: 2016/12/31
[info] Command: dec keyfile
[info]   decode maxkit.lic
[info]   keyfile                  decode maxkit.lic with key filename prefix
[info]   -h, --help               prints this usage text

看了 help 的說明後,再去看 OptionParser 的寫法,就比較能清楚地分辨不同指令區塊的部分。


val parser = new scopt.OptionParser[Config]("license.LicenseBuilder") {
    head("License Builder", LicenseBuilder.ver)

    //activator "runMain license.LicenseBuilder -v"
    opt[Unit]("ver").abbr("v").action( (_, c) => c.copy(ver = true)).
      text("Prints the version number.")

    //activator "runMain license.LicenseBuilder -i"
    opt[Unit]("getmid").abbr("i").action( (_, c) => c.copy(getmid = true)).
      text("Prints the machine id.")

    //activator "runMain license.LicenseBuilder key maxkit"
    cmd("key").action( (x, c) => c.copy(mode = "key")).
      children(
        arg[String]("keyfile").unbounded().required().action( (x, c) => c.copy(keyfile = x)).
          text("gen key files with key filename prefix")
      ).text("  generate RSA key file")

    //activator "runMain license.LicenseBuilder lic -k maxkit -l Enterprise -p kokome -e 3.0.0 -n kokome -o maxkit -c 10 -m 1234 -v 2016/10/01 -g 2116/01/01"
    cmd("lic").action( (_, c) => c.copy(mode = "lic")).
      children(
        opt[String]('k', "prikeyfile").required().action( (x,c) => c.copy(keyfile=x) ).
          text("private key file prefix"),

        opt[String]('l', "lictype").required().action( (x,c) => c.copy(lictype=x) ).
          text("Evaluation/Standard/Enterprise"),

        opt[String]('p', "product").required().action( (x,c) => c.copy(product=x) ).
          text("product name, ex: kokome"),

        opt[String]('e', "version").required().action( (x,c) => c.copy(version=x) ).
          text("product version number, ex: 3.0.0"),

        opt[String]('n', "name").required().action( (x,c) => c.copy(name=x) ).
          text("licensed name, ex: kokome"),

        opt[String]('o', "company").required().action( (x,c) => c.copy(company=x) ).
          text("licensed company name, ex: maxkit"),

        opt[Int]('c', "copies").required().action( (x,c) => c.copy(copies=x) ).
          text("licensed number of users, ex: 5"),

        opt[String]('m', "mid").required().action( (x,c) => c.copy(mid=x) ).
          text("machine id"),

        opt[String]('v', "validfrom").required().action( (x,c) => c.copy(validfrom=x) ).
          text("licensed valid from date ex: 2016/01/01"),

        opt[String]('g', "goodthru").required().action( (x,c) => c.copy(goodthru=x) ).
          text("licensed good thru date ex: 2016/12/31")

      ).text("  generate license file")

    //activator "runMain license.LicenseBuilder dec maxkit"
    cmd("dec").action( (x, c) => c.copy(mode = "dec")).
      children(
        arg[String]("keyfile").unbounded().required().action( (x, c) => c.copy(keyfile = x)).
          text("decode maxkit.lic with key filename prefix")
      ).text("  decode maxkit.lic")

    //activator "runMain license.LicenseBuilder --help"
    help("help").abbr("h").text("prints this usage text")
  }

  parser.parse(args, Config()) match {
    case Some(config) => {
      // gen privat/pubilic key pairs
      if (config.mode == "key") LicenseBuilder.key(config.keyfile)

      // gen license file
      if (config.mode == "lic") LicenseBuilder.lic(config.keyfile, config.lictype, config.product,
        config.version, config.name, config.company, config.copies,
        config.mid, config.validfrom, config.goodthru)

      // decode license file
      if (config.mode == "dec") LicenseBuilder.dec(config.keyfile)

      // get machine if
      if (config.getmid) LicenseBuilder.getmid

      // print LicenseBuilder version
      if (config.ver) println("LicenseBuilder Version is: " + LicenseBuilder.ver)
    }
    case None => println("Please use -h for usage")
  }

完整的程式


package license

import java.io.File
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.FileUtils
import play.api.Logger
import utils.StringUtil

object LicenseBuilder extends App {
  val ver = "0.1"

  case class Config(mode: String = "",
                    ver: Boolean = false,
                    getmid: Boolean = false,
                    keyfile: String ="",

                    lictype:String ="",
                    product:String ="",
                    version:String ="",
                    name:String ="",
                    company:String="",
                    copies:Int=1,
                    mid:String="",
                    validfrom:String="",
                    goodthru:String=""
                   ) {
    def copy(mode: String = mode, ver: Boolean = ver,
             getmid:Boolean = getmid,
             keyfile: String = keyfile,

             lictype: String = lictype,
             product: String = product,
             version: String = version,
             name: String = name,
             company: String = company,
             copies: Int = copies,
             mid: String = mid,
             validfrom: String = validfrom,
             goodthru: String = goodthru
            ) =
      new Config(mode, ver, getmid, keyfile, lictype, product, version, name, company,
        copies, mid, validfrom, goodthru)
  }

  def key(keyfile: String) = {
    println(s"generate key pairs with filename prefix ${keyfile}")
  }

  def getmid() = {
    val mid = LicenseId.getLicenseId
    println(s"mid = ${mid}")
  }

  def dec(keyfile:String) = {
    println(s"decode license maxkit.lic with ${keyfile}.prikey.dat")

  }

  def lic(keyfile:String, lictype:String,
          product:String, version:String,
          name:String, company:String,
          copies:Int, mid:String,
          validfrom:String, goodthru:String) = {

    println(s"gen license with ${keyfile}.prikey.dat, lictype=${lictype}," +
      s"product=${product}, version=${version}, name=${name}, company=${company}, " +
      s"copies=${copies}, mid=${mid}, validfrom=${validfrom}, goodthru=${goodthru}")
      
  }

  val parser = new scopt.OptionParser[Config]("license.LicenseBuilder") {
    head("License Builder", LicenseBuilder.ver)

    //activator "runMain license.LicenseBuilder -v"
    opt[Unit]("ver").abbr("v").action( (_, c) => c.copy(ver = true)).
      text("Prints the version number.")

    //activator "runMain license.LicenseBuilder -i"
    opt[Unit]("getmid").abbr("i").action( (_, c) => c.copy(getmid = true)).
      text("Prints the machine id.")

    //activator "runMain license.LicenseBuilder key maxkit"
    cmd("key").action( (x, c) => c.copy(mode = "key")).
      children(
        arg[String]("keyfile").unbounded().required().action( (x, c) => c.copy(keyfile = x)).
          text("gen key files with key filename prefix")
      ).text("  generate RSA key file")

    //activator "runMain license.LicenseBuilder lic -k maxkit -l Enterprise -p kokome -e 3.0.0 -n kokome -o maxkit -c 10 -m 1234 -v 2016/10/01 -g 2116/01/01"
    cmd("lic").action( (_, c) => c.copy(mode = "lic")).
      children(
        opt[String]('k', "prikeyfile").required().action( (x,c) => c.copy(keyfile=x) ).
          text("private key file prefix"),

        opt[String]('l', "lictype").required().action( (x,c) => c.copy(lictype=x) ).
          text("Evaluation/Standard/Enterprise"),

        opt[String]('p', "product").required().action( (x,c) => c.copy(product=x) ).
          text("product name, ex: kokome"),

        opt[String]('e', "version").required().action( (x,c) => c.copy(version=x) ).
          text("product version number, ex: 3.0.0"),

        opt[String]('n', "name").required().action( (x,c) => c.copy(name=x) ).
          text("licensed name, ex: kokome"),

        opt[String]('o', "company").required().action( (x,c) => c.copy(company=x) ).
          text("licensed company name, ex: maxkit"),

        opt[Int]('c', "copies").required().action( (x,c) => c.copy(copies=x) ).
          text("licensed number of users, ex: 5"),

        opt[String]('m', "mid").required().action( (x,c) => c.copy(mid=x) ).
          text("machine id"),

        opt[String]('v', "validfrom").required().action( (x,c) => c.copy(validfrom=x) ).
          text("licensed valid from date ex: 2016/01/01"),

        opt[String]('g', "goodthru").required().action( (x,c) => c.copy(goodthru=x) ).
          text("licensed good thru date ex: 2016/12/31")

      ).text("  generate license file")

    //activator "runMain license.LicenseBuilder dec maxkit"
    cmd("dec").action( (x, c) => c.copy(mode = "dec")).
      children(
        arg[String]("keyfile").unbounded().required().action( (x, c) => c.copy(keyfile = x)).
          text("decode maxkit.lic with key filename prefix")
      ).text("  decode maxkit.lic")

    //activator "runMain license.LicenseBuilder --help"
    help("help").abbr("h").text("prints this usage text")
  }

  parser.parse(args, Config()) match {
    case Some(config) => {
      // gen privat/pubilic key pairs
      if (config.mode == "key") LicenseBuilder.key(config.keyfile)

      // gen license file
      if (config.mode == "lic") LicenseBuilder.lic(config.keyfile, config.lictype, config.product,
        config.version, config.name, config.company, config.copies,
        config.mid, config.validfrom, config.goodthru)

      // decode license file
      if (config.mode == "dec") LicenseBuilder.dec(config.keyfile)

      // get machine if
      if (config.getmid) LicenseBuilder.getmid

      // print LicenseBuilder version
      if (config.ver) println("LicenseBuilder Version is: " + LicenseBuilder.ver)
    }
    case None => println("Please use -h for usage")
  }
}

Reference


scala 命令行解析

2017/2/6

OpenJDK


Oracle JDK 長久以來並沒有被追討授權費用的問題,但因為 JDK 本來就是以 BCL 授權,並不是整個 JDK 都是免費使用的,再加上Oracle 開始追討 Java 授權費,企業客戶頭痛,所以要開始注意這個問題。Oracle 取締未經適當授權的 Java 用戶 提供了如何安全地使用 Oracle JDK 的一些 hint,不過最根本的方法就是換成 OpenJDK。


OpenJDK 是以 GPL with Classpath Exception 授權,classpath exception 就是可以在 proprietary 軟體中使用 OpenJDK 的意思。


OpenJDK 8 已經跟 Oracle JDK 沒有什麼差異,在 Linux Server 中,都已經可以很快速就將 JDK 轉換到 OpenJDK 上面,不過 windows 跟 MacOS 就麻煩了一些,但基本上後面這兩個 OS 都是開發環境,只是下載使用,沒有散佈,繼續用 Oracle JDK 應該也可以。


CentOS


ref: CentOS7 使用yum命令安装Java SDK


$ yum search java | grep -i --color JDK

java-1.8.0-openjdk.x86_64 : OpenJDK Runtime Environment
java-1.8.0-openjdk-debug.x86_64 : OpenJDK Runtime Environment with full debug on
java-1.8.0-openjdk-demo.x86_64 : OpenJDK Demos
java-1.8.0-openjdk-demo-debug.x86_64 : OpenJDK Demos with full debug on
java-1.8.0-openjdk-devel.x86_64 : OpenJDK Development Environment
java-1.8.0-openjdk-devel-debug.x86_64 : OpenJDK Development Environment with
java-1.8.0-openjdk-headless.x86_64 : OpenJDK Runtime Environment
java-1.8.0-openjdk-headless-debug.x86_64 : OpenJDK Runtime Environment with full
java-1.8.0-openjdk-javadoc.noarch : OpenJDK API Documentation
java-1.8.0-openjdk-javadoc-debug.noarch : OpenJDK API Documentation for packages
java-1.8.0-openjdk-src.x86_64 : OpenJDK Source Bundle
java-1.8.0-openjdk-src-debug.x86_64 : OpenJDK Source Bundle for packages with

openjdk 的安裝路徑 /usr/lib/jvm/


yum install java-1.8.0-openjdk  java-1.8.0-openjdk-devel

以 alternatives 調整執行檔的目標


alternatives --config java
alternatives --config javac
alternatives --config javadoc
alternatives --config javah
alternatives --config javap

設定環境變數


vi /etc/profile

export JAVA_HOME=/usr/lib/jvm/java-openjdk
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

Debian


ref: How to download and install prebuilt OpenJDK packages


apt-get update
apt-get install openjdk-8-jdk

openjdk8 的路徑是 /usr/lib/jvm/java-8-openjdk-amd64


update-alternatives --display java
update-alternatives --display javac
update-alternatives --display javadoc
update-alternatives --display javah
update-alternatives --display javap

java -version

設定環境變數


vi /etc/profile

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin

openjdk for windows


Redhat Develper 提供了 windows 的 openjdk binary installer OpenJDK now available for Windows


openjdk for macos


build OpenJDKs at home on Linux and OSX


How to build and package OpenJDK 8 on OSX


https://www.zhihu.com/question/40816585
安装Homebrew然后在终端sudo brew install openjdk