2016/11/14

slick Database Persistence in Scala play 2.5


In the Scala Play 2.5 framework there are several ways to store data in a database, all implemented by integrating third-party libraries. We tested plain JDBC and Slick; this post covers the Slick part.


Slick is a functional relational mapping (FRM) database library: it accesses relational databases in a functional programming style.


Preparing the Slick project


First, use activator to generate a new project. We are using Play Framework 2.5 and Slick 3.1.


activator new tst6 play-slick3-example

This template originally uses an in-memory H2 database; we switch it to MySQL.


build.sbt


name := """play-slick-example"""

version := "1.0"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.11.7"

routesGenerator := InjectedRoutesGenerator

resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases"

libraryDependencies ++= Seq(
    cache,
    ws,
    filters,
    "com.typesafe.play" %% "play-slick" % "2.0.0",
    "com.typesafe.play" %% "play-slick-evolutions" % "2.0.0",
    //"com.h2database" % "h2" % "1.4.187",
    "mysql" % "mysql-connector-java" % "5.1.36",
    "org.scalatestplus.play" %% "scalatestplus-play" % "1.5.0" % "test",
    specs2 % Test
)

resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"

fork in run := true

Edit logback.xml. The main change is adding the following line, which logs the SQL statements that Slick executes.


<logger name="slick.jdbc.JdbcBackend.statement"  level="DEBUG" />

The complete logback.xml is as follows:


<!-- https://www.playframework.com/documentation/latest/SettingsLogger -->
<configuration>

<conversionRule conversionWord="coloredLevel" converterClass="play.api.libs.logback.ColoredLevel" />

<!--
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>${application.home:-.}/logs/application.log</file>
    <encoder>
        <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
</appender>
-->

<appender name="FILE"
          class="ch.qos.logback.core.rolling.RollingFileAppender">
    <append>true</append>
    <rollingPolicy
            class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <param name="FileNamePattern"
               value="${application.home:-.}/logs/application.%d{yyyy-MM-dd}.log.zip">
        </param>
    </rollingPolicy>
    <encoder>
        <!-- <pattern>%d %-5p %c %L%n %m%n</pattern> -->
        <!-- <charset class="java.nio.charset.Charset">UTF-8</charset>  -->
        <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>%coloredLevel %logger{15} in %thread\n\t%message%n%xException{10}</pattern>
    </encoder>
</appender>

<appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="FILE" />
</appender>

<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="STDOUT" />
</appender>

<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />

<!-- Will log all statements -->
<logger name="slick.jdbc.JdbcBackend.statement"  level="DEBUG" />

<!-- Off these ones as they are annoying, and anyway we manage configuration ourselves -->
<logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
<logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />

<root level="INFO">
    <appender-ref ref="ASYNCFILE" />
    <appender-ref ref="ASYNCSTDOUT" />
</root>

</configuration>

Project configuration


Edit application.conf. Slick does not use the DB connection settings of Play's JDBC module, so the database must be configured separately under slick.dbs.


#slick.dbs.default.driver="slick.driver.H2Driver$"
#slick.dbs.default.db.driver=org.h2.Driver
#slick.dbs.default.db.url="jdbc:h2:mem:play;DB_CLOSE_DELAY=-1"
//slick.dbs.default.db.user=user
//slick.dbs.default.db.password=""

slick.dbs.default.driver="slick.driver.MySQLDriver$"
slick.dbs.default.db.driver=com.mysql.jdbc.Driver
slick.dbs.default.db.url="jdbc:mysql://localhost:3306/playdb?useUnicode=true&characterEncoding=utf-8"
slick.dbs.default.db.user="root"
slick.dbs.default.db.password="max168kit"
# HikariCP connection pool the min size is numThreads, and the max size is numThreads * 5
slick.dbs.default.db.numThreads=5
slick.dbs.default.db.queueSize=30
slick.dbs.default.db.connectionTimeout=15s
slick.dbs.default.db.connectionTestQuery="select 1"

ref: connection pool


DB evolution


DB evolutions with Slick work the same way as with JDBC.


application.conf


play.evolutions {
  # You can disable evolutions for a specific datasource if necessary
  db.default.enabled = true
  autoApply = true
  autoApplyDowns = true
}

conf/evolutions/default/1.sql


# DC schema
 
# --- !Ups


CREATE TABLE PROJECT (
    ID integer NOT NULL AUTO_INCREMENT PRIMARY KEY,
    NAME varchar(255) NOT NULL
);


CREATE TABLE TASK (
    ID integer NOT NULL AUTO_INCREMENT PRIMARY KEY,
    COLOR varchar(255) NOT NULL,
    STATUS varchar(255) NOT NULL,
    PROJECT integer NOT NULL,
    FOREIGN KEY (PROJECT) REFERENCES PROJECT (ID)
);


 
# --- !Downs

DROP TABLE TASK;
DROP TABLE PROJECT;

Scala code


The code in this part comes from the play-slick3-example template with essentially no changes. Note that every action in Application uses Action.async together with Future to handle requests asynchronously.


The URI settings in conf/routes use PUT and PATCH, two less commonly used HTTP methods. For testing, we can use the Postman app for Chrome.


GET           /                          controllers.Application.listProjects
PUT           /projects/:name            controllers.Application.createProject(name: String)
GET           /projects/list             controllers.Application.listProjects
GET           /projects/:id              controllers.Application.projects(id: Long)
PUT           /projects/:id/:name        controllers.Application.addTaskToProject(name: String, id: Long)
PATCH         /tasks/:id                 controllers.Application.modifyTask(id: Long, color:Option[String] ?= None)

DELETE        /projects/:name            controllers.Application.delete(name: String)

  • app/controllers/Application.scala

package controllers

import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.inject.Inject

import akka.actor.ActorSystem
import models.{Project, ProjectRepo, TaskRepo}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.{Action, Controller}

import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.Future

class Application @Inject()( projectRepo: ProjectRepo, taskRepo: TaskRepo, actorSystem: ActorSystem)
                           extends Controller {

  def addTaskToProject(color: String, projectId: Long) = Action.async { implicit rs =>
    projectRepo.addTask(color, projectId)
      .map{ _ =>  Redirect(routes.Application.projects(projectId)) }
  }

  def modifyTask(taskId: Long, color: Option[String]) = Action.async { implicit rs =>
    taskRepo.partialUpdate(taskId, color, None, None).map(i =>
    Ok(s"Rows affected : $i"))
  }
  def createProject(name: String)= Action.async { implicit rs =>
    projectRepo.create(name)
      .map(id => Ok(s"project $id created") )
  }

  def listProjects = Action.async { implicit rs =>
    projectRepo.all
      .map(projects => Ok(views.html.projects(projects)))
  }

  def projects(id: Long) = Action.async { implicit rs =>
    for {
      Some(project) <-  projectRepo.findById(id)
      tasks <- taskRepo.findByProjectId(id)
    } yield Ok(views.html.project(project, tasks))
  }

  def delete(name: String) = Action.async { implicit rs =>
    projectRepo.delete(name).map(num => Ok(s"$num projects deleted"))
  }
}
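
One detail of the projects action is worth spelling out: the Some(project) pattern inside the for comprehension filters the Future, so when no row is found the Future fails instead of producing a value. The following is a minimal sketch in plain Scala 2 (no Play or Slick). The findById function here is a hypothetical stand-in for projectRepo.findById, and lenient shows one possible way to handle the missing case explicitly.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object PatternInForSketch {
  // Hypothetical stand-in for projectRepo.findById: only id 1 exists.
  def findById(id: Long): Future[Option[String]] =
    Future.successful(if (id == 1L) Some("test") else None)

  // Same shape as the projects action: the Some(...) pattern becomes a
  // withFilter call, so a None result turns into a failed Future.
  def strict(id: Long): Future[String] =
    for {
      Some(name) <- findById(id)
    } yield s"project $name"

  // Alternative: match on the Option and handle None explicitly.
  def lenient(id: Long): Future[String] =
    findById(id).map {
      case Some(name) => s"project $name"
      case None       => s"project $id not found"
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(strict(1L), 1.second))
    println(Try(Await.result(strict(2L), 1.second)).isFailure)
    println(Await.result(lenient(2L), 1.second))
  }
}
```

In a real controller the None branch would typically map to a NotFound result rather than a string.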

  • app/models/Project.scala

ProjectsTable defines the table mapping that the Projects query is built on.


package models

import javax.inject.Inject
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import slick.dbio
import slick.dbio.Effect.Read
import slick.driver.JdbcProfile
import slick.jdbc.GetResult
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Project(id: Long, name: String)


class ProjectRepo @Inject()(taskRepo: TaskRepo)(protected val dbConfigProvider: DatabaseConfigProvider) {

  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Projects = TableQuery[ProjectsTable]

  private def _findById(id: Long): DBIO[Option[Project]] =
    Projects.filter(_.id === id).result.headOption

  private def _findByName(name: String): Query[ProjectsTable, Project, List] =
    Projects.filter(_.name === name).to[List]

  def findById(id: Long): Future[Option[Project]] =
    db.run(_findById(id))

  def findByName(name: String): Future[List[Project]] =
    db.run(_findByName(name).result)

  def all: Future[List[Project]] =
    db.run(Projects.to[List].result)

  def create(name: String): Future[Long] = {
    val project = Project(0, name)
    db.run(Projects returning Projects.map(_.id) += project)
  }

  def delete(name: String): Future[Int] = {
    val query = _findByName(name)

    val interaction = for {
      projects        <- query.result
      _               <- DBIO.sequence(projects.map(p => taskRepo._deleteAllInProject(p.id)))
      projectsDeleted <- query.delete
    } yield projectsDeleted

    db.run(interaction.transactionally)
  }

  def addTask(color: String, projectId: Long): Future[Long] = {
    val interaction = for {
      Some(project) <- _findById(projectId)
      id <- taskRepo.insert(Task(0, color, TaskStatus.ready, project.id))
    } yield id

    db.run(interaction.transactionally)
  }


  // Define the PROJECT table
  private class ProjectsTable(tag: Tag) extends Table[Project](tag, "PROJECT") {

    // The primary key is ID
    def id = column[Long]("ID", O.AutoInc, O.PrimaryKey)
    def name = column[String]("NAME")

    def * = (id, name) <> (Project.tupled, Project.unapply)
    def ? = (id.?, name.?).shaped.<>({ r => import r._; _1.map(_ => Project.tupled((_1.get, _2.get))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported."))

  }
}

  • app/models/Task.scala

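The notable parts are TaskStatus and taskStatusColumnType, which handles the automatic conversion. The STATUS column in the DB holds one of three values, ready/set/go, so the field is defined with an Enumeration.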


package models

import javax.inject.Inject

import play.api.db.slick.DatabaseConfigProvider

import slick.driver.JdbcProfile

import scala.concurrent.Future

case class Task(id: Long, color: String, status: TaskStatus.Value, project: Long) {

  def patch(color: Option[String], status: Option[TaskStatus.Value], project: Option[Long]): Task =
    this.copy(color = color.getOrElse(this.color),
              status = status.getOrElse(this.status),
              project = project.getOrElse(this.project))

}

object TaskStatus extends Enumeration {
  val ready = Value("ready")
  val set = Value("set")
  val go = Value("go")
}

class TaskRepo @Inject()(protected val dbConfigProvider: DatabaseConfigProvider) {
  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Tasks = TableQuery[TasksTable]


  def findById(id: Long): Future[Task] =
    db.run(Tasks.filter(_.id === id).result.head)

  def findByColor(color: String): DBIO[Option[Task]] =
    Tasks.filter(_.color === color).result.headOption

  def findByProjectId(projectId: Long): Future[List[Task]] =
    db.run(Tasks.filter(_.project === projectId).to[List].result)

  def findByReadyStatus: DBIO[List[Task]] =
    Tasks.filter(_.status === TaskStatus.ready).to[List].result

  def partialUpdate(id: Long, color: Option[String], status: Option[TaskStatus.Value], project: Option[Long]): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val query = Tasks.filter(_.id === id)

    val update = query.result.head.flatMap {task =>
      query.update(task.patch(color, status, project))
    }

    db.run(update)
  }

  def all(): DBIO[Seq[Task]] =
    Tasks.result

  def insert(Task: Task): DBIO[Long] =
    Tasks returning Tasks.map(_.id) += Task

  def _deleteAllInProject(projectId: Long): DBIO[Int] =
    Tasks.filter(_.project === projectId).delete

  private class TasksTable(tag: Tag) extends Table[Task](tag, "TASK") {

    def id = column[Long]("ID", O.AutoInc, O.PrimaryKey)
    def color = column[String]("COLOR")
    def status = column[TaskStatus.Value]("STATUS")
    def project = column[Long]("PROJECT")

    def * = (id, color, status, project) <> (Task.tupled, Task.unapply)
    def ? = (id.?, color.?, status.?, project.?).shaped.<>({ r => import r._; _1.map(_ => Task.tupled((_1.get, _2.get, _3.get, _4.get))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported."))
  }

  implicit val taskStatusColumnType = MappedColumnType.base[TaskStatus.Value, String](
    _.toString, string => TaskStatus.withName(string))

}
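
To make the role of taskStatusColumnType more concrete, here is a minimal sketch in plain Scala (no Slick) of the two conversion functions that the listing above passes to MappedColumnType.base. The names toColumn and fromColumn are made up for this illustration; the enumeration is copied from Task.scala.

```scala
object TaskStatusMappingSketch {
  // Same enumeration as in Task.scala.
  object TaskStatus extends Enumeration {
    val ready = Value("ready")
    val set = Value("set")
    val go = Value("go")
  }

  // Enumeration value -> string stored in the STATUS column.
  val toColumn: TaskStatus.Value => String = _.toString

  // String read from the STATUS column -> enumeration value.
  // withName throws NoSuchElementException for an unknown string.
  val fromColumn: String => TaskStatus.Value = s => TaskStatus.withName(s)

  def main(args: Array[String]): Unit = {
    TaskStatus.values.foreach { status =>
      val stored = toColumn(status)
      println(s"$status -> '$stored' -> ${fromColumn(stored)}")
    }
  }
}
```

Because withName throws on unknown input, a row whose STATUS holds anything other than ready, set, or go would presumably fail when it is read back.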

Testing


Postman is a Chrome app that can be used for HTTP testing.



Database Persistence in Scala play 2.5 using slick.zip


plain SQL in slick


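To run hand-written SQL through Slick, a different approach is used: the plain SQL interpolators (sql and sqlu) instead of table queries.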


ref:
slick plain sql
play-slick version compatibility


activator has a sample template for this, but we only used it as a reference.


activator new test7 slick-plainsql-3.0

  • conf/routes

First, add the URIs to routes.


GET           /pj/:id                    controllers.Application.getproject(id:Long)
GET           /pj2/:id                   controllers.Application.getproject2(id:Long)
GET           /pj3/:id                   controllers.Application.getproject3(id:Long)

GET           /update/:id/:name          controllers.Application.updateproject(id:Long, name:String)

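The Application constructor needs @Inject() with actorSystem: ActorSystem. getproject tests how to write code with Future; its incorrect version is left in on purpose. The callback code inside the Future block runs on a different thread, so the action returns before the query result has been appended.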
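
The difference between the wrong and the right way of using a Future can be sketched in plain Scala, without Play or Slick. The object and method names below are made up for this illustration: wrong mirrors the shape of getproject, and right mirrors what getproject2 achieves by returning the mapped Future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FuturePitfallSketch {
  // Wrong: the method returns immediately, before the callback has had a
  // chance to append anything to `result`.
  def wrong(names: Future[Seq[String]]): String = {
    var result = "DB project:\n"
    names.map(ns => ns.foreach(n => result += n))
    result
  }

  // Right: build the value inside map and return the Future itself, so the
  // caller (Action.async in Play) receives the finished value later.
  def right(names: Future[Seq[String]]): Future[String] =
    names.map(ns => "DB project:\n" + ns.mkString(" "))

  def main(args: Array[String]): Unit = {
    // A Promise stands in for a DB query that has not finished yet.
    val pending = Promise[Seq[String]]()
    println(wrong(pending.future))
    pending.success(Seq("test"))
    println(Await.result(right(pending.future), 1.second))
  }
}
```

Await is used here only so the sketch can print something; in a Play action the Future would be returned from Action.async instead of being awaited.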


getproject3 adds an asynchronous timeout check: if the query has not completed after 2 seconds, a TimeoutException is produced.


package controllers

import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.inject.Inject

import akka.actor.ActorSystem
import models.{Project, ProjectRepo, TaskRepo}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.{Action, Controller}

import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.Future

class Application @Inject()( projectRepo: ProjectRepo, taskRepo: TaskRepo, actorSystem: ActorSystem)
                           extends Controller {

  def getproject(id:Long) = Action {
    Logger.info(s"getproject id=${id}")
    var result = "DB project:\n"

    val pjs: Future[Seq[String]] = projectRepo.findByIdCustom2(id)
    pjs.map{
          // This is asynchronous, so it runs on a different thread
      cs => {
        for(c<-cs) {
          Logger.info("c="+c.toString)
          // c=test
          result += c.toString
          Logger.info(s"result=$result")
        }
      }
    }
    // This is the wrong way to do it
    // Only "DB project:" is returned, without the DB query result
    Logger.info(s"result=$result")
    Ok(result)
  }

  def getproject2(id: Long) = Action.async {

    //val futureNumRowsDeleted = scala.concurrent.Future{ Transaction.delete(id) }
    val pjs: Future[Seq[String]] = projectRepo.findByIdCustom2(id)

    pjs.map {
      var result = "DB project:\n"
      cs => {
        for (c <- cs) {
          Logger.info("c=" + c.toString)
          // c=test
          result += c.toString+" "
          Logger.info(s"result=$result")
        }
      }
        Logger.info(s"result=$result")
        Ok(result)
    }
  }

  def getproject3(id: Long) = Action.async {

    //val futureNumRowsDeleted = scala.concurrent.Future{ Transaction.delete(id) }
    val pjs: Future[Seq[Project]] = projectRepo.findByIdCustom3(id)

    //val timeout = play.api.libs.concurrent.Promise.timeout("Past max time", 2, TimeUnit.SECONDS)

    //val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.successful("Oops"))
    val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

    Future.firstCompletedOf(Seq(pjs, timeoutFuture)).map { cs =>
      var result = "DB project:\n"
      for (c <- cs) {
        Logger.info("c=" + c.name)
        // c=test
        result += c.name + " "
        Logger.info(s"result=$result")
      }
      Ok(result)
    }.recover {
      // The timeout future fails instead of producing a value,
      // so the exception has to be handled in recover, not in map
      case t: TimeoutException => InternalServerError(t.getMessage)
    }
  }
  
  def updateproject(id: Long, name:String) = Action.async {
    val pjs: Future[Int] = projectRepo.updateproject(id, name)

    val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

    Future.firstCompletedOf(Seq(pjs, timeoutFuture)).map {
      case cs: Int  => {
        val result = s"DB update result:${cs}\n"
        Ok(result)
      }
      //case t: Any => InternalServerError()
    }
  }
}
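
The timeout pattern used in getproject3 can be tried outside Play with a minimal plain-Scala sketch. It assumes a java.util.Timer as a stand-in for akka.pattern.after, and the names failAfter and withTimeout as well as the "200"/"500" strings are made up for this illustration. The point it shows is that a failed timeout Future never reaches map, so the TimeoutException is handled in recover.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutSketch {
  // Daemon timer thread, so it does not keep the JVM alive.
  private val timer = new Timer("timeout-sketch", true)

  // Stand-in for after(d, scheduler)(Future.failed(...)): a Future that
  // fails with a TimeoutException once `d` has elapsed.
  def failAfter(d: FiniteDuration): Future[Nothing] = {
    val p = Promise[Nothing]()
    timer.schedule(new TimerTask {
      def run(): Unit = p.tryFailure(new TimeoutException("Future timed out!"))
    }, d.toMillis)
    p.future
  }

  // Race the work against the timer. A success goes through map,
  // a timeout goes through recover.
  def withTimeout(work: Future[String], d: FiniteDuration): Future[String] =
    Future.firstCompletedOf(Seq(work, failAfter(d)))
      .map(value => s"200 $value")
      .recover { case t: TimeoutException => s"500 ${t.getMessage}" }

  def main(args: Array[String]): Unit = {
    val fast = Future.successful("DB project: test")
    val never = Promise[String]().future // never completes
    println(Await.result(withTimeout(fast, 200.millis), 1.second))
    println(Await.result(withTimeout(never, 200.millis), 1.second))
  }
}
```

Note that firstCompletedOf only picks the first result; it does not cancel the slower Future, so a timed-out query would still run to completion in the background.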

app/models/Project.scala


findByIdCustom3 tests converting result rows directly into Project objects.


package models

import javax.inject.Inject
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import slick.dbio
import slick.dbio.Effect.Read
import slick.driver.JdbcProfile
import slick.jdbc.GetResult
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Project(id: Long, name: String)


class ProjectRepo @Inject()(taskRepo: TaskRepo)(protected val dbConfigProvider: DatabaseConfigProvider) {

  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Projects = TableQuery[ProjectsTable]

  /////////////

  def findByIdCustom2(id:Long): Future[Seq[String]] = {
    val query = sql"select NAME from PROJECT where ID=$id".as[(String)]
    //db.run(query)
    Logger.info("findByIdCustom2")
    //db.run(query)

    val f: Future[Seq[String]] = db.run(query)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }

  implicit val getProjectResult = GetResult(r => Project(r.nextLong, r.nextString))

  def findByIdCustom3(id:Long): Future[Seq[Project]] = {
    // as[(Project)] picks up the implicit getProjectResult GetResult above and converts each row into a Project object
    val query = sql"select ID, NAME from PROJECT where ID=$id".as[(Project)]
    //db.run(query)
    Logger.info("findByIdCustom3")
    //db.run(query)

    val f: Future[Seq[Project]] = db.run(query)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }
  
  def updateproject(id:Long, name:String): Future[Int] = {
    val update = sqlu"update PROJECT set name=$name where ID=$id"
    //db.run(query)
    Logger.info("updateproject")
    //db.run(query)

    val f: Future[Int] = db.run(update)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }
}

Testing can be done directly with curl.


curl -v 'http://localhost:9000/pj/1'
curl -v 'http://localhost:9000/pj2/1'
curl -v 'http://localhost:9000/pj3/1'

curl -v 'http://localhost:9000/update/1/test2'
