Phoenix channel 是一個 conversation,channel 會發送, 接收 messages,並保存 state,這些 messages 稱為 events,state 會存放在稱為 socket 的 struct 內。
conversation 就是 topic,就像是 chat room, local map , a game, or a video。超過一個人在同一時間對相同的主題有興趣,Channels 可用在多個 users 之間互通訊息。因為這是用 erlang isolated, dedicated process 實作的。
網頁的 request/response 是 stateless,但 conversation 是長時間運作的的 process,是 stateful 的。
Phoenix Clients with ES6
利用 ECMAScript 6 JavaScript 功能實作 client,ES6的code 可以 transpile 為 ES5,實作 client 對 video 新增 annotations,並發送給所有 users。
每一個 Phonenix conversation 是一個 Topic,因此要先確認,要以什麼為 Topic,目前是以 video 為 topic。
新增 /web/static/js/video.js
import Player from "./player"
let Video = {
init(socket, element){ if(!element){ return }
let playerId = element.getAttribute("data-player-id")
let videoId = element.getAttribute("data-id")
socket.connect()
Player.init(element.id, playerId, () => {
this.onReady(videoId, socket)
})
},
onReady(videoId, socket){
let msgContainer = document.getElementById("msg-container")
let msgInput = document.getElementById("msg-input")
let postButton = document.getElementById("msg-submit")
let vidChannel = socket.channel("videos:" + videoId)
// TODO join the vidChannel
}
}
export default Video
socket.connect()
可產生一個 websocket
注意 let vidChannel = socket.channel("videos:" + videoId)
,這是 ES6 Client 連接 Phoenix VideoChannel 的 channel。
Topic 需要一個 identifier,我們選用 "videos:" + videoId 這個格式,我們需要在 topic 內對其他相同 topic 的 users 發送 events。
修改 /web/static/js/app.js
import "phoenix_html"
import socket from "./socket"
import Video from "./video"
Video.init(socket, document.getElementById("video"))
如果瀏覽 http://localhost:4000/watch/2-elixir 時,js console 會出現這樣的錯誤訊息。
Unable to join – {reason: "unmatched topic"}
Preparing Our Server for the Channel
傳統 web request/response 每一次都會產生一個 connection,也就是 Plug.Conn,每個新的 request 都會有新的 conn,接下來用 pipeline 處理,最後 die。
channel 的流程跟上面的不同,client 會用 socket 建立 connection,在建立連線後,socket 會在整個 connection 的過程中持續被 transformed。socket 就是 client/server 之間持續運作的 conversation。
首先要決定是否能建立 connection,然後要產生 initial socket,包含所有 custom application setup。
修改 /web/static/js/socket.js
import {Socket} from "phoenix"
let socket = new Socket("/socket", {
params: {token: window.userToken},
logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
})
export default socket
let socket = new Socket("/socket"....
會讓 Phoenix 建立新的 socket。
查看 /lib/rumbl/endpoint.ex,有一個 /socket 的定義,UserSocket 就是處理 socket connection 的 module。
socket "/socket", Rumbl.UserSocket
先看一下 /web/channels/user_socket.ex 的內容
defmodule Rumbl.UserSocket do
use Phoenix.Socket
## Transports
transport :websocket, Phoenix.Transports.WebSocket
# transport :longpoll, Phoenix.Transports.LongPoll
def connect(_params, socket) do
{:ok, socket}
end
def id(_socket), do: nil
end
UserSocket 使用 connection 處理所有 channel processes。 Phoenix 支援兩種 Transport protocols: websocket 或是 longpoll,也可以自訂一個。除了 transport 不同之外,其他的部分都是一樣的。
使用 shared socket abstraction,然後讓 Phoenix 處理其他的工作。
UserSocket 有兩個 functions: connect 及 id。id 是用來識別 socket,以便儲存不同的 state。目前 id 為 nil,connect 基本上就是接受所有連線。
接下來要利用 rumbl.Auth 增加 socket authentication。
如果再一次瀏覽網址 http://localhost:4000/watch/2-elixir,在 js console 就可看到此 debug message,表示已經連上 server。
transport: connected to ws://localhost:4000/socket/websocket?token=undefined&vsn=1.0.0
Creating the Channel
channel 就是 a conversation on a topic,topic 的 id 為 videos:video_id,我們希望 user 能取得某個 topic 的所有 events,也就是 video 的所有 annotations。
topic id 的一般形式就是 topic:subtopic,topic 為 resource name,subtopic 為 ID
因為 URL 就有參數,可以識別 conversation,也就是 :id
Joining a Channel
在 /web/channels/user_socket.ex 增加一行
channel "videos:*", Rumbl.VideoChannel
videos:* conversation 以 resource name 及 ID 作為 topic 的分類方式
Building the Channel Module
新增 /web/channels/video_channel.ex
defmodule Rumbl.VideoChannel do
use Rumbl.Web, :channel
def join("videos:" <> video_id, _params, socket) do
{:ok, assign(socket, :video_id, String.to_integer(video_id))}
end
end
channel 的第一個 callback 就是 join,clients 可 join topics on a channel,如果成功就回傳 {:ok, socket},拒絕連線就回傳 {:error, socket}
現在先讓所有 socket 可任意 join video topics,並新增 video ID (由 topic 取得) 到 socket.assigns。socket 會在 socket.assigns 儲存某個 conversation 的所有狀態 state。
socket 會被 transformed 為 loop,而不是一連串的 pipelines。當 events 進出 channel 時,可同時存取 socket state。
修改 /web/static/js/video.js
import Player from "./player"
let Video = {
init(socket, element){ if(!element){ return }
let playerId = element.getAttribute("data-player-id")
let videoId = element.getAttribute("data-id")
socket.connect()
Player.init(element.id, playerId, () => {
this.onReady(videoId, socket)
})
},
onReady(videoId, socket){
let msgContainer = document.getElementById("msg-container")
let msgInput = document.getElementById("msg-input")
let postButton = document.getElementById("msg-submit")
// 以 "videos:" videoId 產生新的 channel object
let vidChannel = socket.channel("videos:" + videoId)
vidChannel.join()
.receive("ok", resp => console.log("joined the video channel", resp) )
.receive("error", reason => console.log("join failed", reason) )
}
}
export default Video
如果再一次瀏覽網址 http://localhost:4000/watch/2-elixir,在 js console 就可看到
[Log] push: videos:2 phx_join (1) – {} (app.js, line 1586)
[Log] receive: ok videos:2 phx_reply (1) – {status: "ok", response: {}} (app.js, line 1586)
[Log] joined the video channel – {}
server 的 console log 為
[info] JOIN videos:2 to Rumbl.VideoChannel
Transport: Phoenix.Transports.WebSocket
Parameters: %{}
[info] Replied videos:2 :ok
Sending and Receiving Event
在 channel 收到的訊息為 event name + payload + arbitrary data
channel 有三種接收訊息的方式:
handle_in
receives direct channel events
handle_out
intercepts broadcast events
handle_info
receives OTP messages
Taking Channels for a Trial run
目前是讓 join function 每 5 seconds 就發送一次 :ping message 到 channel
修改 /web/channels/video_channel.ex
defmodule Rumbl.VideoChannel do
use Rumbl.Web, :channel
def join("videos:" <> video_id, _params, socket) do
:timer.send_interval(5_000, :ping)
{:ok, socket}
end
# 當 elixir message 到達 channel 就會呼叫 handle_info
# 目前每收到一次就將 :count +1
def handle_info(:ping, socket) do
count = socket.assigns[:count] || 1
push socket, "ping", %{count: count}
# :noreply 代表不發送 reply,並將 transformed 後的 socket 回傳回去
{:noreply, assign(socket, :count, count + 1)}
end
end
client 要對應修改 video.js,增加 vidChannel.on("ping", ({count}) => console.log("PING", count) )
let vidChannel = socket.channel("videos:" + videoId)
vidChannel.on("ping", ({count}) => console.log("PING", count) )
vidChannel.join()
.receive("ok", resp => console.log("joined the video channel", resp) )
.receive("error", reason => console.log("join failed", reason) )
js console 會持續看到
[Log] receive: videos:2 ping – {count: 1} (app.js, line 1586)
[Log] receive: videos:2 ping – {count: 2} (app.js, line 1586)
[Log] receive: videos:2 ping – {count: 3} (app.js, line 1586)
[Log] receive: videos:2 ping – {count: 4} (app.js, line 1586)
handle_info 就是 loop
client js 的部分是以 vidChannel.on(event, callback) 處理訊息
後面會看到怎麼用 handle_in 處理 synchronous messaging
controller 處理 request 而 channels hold a conversation
Annotating Videos
需要一個 Annotation model 儲存 user annotations
修改 /web/static/js/video.js
import Player from "./player"
let Video = {
init(socket, element){ if(!element){ return }
let playerId = element.getAttribute("data-player-id")
let videoId = element.getAttribute("data-id")
socket.connect()
Player.init(element.id, playerId, () => {
this.onReady(videoId, socket)
})
},
onReady(videoId, socket){
let msgContainer = document.getElementById("msg-container")
let msgInput = document.getElementById("msg-input")
let postButton = document.getElementById("msg-submit")
let vidChannel = socket.channel("videos:" + videoId)
// 處理 post 按鈕的 click event
// 利用 vidChannel.push 發送 new_annotation
postButton.addEventListener("click", e => {
let payload = {body: msgInput.value, at: Player.getCurrentTime()}
vidChannel.push("new_annotation", payload)
.receive("error", e => console.log(e) )
msgInput.value = ""
})
// 接收 server 發送的 new_annotation,把 annotation 顯示在畫面 msgContainer 上
vidChannel.on("new_annotation", (resp) => {
this.renderAnnotation(msgContainer, resp)
})
vidChannel.join()
.receive("ok", resp => console.log("joined the video channel", resp) )
.receive("error", reason => console.log("join failed", reason) )
},
// safely escape user input,可避免發生 XSS attack
esc(str){
let div = document.createElement("div")
div.appendChild(document.createTextNode(str))
return div.innerHTML
},
renderAnnotation(msgContainer, {user, body, at}){
let template = document.createElement("div")
template.innerHTML = `
<a href="#" data-seek="${this.esc(at)}">
<b>${this.esc(user.username)}</b>: ${this.esc(body)}
</a>
`
msgContainer.appendChild(template)
msgContainer.scrollTop = msgContainer.scrollHeight
}
}
export default Video
Adding Annotation on the Server
修改 /web/channels/video_channel.ex
defmodule Rumbl.VideoChannel do
use Rumbl.Web, :channel
def join("videos:" <> video_id, _params, socket) do
{:ok, socket}
end
# 處理 new_annotation,並 broadcast! 給目前 topic 的所有 users
# broadcast! 有三個參數 socket, name of the event, payload (任意的 map)
def handle_in("new_annotation", params, socket) do
broadcast! socket, "new_annotation", %{
user: %{username: "anon"},
body: params["body"],
at: params["at"]
}
# :reply 有兩種 :ok 或是 :error
# 不然就是用 :noreply
{:reply, :ok, socket}
end
end
注意: 將原始的 message payload 直接轉送給其他人,而沒有 inspection,可能會有 security 問題
如果這樣寫,就有可能有資安問題
broadcast! socket, "new_annotation", Map.put(params, "user", %{
username: "anon"
})
現在打開兩個網頁,就可以互傳訊息
Socket Authentication
因為 channel 是 long-duration connection,利用 Phoenix.Token token authentication,可為每個 user 指定一個 unique token
不使用 session cookie 的原因是,可能會有 cross-domain attack。
因為已經有利用 Rumbl.Auth plug 增加的 current_user,現在要做的是利用 authenticated user 產生 token 並傳給 socket 前端。
首先修改 /web/templates/layout/app.html.eex,將 userToken 由 layout assigns 取出並放在 browser window 中
</div> <!-- /container -->
<script>window.userToken = "<%= assigns[:user_token] %>"</script>
<script src="<%= static_path(@conn, "/js/app.js") %>"></script>
修改 /web/controllers/auth.ex
def call(conn, repo) do
user_id = get_session(conn, :user_id)
cond do
user = conn.assigns[:current_user] ->
put_current_user(conn, user)
user = user_id && repo.get(Rumbl.User, user_id) ->
put_current_user(conn, user)
true ->
assign(conn, :current_user, nil)
end
end
def login(conn, user) do
conn
|> put_current_user(user)
|> put_session(:user_id, user.id)
|> configure_session(renew: true)
end
# 將 current_user 及 user_token 放到 conn.assigns
defp put_current_user(conn, user) do
token = Phoenix.Token.sign(conn, "user socket", user.id)
conn
|> assign(:current_user, user)
|> assign(:user_token, token)
end
修改 /web/static/js/socket.js ,將 user token 傳入 socket.connect,並在 UserSocket.connect callback 中驗證 token。
import {Socket} from "phoenix"
let socket = new Socket("/socket", {
// :params 會出現在 UserSocket.connect 的第一個參數
params: {token: window.userToken},
logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
})
export default socket
修改 /web/channles/user_socket.ex
defmodule Rumbl.UserSocket do
use Phoenix.Socket
## Channels
channel "videos:*", Rumbl.VideoChannel
## Transports
transport :websocket, Phoenix.Transports.WebSocket
# transport :longpoll, Phoenix.Transports.LongPoll
# 2 weeks
@max_age 2 * 7 * 24 * 60 * 60
def connect(%{"token" => token}, socket) do
case Phoenix.Token.verify(socket, "user socket", token, max_age: @max_age) do
{:ok, user_id} ->
{:ok, assign(socket, :user_id, user_id)}
{:error, _reason} ->
:error
end
end
def connect(_params, _socket), do: :error
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
利用 Phoenix.Token.verify 檢查 token,可設定 max_age
如果 token 正確,就會收到 user_id 並存在 socket.assigns,回傳 {:ok, socket} 用以建立 connection。token 錯誤,就 return :error
refresh your page,application 還是能正常運作,但已經有了 user authentication
Persisting Annotations
建立 Annotation model,create annotations on videos,每個 annotation 會 belong to a user and a video
$ mix phoenix.gen.model Annotation annotations body:text at:integer user_id:references:users video_id:references:videos
* creating web/models/annotation.ex
* creating test/models/annotation_test.exs
* creating priv/repo/migrations/20170906163920_create_annotation.exs
Remember to update your repository by running migrations:
$ mix ecto.migrate
$ mix ecto.migrate
Compiling 1 file (.ex)
Generated rumbl app
[info] == Running Rumbl.Repo.Migrations.CreateAnnotation.change/0 forward
[info] create table annotations
[info] create index annotations_user_id_index
[info] create index annotations_video_id_index
[info] == Migrated in 0.0s
還要處理 User, Video 的 relationships
修改 /web/models/user.ex 及 /web/models/video.ex,增加 has_many
has_many :annotations, Rumbl.Annotation
回到 /web/channels/video_channel.ex
defmodule Rumbl.VideoChannel do
use Rumbl.Web, :channel
def join("videos:" <> video_id, _params, socket) do
{:ok, assign(socket, :video_id, String.to_integer(video_id))}
end
# 確保所有 events 都會有 current_user,然後再呼叫其他 handle_in
def handle_in(event, params, socket) do
user = Repo.get(Rumbl.User, socket.assigns.user_id)
handle_in(event, params, user, socket)
end
def handle_in("new_annotation", params, user, socket) do
# 以 new_annotation 產生 changeset 並透過 Repo 存到 DB
changeset =
user
|> build_assoc(:annotations, video_id: socket.assigns.video_id)
|> Rumbl.Annotation.changeset(params)
case Repo.insert(changeset) do
# insert 成功,才 broadcast 給所有 subscribers
# 也可以用 {:noreply, socket} 不送 reply
{:ok, annotation} ->
broadcast! socket, "new_annotation", %{
id: annotation.id,
user: Rumbl.UserView.render("user.json", %{user: user}),
body: annotation.body,
at: annotation.at
}
{:reply, :ok, socket}
{:error, changeset} ->
{:reply, {:error, %{errors: changeset}}, socket}
end
end
end
因為也需要 notify subscribers 該 user 的資訊,在 UserView 新增 user.json template
defmodule Rumbl.UserView do
use Rumbl.Web, :view
alias Rumbl.User
def first_name(%User{name: name}) do
name
|> String.split(" ")
|> Enum.at(0)
end
def render("user.json", %{user: user}) do
%{id: user.id, username: user.username}
end
end
現在新增 annotation, server log 就會出現
INSERT INTO `annotations` (`at`,`body`,`user_id`,`video_id`,`inserted_at`,`updated_at`) VALUES (?,?,?,?,?,?) [0, "test", 1, 2, {{2017, 9, 6}, {16, 49, 10, 444088}}, {{2017, 9, 6}, {16, 49, 10, 447017}}]
[debug] QUERY OK db=0.6ms
在 refresh page 後,annotations 就會消失,所以在 user join channel 時,要把 messages 送給 client
修改 /web/channels/video_channel.ex,改寫 join,取得 video's annotations
defmodule Rumbl.VideoChannel do
use Rumbl.Web, :channel
alias Rumbl.AnnotationView
def join("videos:" <> video_id, _params, socket) do
video_id = String.to_integer(video_id)
video = Repo.get!(Rumbl.Video, video_id)
# 要 preload user
annotations = Repo.all(
from a in assoc(video, :annotations),
order_by: [asc: a.at, asc: a.id],
limit: 200,
preload: [:user]
)
resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
"annotation.json")}
{:ok, resp, assign(socket, :video_id, video_id)}
end
def handle_in("new_annotation", params, socket) do
user = Rumbl.Repo.get(Rumbl.User, socket.assigns.user_id)
changeset =
user
|> build_assoc(:annotations, video_id: socket.assigns.video_id)
|> Rumbl.Annotation.changeset(params)
case Repo.insert(changeset) do
{:ok, annotation} ->
broadcast! socket, "new_annotation", %{
id: annotation.id,
user: Rumbl.UserView.render("user.json", %{user: user}),
body: annotation.body,
at: annotation.at
}
{:reply, :ok, socket}
{:error, changeset} ->
{:reply, {:error, %{errors: changeset}}, socket}
end
end
end
Phoenix.View.render_many 能夠 collects the render results for all elements in the enumerable passed to it
新增 /web/views/annotation_view.ex
defmodule Rumbl.AnnotationView do
use Rumbl.Web, :view
def render("annotation.json", %{annotation: ann}) do
%{
id: ann.id,
body: ann.body,
at: ann.at,
user: render_one(ann.user, Rumbl.UserView, "user.json")
}
end
end
注意 annotaion's user 的 render_one,可處理 nil results
更新 vidChannle.join() 以便 render list of annotations on join
vidChannel.join()
.receive("ok", ({annotations}) => {
annotations.forEach( ann => this.renderAnnotation(msgContainer, ann) )
})
.receive("error", reason => console.log("join failed", reason) )
現在 reload 頁面就能看到所有 annotations
現在我們需要 schedule the annotations to appear synced up with the video playback
更新 /web/static/js/video.js
import Player from "./player"
let Video = {
init(socket, element){ if(!element){ return }
let playerId = element.getAttribute("data-player-id")
let videoId = element.getAttribute("data-id")
socket.connect()
Player.init(element.id, playerId, () => {
this.onReady(videoId, socket)
})
},
onReady(videoId, socket){
let msgContainer = document.getElementById("msg-container")
let msgInput = document.getElementById("msg-input")
let postButton = document.getElementById("msg-submit")
let vidChannel = socket.channel("videos:" + videoId)
postButton.addEventListener("click", e => {
let payload = {body: msgInput.value, at: Player.getCurrentTime()}
vidChannel.push("new_annotation", payload)
.receive("error", e => console.log(e) )
msgInput.value = ""
})
msgContainer.addEventListener("click", e => {
e.preventDefault()
let seconds = e.target.getAttribute("data-seek") ||
e.target.parentNode.getAttribute("data-seek")
if(!seconds){ return }
Player.seekTo(seconds)
})
vidChannel.on("new_annotation", (resp) => {
this.renderAnnotation(msgContainer, resp)
})
vidChannel.join()
.receive("ok", resp => {
this.scheduleMessages(msgContainer, resp.annotations)
})
.receive("error", reason => console.log("join failed", reason) )
},
renderAnnotation(msgContainer, {user, body, at}){
let template = document.createElement("div")
template.innerHTML = `
<a href="#" data-seek="${this.esc(at)}">
[${this.formatTime(at)}]
<b>${this.esc(user.username)}</b>: ${this.esc(body)}
</a>
`
msgContainer.appendChild(template)
msgContainer.scrollTop = msgContainer.scrollHeight
},
scheduleMessages(msgContainer, annotations){
setTimeout(() => {
let ctime = Player.getCurrentTime()
let remaining = this.renderAtTime(annotations, ctime, msgContainer)
this.scheduleMessages(msgContainer, remaining)
}, 1000)
},
renderAtTime(annotations, seconds, msgContainer){
return annotations.filter( ann => {
if(ann.at > seconds){
return true
} else {
this.renderAnnotation(msgContainer, ann)
return false
}
})
},
formatTime(at){
let date = new Date(null)
date.setSeconds(at / 1000)
return date.toISOString().substr(14, 5)
},
esc(str){
let div = document.createElement("div")
div.appendChild(document.createTextNode(str))
return div.innerHTML
}
}
export default Video
根據 current player time 去 render annotations
scheduleMessages 每秒都會執行一次,每次都呼叫 renderAtTime
renderAtTime 會 filter 要 render 的 messages
現在再次 reload 頁面,就可看到 annotation 的時間
增加讓 annotation 可以點擊的功能,就可以直接跳躍影片到該 annotaion 產生的時間
修改 /web/static/js/video.js
msgContainer.addEventListener("click", e => {
e.preventDefault()
let seconds = e.target.getAttribute("data-seek") ||
e.target.parentNode.getAttribute("data-seek")
if(!seconds){ return }
Player.seekTo(seconds)
})
Handling Disconnects
JS client 可斷線再 reconnect,Server 可能會 restart,或是網路可能發生問題,這些問題都會造成斷線。
如果發送一個 annotation,然後馬上把 server 關掉,client 會以 exponential back-off 的方式進行 reconnect。重新啟動 server,會發現 server 會認為是新的連線,然後發送所有的 annotations,client 會出現重複的 annotations。client 必須偵測 duplicate annotations 並忽略處理。
我們可以在 client 追蹤 lastseenid,並在每次收到新的 annotation 時更新這個值。
當 client 重連時,可將 lastseenid 發送給 server,server 就只需要發送未收到的訊息。
修改 /web/static/js/video.js,增加 vidChannel.params.lastseenid
vidChannel.on("new_annotation", (resp) => {
vidChannel.params.last_seen_id = resp.id
this.renderAnnotation(msgContainer, resp)
})
vidChannel.join()
.receive("ok", resp => {
let ids = resp.annotations.map(ann => ann.id)
if(ids.length > 0){ vidChannel.params.last_seen_id = Math.max(...ids) }
this.scheduleMessages(msgContainer, resp.annotations)
})
.receive("error", reason => console.log("join failed", reason) )
client 的 channel 會儲存 params 物件,並在每次 join 時,發送給 server。在 join 也要更新這個參數。
修改 /web/channels/video_channel.ex 的 join
def join("videos:" <> video_id, params, socket) do
last_seen_id = params["last_seen_id"] || 0
video_id = String.to_integer(video_id)
video = Repo.get!(Rumbl.Video, video_id)
annotations = Repo.all(
from a in assoc(video, :annotations),
where: a.id > ^last_seen_id,
order_by: [asc: a.at, asc: a.id],
limit: 200,
preload: [:user]
)
resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
"annotation.json")}
{:ok, resp, assign(socket, :video_id, video_id)}
end
References
Programming Phoenix