2014/4/28

erlang gen_server

OTP 函式庫中包含了完整的 web server、FTP server、CORBA ORB,OTP 的 behavior 將常用的行為模式包裝起來,可想成 「用 callback 模組進行參數化」的應用框架。

行為解決了問題的「非功能」部份, callback 解決了功能的部份
問題中的非功能部份(ex: 如何動態更新程式碼)對所有應用來說都是一樣的。

以下會

  1. 以 erlang 寫一個小 client-server 程式
  2. 將程式一般化
  3. 進入真正的程式碼

發展通用伺服器的過程

以下程式會慢慢地將程式中,非功能(一般化)與功能的部份區分開來。

server1: 基本伺服器, 用 callback module 將它參數化

server1 是一個基本的 server, 可以用 callback module 將它參數化,換句話說,就是把 callback 的程式跟 server 的程式分開。

%% server1.erl
-module(server1).
-export([start/2, rpc/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, Response} -> Response
    end.

loop(Name, Mod, State) ->
    receive
        {From, Request} ->
            {Response, State1} = Mod:handle(Request, State),
            From ! {Name, Response},
            loop(Name, Mod, State1)
    end.

以下是 name_server 的 callback module,一般 OTP 程式的習慣寫法,就是把 client 跟 server 端程式,放在同一個 module 裡面。

-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server1, [rpc/2]).

%% client apis,這是讓 client 端呼叫的程式介面
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines,這是讓 server 呼叫的
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server1:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}

注意:name_server callback 沒有共時性、沒有生成、接收、送出、register,這表示我們可以寫 client-server module,而不需要知道下面的共時模型是什麼,這是所有伺服器的基本模型。

server2: 具有 Transaction 的伺服器

下面是一個會讓客戶當機的 server,如果查詢導致 Server 例外,客戶端就會當機。

-module(server2).
-export([start/2, rpc/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        %% client 收到 server 回傳的 crash,就跳出程式
        {Name, crash} -> exit(rpc);
        {Name, ok, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, Request} ->
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    From ! {Name, ok, Response},
                    loop(Name, Mod, NewState)
            catch
                _:Why ->
                    log_the_error(Name, Request, Why),
                    %% 因為程式出錯了,發送 crash 訊息給 client
                    From ! {Name, crash},
                    %% 再次 loop 等待處理下一個訊息
                    loop(Name, Mod, OldState)
            end
    end.

log_the_error(Name, Request, Why) ->
    io:format("Server ~p request ~p ~n"
                  "caused exception ~p~n", 
                  [Name, Request, Why]).

如果例外發生在處理器函數內,它以 State 的原始值進行迴圈
,如果函數成功,它會以「處理器函數所提供的」NewState值進行迴圈,當處理器發生錯誤時,伺服器的狀態會維持不變。

name_server 要改成使用 server2 的話,只需要修改 -import 的地方

-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server2, [rpc/2]).

%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server2:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}
4> name_server:whereis(jane).
error

server3: 具有程式碼熱抽換功能的伺服器

如果送給伺服器一個程式碼熱抽換 swap_code 的訊息,就會改變 callback module,變成呼叫新模組

-module(server3).
-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
    register(Name, 
             spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, {swap_code, NewCallBackMod}} ->
            From ! {Name, ack},
            loop(Name, NewCallBackMod, OldState);
        {From, Request} ->
            {Response, NewState} = Mod:handle(Request, OldState),
            From ! {Name, Response},
            loop(Name, Mod, NewState)
    end.

因為 name_server 所註冊綁定的 process name 不能在程式運作過程中修改掉,但我們可以動態地把 name_server process 運作的 module 換掉,達到抽換程式碼的功能。

-module(name_server1).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server3, [rpc/2]).

%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server3:start(name_server, name_server1).
true
2> name_server1:add(tom, "home").
ok
3> name_server1:add(jane, "home").
ok

如果我們測試到一半,想要增加「找出 name_server 儲存的」所有名稱,由於目前 name_server1 並沒有這個函數,所以我們就撰寫一個新的 new_name_server,增加了 all_names 與 delete 函數。

-module(new_name_server).
-export([init/0, add/2, all_names/0, delete/1, whereis/1, handle/2]).
-import(server3, [rpc/2]).

%% interface
all_names()      -> rpc(name_server, allNames).
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
delete(Name)     -> rpc(name_server, {delete, Name}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle(allNames, Dict)           -> {dict:fetch_keys(Dict), Dict};
handle({delete, Name}, Dict)     -> {ok, dict:erase(Name, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

在 console 就可以直接用 swap_code 把 name_server 執行的 module 換成 new_name_server

4> server3:swap_code(name_server, new_name_server).
ack
5> new_name_server:all_names().
[jane,tom]

以往認為伺服器本身有記錄狀態,發送訊息就會改變狀態,如果要抽換程式邏輯,必須要把 server 停下來,改變程式,然後重新啟動,但實際上,可以透過訊息的方式,讓 server 的迴圈執行新版本的 server module。

Server 4: 具有 Transaction 與 程式碼熱抽換 合併

-module(server4).
-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, crash} -> exit(rpc);
        {Name, ok, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, {swap_code, NewCallbackMod}} ->
            From ! {Name, ok, ack},
            loop(Name, NewCallbackMod, OldState);
        {From, Request} ->
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    From ! {Name, ok, Response},
                    loop(Name, Mod, NewState)
            catch
                _: Why ->
                    log_the_error(Name, Request, Why),
                    From ! {Name, crash},
                    loop(Name, Mod, OldState)
            end
    end.

log_the_error(Name, Request, Why) ->
    io:format("Server ~p request ~p ~n"
                  "caused exception ~p~n", 
                  [Name, Request, Why]).

server 5

此程式不做任何事,直到我們告訴它 {become, F},就會變成伺服器 F。

-module(server5).
-export([start/0, rpc/2]).

start() -> spawn(fun() -> wait() end).

wait() ->
    receive
        {become, F} -> F()
    end.

rpc(Pid, Q) ->
    Pid ! {self(), Q},
    receive
        {Pid, Reply} -> Reply
    end.

這是計算階層的 server

-module(fac_server).
-export([loop/0]).

loop() ->
    receive
    {From, {fac, N}} ->
        From ! {self(), fac(N)},
        loop();
    {become, Something} ->
        Something()
    end.

fac(0) -> 1;
fac(N) -> N * fac(N-1).

測試

1> Pid = server5:start().
<0.33.0>
2> Pid ! {become, fun fac_server:loop/0}
.
{become,#Fun<fac_server.loop.0>}
3> server5:rpc(Pid, {fac, 10}).
3628800

OTP 的 gen_server 就像是最後的 server5,是個一般化,可變成任何一種 server 的 server library,它提供了所有非功能的行為與錯誤處理。

使用 gen_server

三個步驟寫出一個 gen_server callback module

  1. 為 callback module 取名
  2. 寫 client 使用的介面函數
  3. 寫 callback module 中需要的六個 callback funciton

範例

  1. 為 callback module 取名
    my_bank
  2. 寫 client 使用的介面函數
    start() 啟動銀行
    stop() 關閉銀行
    new_account(Who) 建立新銀行帳號
    deposit(Who, Amount) 存款
    withdraw(Who, Amount) 提款

    這些都是呼叫 gen_server 的函數

     start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     stop()  -> gen_server:call(?MODULE, stop).
    
     new_account(Who)      -> gen_server:call(?MODULE, {new, Who}).
     deposit(Who, Amount)  -> gen_server:call(?MODULE, {add, Who, Amount}).
     withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).

    2.1 gen_server:start_link({local, Name}, Mode, ...) 會啟動一個本地伺服器,如果第一個參數填為 {global,GlobalName} ,則此伺服器為 global server
    2.2 ?MODULE 會展開成 my_bank
    2.3 gen_server:call(?MODULE, Term) 用來對 server 進行遠端呼叫 rpc

  3. 寫 callback module 中需要的六個 callback funcitons

callbak module 中要撰寫六個 callback functions
init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3

以下為一個 gen_server 程式,最簡單的樣板

-module().
%% 編譯時,如果沒有定義適當的callback function,就會產生警告
-behaviour(gen_server).
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
     terminate/2, code_change/3]).

%% 呼叫 gen_server:start_link(Name, CallBackMod, StartArgs, Opts) 啟動 server
%% 第一個呼叫的是 Mod:init(StartArgs)
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% 啟動時會被呼叫的函數,回傳值裡面的 State 會在其他函數中使用
init([]) -> {ok, State}.

%% 這是rpc 遠端呼叫時 server 的處理函數,Reply 會被送回 client 端
handle_call(_Request, _From, State) -> {reply, Reply, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.

terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.

以下為 my_bank 實際上用 ets 實作的程式

-module(my_bank).

-behaviour(gen_server).
-export([start/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-compile(export_all).

%% client functions
start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop()  -> gen_server:call(?MODULE, stop).

new_account(Who)      -> gen_server:call(?MODULE, {new, Who}).
deposit(Who, Amount)  -> gen_server:call(?MODULE, {add, Who, Amount}).
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).

%% 6 server callback functions
init([]) -> {ok, ets:new(?MODULE,[])}.

handle_call({new,Who}, _From, Tab) ->
    %% 先找看看帳號有沒有存在,舊帳號就不能再 ets:insert
    Reply = case ets:lookup(Tab, Who) of
                []  -> ets:insert(Tab, {Who,0}), 
                       {welcome, Who};
                [_] -> {Who, you_already_are_a_customer}
            end,
    {reply, Reply, Tab};
%% Who 存錢 X,最後得到餘額
handle_call({add,Who,X}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                []  -> not_a_customer;
                [{Who,Balance}] ->
                    NewBalance = Balance + X,
                    ets:insert(Tab, {Who, NewBalance}),
                    {thanks, Who, your_balance_is,  NewBalance}    
            end,
    {reply, Reply, Tab};
%% Who 提款 X,最後得到餘額
handle_call({remove,Who, X}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                []  -> not_a_customer;
                %% 扣錢之前,要先判斷一下夠不夠,餘額不能變成負值
                [{Who,Balance}] when X =< Balance ->
                    NewBalance = Balance - X,
                    ets:insert(Tab, {Who, NewBalance}),
                    {thanks, Who, your_balance_is,  NewBalance};    
                [{Who,Balance}] ->
                    {sorry,Who,you_only_have,Balance,in_the_bank}
            end,
    {reply, Reply, Tab};
handle_call(stop, _From, Tab) ->
    %% server 收到 stop,會回傳以下訊息,然後停止 server
    %% 第二個 normal 會變成 terminate 的第一個參數 _Reason
    %% 第三個參數 stopped 會變成 my_bank:stop() 的回傳值
    {stop, normal, stopped, Tab}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.

測試

1> my_bank:start().
{ok,<0.33.0>}
2> my_bank:deposit("tom", 10).
not_a_customer
3> my_bank:new_account("tom").
{welcome,"tom"}
4> my_bank:new_account("tom").
{"tom",you_already_are_a_customer}
5> my_bank:deposit("tom", 10).
{thanks,"tom",your_balance_is,10}
6> my_bank:deposit("tom", 15).
{thanks,"tom",your_balance_is,25}
7> my_bank:withdraw("tom", 5).
{thanks,"tom",your_balance_is,20}
8> my_bank:withdraw("tom", 35).
{sorry,"tom",you_only_have,20,in_the_bank}

gen_server callback functions 的細節

  1. 啟動 server
    呼叫 gen_server:start_link(Name,Mod,InitArgs,Opts) 就會啟動 server,建立一個名稱為 Name 的 server,callback module 為 Mod,Opts 控制這個 server 的行為:可指定訊息的記錄、除錯函數等等。在呼叫 Mod:init(InitArgs) 之後,此伺服器就被啟動完成

     @spec init(Args) -> {ok, State} |
                     {ok, State, Timeout} |
                     ignore |
                     {stop, Reason}

    當 init 回傳 {ok, State},就代表啟動成功,且初始狀態為 State

  2. 呼叫 server 時,客戶端會呼叫 gen_server:call(Name, Request),進而呼叫 callback function: handle_call/3

     @spec handle_call(Request, From, State) -> {reply, Reply, State} |
                     {reply, Reply, State, Timeout} |
                     {noreply, State} |
                     {noreply, State, Timeout} |
                     {stop, Reason, Reply, State} |
                     {stop, Reason, State}

    Request 是 gen_server:call 的第二個參數
    From 是 client 端發出 Request 的 process PID
    State 是server目前的狀態

    通常會回傳 {reply, Reply, NewState} ,Reply 會回傳給 client 端,變成 gen_server:call 的回傳值,而 NewState 是 server 的下一個狀態

  3. gen_server:cast(Name,Name) 這是不具有回傳值的呼叫,單純的發送訊息給 server

     @spec handle_cast(Msg, State) -> {noreply, NewState} |
                     {noreply, NewState, Timeout} |
                     {stop, Reason, NewState}

    通常會回傳 {noreply, NewState} 改變 server 的狀態

  4. gen_server:handle_info(Info, State) 是用來處理「自發訊息」的
    例如當 server process 被連結到另一個行程,且會捕捉離開訊息,那就可能會收到 {'EXIT', Pid, What} 訊息。或是系統中任何需要知道此 server 的 PID,都可以發送訊息過來,這樣的訊息最後會變成 Info 的值。

     @spec handle_info(Info, State) -> {noreply, State} |
                     {noreply, State, Timeout} |
                     {stop, Reason, State}
  5. server 終結的理由很多,但最後都會呼叫 terminate(Reason, NewState)
    5.1 handle_call, handle_cast, handle_info 可能會傳回 {stop, Reason, NewState}
    5.2 server可能會以 {'EXIT', reason } 當機

     @spec terminate(_Reason, State) -> void()

    可以在這裡把資料放進 disk,或是產生新訊息傳給其他 processes,或是直接丟棄

  6. 程式碼更新時,會呼叫 code_change

     @spec code_change(OldVsn, State, Extra) -> {ok, NewState}

在 emacs 提供的 gen_server template

%%%-------------------------------------------------------------------
%%% File    : gen_server_template.erl
%%% Author  : my name <yourname@localhost.localdomain>
%%% Description : 
%%%
%%% Created :  1 Jan 2014 by my name <yourname@localhost.localdomain>
%%%-------------------------------------------------------------------
-module().

-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
     terminate/2, code_change/3]).

-record(state, {}).

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%%                         {ok, State, Timeout} |
%%                         ignore               |
%%                         {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
    {ok, #state{}}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%%                                      {reply, Reply, State, Timeout} |
%%                                      {noreply, State} |
%%                                      {noreply, State, Timeout} |
%%                                      {stop, Reason, Reply, State} |
%%                                      {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%                                      {noreply, State, Timeout} |
%%                                      {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {noreply, State, Timeout} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.

%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

2014/4/25

erlang Mnesia

Mnesia 是 erlang 內建的 DBMS,可儲存任何 erlang 資料結構。也可設定組態,儲存在記憶體或是磁碟中,Table 可被複製到其他機器上,以提供容錯機制。

原本 Mnesia 的名字是 Amnesia 失憶症,但被 Joe 的老闆否決了,因為 DB 不能失憶,所以就去掉了 A ,變成 Mnesia。

DB查詢

Mnesia 查詢就類似 SQL 與 list comprehension,他們都是用 set theory 設計的。

建立測試資料庫,有三個 table: shop, cost, design,分別以 record 定義 schema

-module(test_mnesia).
-import(lists, [foreach/2]).
-compile(export_all).

%% IMPORTANT: The next line must be included
%%            if we want to call qlc:q(...)
%% 使用 qlc 必須要引用 qlc.hrl
-include_lib("stdlib/include/qlc.hrl").

-record(shop, {item, quantity, cost}).
-record(cost, {name, price}).

% 建立測試資料庫
do_this_once() ->
    % 在 node() 本機節點,進行資料庫初始化
    mnesia:create_schema([node()]),
    mnesia:start(),
    mnesia:create_table(shop,   [{attributes, record_info(fields, shop)}]),
    mnesia:create_table(cost,   [{attributes, record_info(fields, cost)}]),
    mnesia:create_table(design, [{attributes, record_info(fields, design)}]),
    mnesia:stop().

測試

1> test_mnesia:do_this_once().
stopped

=INFO REPORT==== 19-Feb-2014::10:38:33 ===
    application: mnesia
    exited: stopped
    type: temporary

資料庫初始化

在這個節點進行資料庫初始化,只需要做一次

1>mnesia:create_schema([node()]).
ok
2>init:stop().
ok

這會建立一個名稱為 Mnesia.nonode@nohost 的目錄,用來儲存資料庫。

如果填入多個 nodes,呼叫 mnesia:create_schema(NodeList),就會在所有節點進行初始化。

另外也可以在啟動 erlang 時,直接設定資料庫的目錄

>erl -mnesia dir '"d:/temp/db"'
Eshell V5.10.4  (abort with ^G)
1> mnesia:create_schema([node()]).
ok
2> init:stop().
ok
3>

取得 table 內所有資料

% 啟動資料庫
start() ->
    mnesia:start(),
    mnesia:wait_for_tables([shop,cost,design], 20000).

% 重設 DB,並填入測試資料
reset_tables() ->
    % clear 可將 table 清空
    mnesia:clear_table(shop),
    mnesia:clear_table(cost),
    % 以 tuple 將資料寫入 table
    F = fun() ->
                foreach(fun mnesia:write/1, example_tables())
        end,
    % 將整個 F 包裝在一個 transaction 中
    mnesia:transaction(F).

example_tables() ->
    % tuple 的第一個元素是 table name,後面的資料要依照 record 定義的順序
    [%% The shop table
     {shop, apple,   20,   2.3},
     {shop, orange,  100,  3.8},
     {shop, pear,    200,  3.6},
     {shop, banana,  420,  4.5},
     {shop, potato,  2456, 1.2},
     %% The cost table
     {cost, apple,   1.5},
     {cost, orange,  2.4},
     {cost, pear,    2.2},
     {cost, banana,  1.5},
     {cost, potato,  0.6}
    ].

%% SQL equivalent
%%  SELECT * FROM shop;
demo(select_shop) ->
    do(qlc:q([X || X <- mnesia:table(shop)]));

%% SQL equivalent
%%  SELECT item, quantity FROM shop;
demo(select_some) ->
    do(qlc:q([{X#shop.item, X#shop.quantity} || X <- mnesia:table(shop)]));

%% ===========
%% SQL equivalent
%%   SELECT shop.item FROM shop
%%   WHERE  shop.quantity < 250;

demo(reorder) ->
    do(qlc:q([X#shop.item || X <- mnesia:table(shop),
                             X#shop.quantity < 250
             ]));

%% SQL equivalent
%%   SELECT shop.item
%%   FROM shop, cost 
%%   WHERE shop.item = cost.name 
%%     AND cost.price < 2
%%     AND shop.quantity < 250

demo(join) ->
    do(qlc:q([X#shop.item || X <- mnesia:table(shop),
                             X#shop.quantity < 250,
                             Y <- mnesia:table(cost),
                             X#shop.item =:= Y#cost.name,
                             Y#cost.price < 2
             ])).

do(Q) ->
    % Q 是一個 編譯的 QLC 查詢,qlc:e 會估算結果,交易成功會回傳 {atomic, Val}
    F = fun() -> qlc:e(Q) end,
    {atomic, Val} = mnesia:transaction(F),
    Val.

測試

2> test_mnesia:start().
ok
3> test_mnesia:reset_tables().
{atomic,ok}
4> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,20,2.3},
 {shop,orange,100,3.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]
5> test_mnesia:demo(select_some).
[{potato,2456},
 {apple,20},
 {orange,100},
 {pear,200},
 {banana,420}]

qlc 是 query list comprehensions 的縮寫,語法

X || X <- mnesia:table(shop)

意思是 取得list: X,其中 X 是取自 Mnesia 的 table: shop,X 的值是 erlang 的 shop record,要注意 qlc:c 的語法

qlc:q([X || X <- mnesia:table(shop)])

跟下面分成兩段的寫法是不一樣的,不能這樣寫

Var = [X || X <- mnesia:table(shop)],
qlc:c(Var)

取得特定欄位資料

因為 X 是 shop record,可以用 record 的語法 X#shop.item,只取得我們想要的欄位資料

qlc:q([ {X#shop.item, X#shop.quantity} || X <- mnesia:table(shop)])

條件查詢

直接把條件 X#shop.quantity < 250 放在 list comprehension 的後面

qlc:q([X#shop.item || X <- mnesia:table(shop),
                             X#shop.quantity < 250 ])

測試

6> test_mnesia:demo(reorder).
[apple,orange,pear]

join table

以 X#shop.item =:= Y#cost.name join 兩個 table

%%   SELECT shop.item
%%   FROM shop, cost 
%%   WHERE shop.item = cost.name 
%%     AND cost.price < 2
%%     AND shop.quantity < 250

qlc:q([X#shop.item || X <- mnesia:table(shop),
                             X#shop.quantity < 250,
                             Y <- mnesia:table(cost),
                             X#shop.item =:= Y#cost.name,
                             Y#cost.price < 2
             ])

測試

7> test_mnesia:demo(join).
[apple]

新增/移除 資料

shop 的 primary key 是表格中的第一個欄位,此 table 是屬於 set 類型,如果新的 record 跟原本存在於 table 內的某個 row 有相同的 primary key,則新的 record 就會覆蓋掉舊資料。

移除資料,只需要知道 table 名稱,跟 primary key。

%% ===========
add_shop_item(Name, Quantity, Cost) ->
    Row = #shop{item=Name, quantity=Quantity, cost=Cost},
    F = fun() ->
                mnesia:write(Row)
        end,
    mnesia:transaction(F).

remove_shop_item(Item) ->
    Oid = {shop, Item},
    F = fun() ->
                mnesia:delete(Oid)
        end,
    mnesia:transaction(F).

測試

1> test_mnesia:start().
ok
2> test_mnesia:reset_tables().
{atomic,ok}
3> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,20,2.3},
 {shop,orange,100,3.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]
4> test_mnesia:add_shop_item(orange, 200, 2.8).
{atomic,ok}
5> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,20,2.3},
 {shop,orange,200,2.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]
6> test_mnesia:remove_shop_item(pear).
{atomic,ok}
7> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,20,2.3},
 {shop,orange,200,2.8},
 {shop,banana,420,4.5}]
8> mnesia:stop().
stopped
9>
=INFO REPORT==== 19-Feb-2014::11:25:00 ===
    application: mnesia
    exited: stopped
    type: temporary

transaction

在 transaction 的 F 裡面,可以呼叫 mnesia:write/1, mnesia:delete/1 或 qlc:e(Q)
qlc:e(Q) 裡面的 Q 是以 qlc:q/1 編譯的一個查詢

do_something() ->
    F = fun() ->
            % ...
            mnesia:write(Row),
            mnesia:delete(Old),
            qlc:e(Q)
        end,
    mnesia:transaction(F).

Mnesia 使用的交易策略是 悲觀上鎖 pessimistic locking,當交易管理員存取 table 時,會視情況將 record 或 整個 table 上鎖,如果偵測到可能會導致 deadlock,就會立刻放棄這個交易,並且還原先前做的任何改變。

如果交易一開始就失敗了,系統會自動等待一小段時間後,再試一次,所以交易內的 function 可能會被估算好幾次。

因此,交易內不應該做任何會產生副作用的行為,例如:

F = fun() ->
        ...
        io:format("reading..."),
        ...
    end,
mnesia:transaction(F),

我們可能會得到很多次 reading... 的畫面輸出

注意

  1. mnesia:write/1 與 mnesia:delete/1 只能在 由 mnesia:transaction/1 所處理的 fun 裡面使用
  2. 不應該寫程式去捕捉 mnesia 存取函式(mnesia:write/1, mnesia:delete/1 ...)的例外,因為Mnesia的交易機制,非常依賴這些函數丟出的例外,因此不能在 fun 裡面捕捉這些例外
farmer(Ntrade) ->
    %% Nwant = Number of oranges the farmer wants to trade 農夫想換掉的 oranges 數量
    % 可以用 1個orange 換 2個 apples
    F = fun() ->
                %% 取得  apple 的數量
                [Apple] = mnesia:read({shop,apple}),
                Napples = Apple#shop.quantity,
                Apple1  = Apple#shop{quantity = Napples + 2*Ntrade},
                %% 把最後 apples 的數量寫入 DB
                mnesia:write(Apple1),
                %% 取得 oranges 的數量
                [Orange] = mnesia:read({shop,orange}),
                NOranges = Orange#shop.quantity,
                if 
                    NOranges >= Ntrade ->
                        N1 =  NOranges - Ntrade,
                        Orange1 = Orange#shop{quantity=N1},
                        %% update the database
                        mnesia:write(Orange1);
                    true ->
                        %% 只要 oranges 不夠,就要 abort 放棄此交易
                        mnesia:abort(oranges)
                end
        end,
    mnesia:transaction(F).

測試

1> test_mnesia:start().
ok
2> test_mnesia:reset_tables().
{atomic,ok}
3> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,20,2.3},
 {shop,orange,100,3.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]
4> test_mnesia:farmer(50).
{atomic,ok}
5> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,120,2.3},
 {shop,orange,50,3.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]
6> test_mnesia:farmer(100).
{aborted,oranges}
7> test_mnesia:demo(select_shop).
[{shop,potato,2456,1.2},
 {shop,apple,120,2.3},
 {shop,orange,50,3.8},
 {shop,pear,200,3.6},
 {shop,banana,420,4.5}]

儲存複雜的資料

傳統 DBMS 的缺點是,欄位的資料型別有限,必須要以程式語言跟DB的資料型別做對應轉換,複雜的物件也沒有辦法直接儲存進去,而Mnesia 是設計來儲存所有的 erlang 的資料結構。

Mnesia 的 key 與 record 都可以是任意的 erlang term。

資料結構、資料庫、語言之間沒有 impedance mismatch 阻抗不協調的問題。插入或刪除複雜資料結構的速度都很快。

一開始要定義 record: design

-record(design, {id, plan}).

然後定義函數來儲存 plan

add_plans() ->
    % id 是第一個元素,裡面是複合 key:{joe,1}
    % plan 是一個 半徑 10 的圓形
    D1 = #design{id   = {joe,1},
                 plan = {circle,10}},
    % id 是單純的 fred
    D2 = #design{id   = fred, 
                 plan = {rectangle,10,5}},
    % id 跟 plan 都比較複雜
    D3 = #design{id   = {jane,{house,23}},
                 plan = {house,
                         [{floor,1,
                           [{doors,3},
                            {windows,12},
                            {rooms,5}]},
                          {floor,2,
                           [{doors,2},
                            {rooms,4},
                            {windows,15}]}]}},
    F = fun() -> 
                mnesia:write(D1),
                mnesia:write(D2),
                mnesia:write(D3)
        end,
    mnesia:transaction(F).

get_plan(PlanId) ->
    F = fun() -> mnesia:read({design, PlanId}) end,
    mnesia:transaction(F).

測試

1> test_mnesia:start().
ok
2> test_mnesia:add_plans().
{atomic,ok}
3> test_mnesia:get_plan(fred).
{atomic,[{design,fred,{rectangle,10,5}}]}
4> test_mnesia:get_plan({jane, {house,23}}).
{atomic,[{design,{jane,{house,23}},
                 {house,[{floor,1,[{doors,3},{windows,12},{rooms,5}]},
                         {floor,2,[{doors,2},{rooms,4},{windows,15}]}]}}]}

Mnesia 支援 fragmented table,可以水平切割的 table,這可以用來設計出超大的 table,而 table 內的資料被儲存在不同的機器上,這些 fragmented table 本身也是 Mnesia table,每個片段都可以獨立被複製、索引。

表格類型與位置

Mnesia table 可存在 RAM 或 DISK 上,另外也可以放在單一機器或複製到數個機器上。

  1. RAM
    速度很快,一旦當機或停止 DB,資料就會消失。

    使用時必須先測試實體記憶體能不能放得下所有資料,如果放不下,系統會因為常常要跟 Disk 做資料交換,而造成效能低下。

  2. DISK
    資料會存在 DISK 上,當機後還能保有資料。

    資料會先存到 disk log,等一定的時間後,disk log 會跟資料庫內其他資料 consolidate 在一起,且 disk log 裡面的項目會被建立出來,如果系統當機,當下次系統重新啟動時,會主動檢查 disk log 是否一致,log 裡面尚未處理的資料會被寫入到 DB。

    一旦交易成功,資料就會寫入到 disk log,如果在交易期間,系統當機了,那麼這一次的資料異動就會遺失。

因為 RAM table 是 transient 的,如果記憶體內的資料不見了,對系統有影響,那麼就必須要把 RAM table 複製到 disk 或是另一部機器的 RAM 或 disk。

建立表格

@spec mnesia:create_table(Name, ArgS) ->
        {atomic, ok} | {aborted, Reason}

Args 是 {Key, Val} 的 tuple list,參數說明如下

  1. Name
    表格的名稱,必須要是 atom。習慣上會使用 record 的名稱。

  2. {type, Type}
    表格的型別,可以是 set, ordered_set, bag 中的一種
    set:所有的 key 都不一樣
    ordered set:tuple 會根據 key 值被排序
    bag:key 可以重複

  3. {disc_copies, NodeList}
    NodeList 是 erlang 節點的清單,table 的 disk 副本會儲存到這些節點,使用此選項時,系統會在「進行此操作」的節點上,建立 RAM table 的複製版本

    把 disc_copies 的複製表格放在一個節點,將相同表格以不同型別存在別的節點上,這樣的作法很常見。因為
    3.1 從 RAM 讀取資料,速度很快
    3.2 可以把資料寫到 persistent disk 上

  4. {ram_copies, NodeList}
    NodeList 是 erlang 節點的清單,table 的 ram 副本會儲存到這些節點

  5. {disc_only_copies, NodeList}
    NodeList 是 erlang 節點的清單,table 的 disk 副本會儲存到這些節點,且沒有 ram 副本。

  6. {attributes, AtomList}
    這是特定table內的值的欄位名稱清單,想建立包含erlang record XXX 的表格,必須使用 {attribute, record_info(fields, XXX)} 的語法,或是指定記錄欄位名稱的清單

常見的屬性組合

  1. mnesia:create_table(shop, [Attrs])
    ram 放在單一節點
    節點當機,資料會遺失
    存取速度最快
    記憶體必須足夠存放所有資料

  2. mnesia:create_table(shop, [Attrs, {disc_copies, [node()]}])
    ram + disk,單一節點
    如果節點當機,表格資料會從 disk 中復原
    讀取很快,寫入很慢
    記憶體必須足夠存放所有資料

  3. mnesia:create_table(shop, [Attrs, {disc_only, [node()]}])
    disc,單一節點
    大型 table 不必擔心記憶體夠不夠
    比 RAM 的副本慢

  4. mnesia:create_table(shop, [Attrs, {ram_copies, [node(), someOtherNode()]}])
    ram, 兩個節點
    如果兩個節點都當機,資料會遺失
    記憶體必須足夠存放所有資料
    可從任一節點存取 table

  5. mnesia:create_table(shop, [Attrs, {disc_copies, [node(), someOtherNode()]}])
    disk,兩個節點
    可利用另一個節點的資料復原
    即使兩個節點都當機,資料也不會受損

表格行為

如果 table 被複製到跨越數個 erlang 節點,會盡量同步化,如果一個節點當機,系統仍然可以運作,當原本當機的節點再次上線,會和其他保有副本的節點重新同步。

Mnesia 的圖形化檢視器

1> tv:start().
<0.33.0>

這會啟動一個GUI使用介面,可查看 Mnesia Table

缺少的內容

  1. 備份與恢復
    Mnesia允許許多不同種類的備份組態及災難復原機制

  2. Dirty操作
    dirty_read, dirty_write ...
    這是指在交易外面進行操作,如果應用是單一線程,或是在某種特殊狀況下,使用 Dirty 操作可提高效率。

  3. SNMP Table
    Mnesia 內建了 SNMP table 型別,這使得實做 SNMP 管理系統變得比較容易。

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

2014/4/21

erlang - ets dets

ets (erlang term storage) 與 dets (disk ets) 是用來負責大量 Erlang Term 的儲存機制。兩個都是提供大量的 key-value 尋找表,ETS 放記憶體, DETS 放 disk,都可被數個 process 共享,ETS 與 DETS 是 key-value 結構,只是 erlang tuples 的集合。

存在 ETS 表內的資料,是短暫的 transient,當 ETS 被 disposed,資料就會被刪除,而 DETS 內的資料是永續 persistent 的,即使erlang crash,資料也不會受損,在啟動 DETS 時,會自動檢查資料一致性 consistency,如果資料有問題,就會試圖維修,因為所有表格都要被檢查,這個動作會花上一些時間,但系統 crash 之前的最後一筆資料可能會遺失。

當系統 crash 而需要重置時,DETS 就會需要花上一些時間,才能將啟動的程序做完。

如果 APP 以 erlang 原生的機制:nondestructive assignment 與 pure erlang data structure 實作時,發現有實作的困難,而且需要高效率處理大量的資料,就可以使用 ETS。

ETS 並不是用 erlang 實作的,內部也跟普通的 erlang 物件不一樣,而且 ETS 並沒有 garbage collection 的機制,不會因為 gc 而影響效率。

基本操作

  1. 建立新 table 或開啟舊 table
    ets:new 或 dets:open_file
  2. 對某個 table 插入 tuple/tuple list
    insert(Tablename, X) ,其中 X 是一個 tuple/tuple list
  3. 對某個 table 查詢,以取得 tuple list
    lookup(TableName, Key) ,回傳符合 Key 的 tuple list
  4. 銷毀一個 table
    ets:delete(TableId) 或 dets:close(TableId)

table 型別

基本型別有兩種(1) set: key 不可重複 (2) bag: 允許數個 tuple(值組) 有相同的 key

set 或 bag 都有兩種變形, 所以有四種表格

  1. set:所有的 key 都不一樣
  2. ordered set:tuple 會根據 key 值被排序
  3. bag:key 可以重複
  4. duplicate bag:key 跟 tuple 都可以重複
-module(ets_test).
-export([start/0]).

start() ->
    lists:foreach(fun test_ets/1,
                  [set, ordered_set, bag, duplicate_bag]).

test_ets(Mode) ->
    TableId = ets:new(test, [Mode]),
    ets:insert(TableId, {a,1}),
    ets:insert(TableId, {b,2}),
    ets:insert(TableId, {a,1}),
    ets:insert(TableId, {a,3}),
    List = ets:tab2list(TableId),
    io:format("~-13w  => ~p~n", [Mode, List]),
    ets:delete(TableId).

測試

1> ets_test:start().
set            => [{b,2},{a,3}]
ordered_set    => [{a,3},{b,2}]
bag            => [{b,2},{a,1},{a,3}]
duplicate_bag  => [{b,2},{a,1},{a,1},{a,3}]
ok

ETS 表格的效率考量

在系統內部,ETS 表格是用 hash 表格呈現的(ordered set 例外, 內部使用的是平衡二元樹),這表示使用 set,空間有浪費,而使用 ordered set,時間有浪費。

插入 set,耗費的時間是固定的,插入ordered set,表格內的項目越多,花的時間越多。

bag 每次有新的元素加入,都要比對是否重複,如果相同 key 的值組很多,就會很耗時間,而 duplicate bag 插入資料所花的時間就比較短。

ETS 的 owner 是建立它的行程,行程死亡或呼叫 ets:delete,表格就會被刪除。

ETS 不是 gc 處理的對象,即使有大量資料存在 ETS 裡面,也不會因為 garbage collection 影響效率。

當一個 tuple 插入到ETS表,代表此 tuple 的所有資料結構從行程堆疊被複製到 ETS 表。進行 lookup 時,結果的 tuple 會從 ETS 複製到行程的堆疊 & heap,除了 binary 資料之外的所有資料結構都是這樣。

大型二元資料會存在獨立的 heap 儲存區域,此區域可被數個行程及 ETS表共享,「計算參考個數」的gc會管理個別的二元資料,當使用到此 binary 的行程和表格的總數量降為 0 時,此儲存區域會被回收。

換句話說:「含有大型二元資料」的訊息,成本很低。在 tuple 插入包含二元的資料到 ETS 表中,也很便宜。

原則:盡可能地使用二元,來表示字串&大型的無型別記憶體區塊。

建立 ETS表

呼叫 ets:new 的行程被認為是該 table 的 owner,如果 owner process 死亡,table 的空間就會自動被回收。

@spec ets:new(Name, [Opt]) -> TableId
    Name 是 atom
    [Opt]是選項清單
        set | ordered_set | bag | duplicate_bag
        private        -> 只有 owner 行程可以讀寫
        public        -> 只要知道 TableId 都可以讀寫此table
        protected    -> 只有 owner 行程可以讀寫,其他行程可以讀取資料
        named_table    -> 可使用 Name 作為後續的操作
        {keypos, K}    -> 把 K 當作 key的位置,正常狀況 K 為 1

        預設值    [set, protected, {keypos,1}]

public table 因任何行程都可以讀寫,使用者必須自己確定此 table 的讀寫有一致性,資料才不會亂掉。

最常使用的是 protected,因為只有 owner 行程可以改變 table 的資料,而所有行程都可以讀取資料,資料是共享的,成本幾乎為 0。

ets 範例: trigram

功能:寫一個探索程式,試圖預測某個字串是否為一個英文單字

方法:使用三個字母組成的序列 trigram,測試字串中所有連續字母的序列,是否符合「產生自大量英文字」的 trigram 集合

程式:從一個很大的英文字集合中,找出所有 trigram

  1. 做一個 iterator,在英文字典中,取得所有 trigram
  2. 建立 ETS,型別是集合 & 有序集合,代表所有的 trigram,建立一個集合包含所有的 trigram
  3. 測量建立這些不同表格所需的時間
  4. 測量存取這些不同表格所需要的時間
  5. 根據量測結果,選取最佳者,並為它寫存取常式
-module(lib_trigrams).
-export([for_each_trigram_in_the_english_language/2,
         make_tables/0, timer_tests/0,
         open/0, close/1, is_word/2,
         how_many_trigrams/0, 
         make_ets_set/0, make_ets_ordered_set/0, make_mod_set/0,
         lookup_all_ets/2, lookup_all_set/2
        ]).
-import(lists, [reverse/1]).

% 測量產生 set, ordered set, 或使用 erlang sets 模組 所要花的時間,以及平均每個 trigram 耗用的位元組大小
make_tables() ->
    {Micro1, N} = timer:tc(?MODULE, how_many_trigrams, []),
    io:format("Counting - No of trigrams=~p time/trigram=~p~n",[N,Micro1/N]),

    {Micro2, Ntri} = timer:tc(?MODULE, make_ets_ordered_set, []),
    FileSize1 = filelib:file_size("trigramsOS.tab"),
    io:format("Ets ordered Set size=~p time/trigram=~p~n",[FileSize1/Ntri, 
                                                           Micro2/N]),

    {Micro3, _} = timer:tc(?MODULE, make_ets_set, []),
    FileSize2 = filelib:file_size("trigramsS.tab"),
    io:format("Ets set size=~p time/trigram=~p~n",[FileSize2/Ntri, Micro3/N]),

    {Micro4, _} = timer:tc(?MODULE, make_mod_set, []),
    FileSize3 = filelib:file_size("trigrams.set"),
    io:format("Module sets size=~p time/trigram=~p~n",[FileSize3/Ntri, Micro4/N]).

% 測量 lookup 每一個 trigram 所要花的平均時間
timer_tests() ->
    time_lookup_ets_set("Ets ordered Set", "trigramsOS.tab"),
    time_lookup_ets_set("Ets set", "trigramsS.tab"),
    time_lookup_module_sets().

time_lookup_ets_set(Type, File) ->
    % 以 ets:file2tab 將 檔案轉換為 table
    {ok, Tab} = ets:file2tab(File),
    % table 換成 list
    L = ets:tab2list(Tab),
    Size = length(L),
    % 開始測量 lookup 的總時間
    {M, _} = timer:tc(?MODULE, lookup_all_ets, [Tab, L]),
    io:format("~s lookup=~p micro seconds~n",[Type, M/Size]),
    ets:delete(Tab).

lookup_all_ets(Tab, L) ->
    lists:foreach(fun({K}) -> ets:lookup(Tab, K) end, L).

time_lookup_module_sets() ->
    % 讀取檔案,取得 Bin
    {ok, Bin} = file:read_file("trigrams.set"),
    % binary_to_term 換成 sets
    Set = binary_to_term(Bin),
    % sets:to_list 換成 list
    Keys = sets:to_list(Set),
    Size = length(Keys),
    % 開始測量 lookup 的總時間
    {M, _} = timer:tc(?MODULE, lookup_all_set, [Set, Keys]),
    io:format("Module set lookup=~p micro seconds~n",[M/Size]).

lookup_all_set(Set, L) ->
    lists:foreach(fun(Key) -> sets:is_element(Key, Set) end, L).

% 計算總共有幾個 trigrams
how_many_trigrams() ->
    F = fun(_, N) -> 1 + N  end,
    for_each_trigram_in_the_english_language(F, 0).

%% ==========================
% 利用 trigrams 建立 set, ordered set table
make_ets_ordered_set() -> make_a_set(ordered_set, "trigramsOS.tab").
make_ets_set()         -> make_a_set(set, "trigramsS.tab").

make_a_set(Type, FileName) ->
    % 建立 set 或 ordered set 的 table
    Tab = ets:new(table, [Type]),
    % 定義 fun,將 Str 轉換為 binary ,然後以 {Key} 形式的 tuple 存到 ets Tab 裡面
    F = fun(Str, _) -> ets:insert(Tab, {list_to_binary(Str)}) end,
    % 將 F 送入 for_each_trigram_in_the_english_language
    for_each_trigram_in_the_english_language(F, 0),
    % 把 Table 的資料,轉存到 Filename 檔案裡面
    % Dumps the table Tab to the file Filename
    ets:tab2file(Tab, FileName),

    % 查詢 Tab 的 size
    Size = ets:info(Tab, size),
    % 刪除 Tab
    ets:delete(Tab),
    % 把 Size 傳回去
    Size.

% 改用 sets 模組來存放 trigrams
make_mod_set() ->
    D = sets:new(),
    F = fun(Str, Set) -> sets:add_element(list_to_binary(Str),Set) end,
    D1 = for_each_trigram_in_the_english_language(F, D),
    % term_to_binary 把 sets 轉成 binary,然後再寫入到檔案中
    file:write_file("trigrams.set", [term_to_binary(D1)]).

%% ==========================
%% An iterator that iterates through all trigrams in the language
%%     將英文中每個 trigram 套用 fun F
%        F 是 fun(Str,A) -> A
%        Str 的範圍遍及語言中所有 trigram,A是 accumulator
%    使用 354984 個英文字產生 trigram

for_each_trigram_in_the_english_language(F, A0) ->
    {ok, Bin0} = file:read_file("354984si.ngl.gz"),
    % 以 zlib:gunzip 解壓縮
    Bin = zlib:gunzip(Bin0),
    scan_word_list(binary_to_list(Bin), F, A0).

scan_word_list([], _, A) ->
    A;
scan_word_list(L, F, A) ->
    {Word, L1} = get_next_word(L, []),
    % Word 就是 reverse([$\s|L])
    % 把 $\s 加到 Word 前面,就等於  $\s + L + $\s
    % 每個單字的前後刻意加上一個空白字元,在這裡刻意將空白字元當作正常的字母
    A1 = scan_trigrams([$\s|Word], F, A),
    % 處理完成,取得 $\s + L + $\s 的所有 trigram 後,再繼續處理下一個 word
    scan_word_list(L1, F, A1).

%% scan the word looking for \r\n
%% the second argument is the word (reversed) so it
%% has to be reversed when we find \r\n or run out of characters
% 取得下一行的單字
% 一個字元一個字元慢慢累加到 L,如果 pattern 吻合 [\r\n|T],
% 就代表到該行行末,這時候就回傳 reverse([$\s|L])
% $\s 是一個空白字元,就等於把空白字元加到 L 前面,然後再 reverse 這個 list

get_next_word([$\r,$\n|T], L) -> {reverse([$\s|L]), T};
get_next_word([H|T], L)       -> get_next_word(T, [H|L]);
% 最後一行的 word 沒有行末的 $\r $\n,最後會是 [],因此就直接回傳 reverse([$\s|L])
get_next_word([], L)          -> {reverse([$\s|L]), []}.

% 三個字元的 list
scan_trigrams([X,Y,Z], F, A) ->
    F([X,Y,Z], A);
% 超過三個字元的 list
scan_trigrams([X,Y,Z|T], F, A) ->
    A1 = F([X,Y,Z], A),
    scan_trigrams([Y,Z|T], F, A1);
% 其他狀況的 list,就直接把 accumulator A 傳回去
scan_trigrams(_, _, A) ->
    A.

%% ==========================
%% access routines
%%   open() -> Table
%%   close(Table)
%%   is_word(Table, String) -> Bool

is_word(Tab, Str) -> is_word1(Tab, "\s" ++ Str ++ "\s").

is_word1(Tab, [_,_,_]=X) -> is_this_a_trigram(Tab, X);
is_word1(Tab, [A,B,C|D]) ->
    case is_this_a_trigram(Tab, [A,B,C]) of
        true  -> is_word1(Tab, [B,C|D]);
        false -> false
    end;
is_word1(_, _) ->
    false.

is_this_a_trigram(Tab, X) ->
    % 以  lookup 的方式,確認是不是在 字典產生的所有 trigram 裡面
    case ets:lookup(Tab, list_to_binary(X)) of
        [] -> false;
        _  -> true
    end.

open() ->
    % filename:dirname(code:which(?MODULE)) 可取得載入目前模組的目錄
    {ok, I} = ets:file2tab(filename:dirname(code:which(?MODULE)) 
                               ++ "/trigramsS.tab"),
    I.

close(Tab) -> ets:delete(Tab).

建立 table 要花的時間

測量產生 set, ordered set, 或使用 erlang sets 模組 所要花的時間,以及平均每個 trigram 耗用的位元組大小

1> lib_trigrams:make_tables().
Counting - No of trigrams=3357707 time/trigram=0.30437438406626904
Ets ordered Set size=19.029156153630503 time/trigram=1.0733515461593284
Ets set size=19.028408559947668 time/trigram=0.7421731556684368
Module sets size=9.433978132884777 time/trigram=2.5252352274930483
ok

總共有 3357707 個trigram,每個trigram處理了 0.3 sec。
在 ordered set,插入一個 trigram 要花 1.07 sec,耗用 19.029 bytes
在 set,插入一個 trigram 要花 0.74 sec,耗用 19.028 bytes
在 sets,插入一個 trigram 要花 2.52 sec,耗用 9.433 bytes

存取 table 要花的時間

2> lib_trigrams:timer_tests().
Ets ordered Set lookup=0.6541444724792076 micro seconds
Ets set lookup=0.37379684141669 micro seconds
Module set lookup=0.5606952621250351 micro seconds
ok

結果

ETS set 的效能最好

預測某個字串是否為一個英文單字

3> Table=lib_trigrams:open().
32784
4> lib_trigrams:is_word(Table, "test").
true
5> lib_trigrams:is_word(Table, "tess").
true
6> lib_trigrams:is_word(Table, "ess").
true
7> lib_trigrams:is_word(Table, "esx").
false
8> lib_trigrams:is_word(Table, "esxit").
false
9> lib_trigrams:close(Table).
true

DETS

DETS 將 erlang tuple 儲存在 disk 裡面,檔案最大的體積為 2GB,必須先開啟才能使用,使用完畢也要關閉檔案。如果沒有正確關閉,下次開啟會自動修復,但會花費一些時間。

當一個 DETS 表被開啟,必須指定一個全域的名稱,如果兩個以上的本地行程用相同名稱 & 選項開啟一個 DETS,這些行程就會共享表格,此表格會一直開啟,直到所有行程把表格關閉/行程當機後,才真正地關閉。

範例:檔名索引

建立一個磁碟式表格,將檔案對應到整數,或反向對應:filename2index & index2filename。

建立一個 DETS表,在內部放入三個不同型別的值組 tuple

{free, N}
    N 是表格中的第一個自由的索引,當我們在表格中輸入一個新檔名,會被指定索引 N

{FileNameBin, K}
    FileNameBin(二元) 被指定索引 K

{K, FileNameBin}
    K (整數) 代表檔案 FilenameBin

注意每個新檔案的加入,如何在表格內加入兩個項目:
File -> Index
Index -> Filename

先寫一個程式,會開啟/關閉「儲存所有檔案名稱」的 DETS表

-module(lib_filenames_dets).
-export([open/1, close/0, test/0, filename2index/1, index2filename/1]).

open(File) ->
    io:format("dets opened:~p~n", [File]),
    Bool = filelib:is_file(File),
    % ?MODULE 會自動展開成 lib_filenames_dets,因為 DETS 必須要有一個唯一的 table 名稱
    % 而 模組名稱也是唯一的,所以就使用 ?MODULE 來當作 Table 名稱
    case dets:open_file(?MODULE, [{file, File}]) of
        % 開啟 dets,但以 filelib:is_file 判斷是不是新的檔案
        % 如果檔案原本不存在,就 insert 一個 {free,1} 來初始化這個 DETS
        {ok, ?MODULE} ->
            case Bool of
                true  -> void;
                false -> ok = dets:insert(?MODULE, {free,1})
            end,
            true;
        {error,_Reason} ->
            io:format("cannot open dets table~n"),
            exit(eDetsOpen)
    end.

close() -> dets:close(?MODULE).

% 為求高效率,檔名必須要是 binary
% 在 ETS, DETS 裡面,要習慣用 binary 來表示字串
%% 注意: 這裡可能會產生 race condition,在 dets:insert 被呼叫前,
%% 如果有兩個平行的 process 呼叫了 dets:lookup,filename2index將產生錯誤的結果
filename2index(FileName) when is_binary(FileName) ->
    case dets:lookup(?MODULE, FileName) of
        [] ->
            [{_,Free}] = dets:lookup(?MODULE, free),
            ok = dets:insert(?MODULE,
                             [{Free,FileName},{FileName,Free},{free,Free+1}]),
            Free;
        [{_,N}] ->
            N
    end.

% 將索引值換成檔名
index2filename(Index) when is_integer(Index) ->
    case dets:lookup(?MODULE, Index) of
        % 發生錯誤時,回傳 error
        []        -> error;
        [{_,Bin}] -> Bin
    end.

test() ->
    file:delete("./filenames.dets"),
    open("./filenames.dets"),
    F = lib_files_find:files(".", ".ebeam$", true),
    lists:foreach(fun(I) -> filename2index(list_to_binary(I)) end, F).

測試

1> lib_filenames_dets:test().
dets opened:"./filenames.dets"
ok

ETS 與 DETS 的其他功能

  1. 根據 pattern 來取出或刪除物件
  2. 在 ETS 與 DETS 之間做轉換,或是在 ETS 與磁碟檔案之間做轉換
  3. 為一個 Table 找出 resource usage
  4. traverse Table 內的元素
  5. 修復破碎的 DETS table
  6. 視覺化 Table

ETS 與 DETS 原本是設計用來實現 Mnesia 的,Mnesia 內部使用了 ETS 與 DETS Table,並把一些細節隱藏起來,提供了高階的功能。

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

2014/4/14

erlang - Socket Programming

有兩個主要的函式庫:gen_tcp 與 gen_udp

從 web server 取得資料, tcp client

nano_get_url 可取得網頁的 html 內容

nano_get_url() ->
    nano_get_url("www.google.com").

nano_get_url(Host) ->
    %% 對 Host:80 開啟 TCP socket
    {ok,Socket} = gen_tcp:connect(Host,80,[binary, {packet, 0}]),
    %% 對此 Socket,傳送 GET / HTTP/1.0\r\n\r\n 字串的資料
    ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
    %% 等待接收server 回傳的資料
    receive_data(Socket, []).

receive_data(Socket, SoFar) ->
    receive
    %% 因為是用 binary 的方式開啟 socket,如果收到資料片段 Bin
    %% 就將它接到 SoFar 中暫存起來
    {tcp,Socket,Bin} ->
        receive_data(Socket, [Bin|SoFar]);
    %% 如果收到 tcp_closed 訊號,就表示 web server 送完資料,把 socket 斷線
    %% 就將資料 reverse,再將 binary list 連接成一整個 binary 資料
    {tcp_closed,Socket} ->
        list_to_binary(reverse(SoFar))
    end.

測試後,可取得 B 這一塊 binary 資料,然後可以用 io:format("~p~n", [B]) 將所有資料列印到畫面上,也可以用 string:tokens(binary_to_list(B), "\r\n") ,以 \r\n 切割資料,然後一行一行印出來。

1> B = socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r"...>>
2> io:format("~p~n", [B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r\nContent-Type: text/html; charset=U
TF-8\r\nSet-Cookie: PREF=ID=2d1f3f73d3fdb6d9:FF=0:TM=1392024923:LM=1392024923:S=
P-4nPCjpFPUCiXBs; expires=Wed, 10-Feb-2016 09:35:23 GMT; path=/; domain=.google.
...
3> string:tokens(binary_to_list(B), "\r\n").
["HTTP/1.0 302 Found",
 "Location: http://www.google.com.tw/?gws_rd=cr&ei=W534UrqDKMLnkAWA-IHIDA",
 "Cache-Control: private",
 "Content-Type: text/html; charset=UTF-8",
...

TCP Server: evaluate erlang expression, tcp server

需求是要有一個 tcp server, port 為 2345,他會等待 binary 訊息,裡面是一個 erlang expression,server運算後,將結果回傳給 client。

要想寫出任何一個 tcp 程式,必須先回答下列問題,因為 tcp socket 資料只是一個 bytes streaming,在傳輸期間,資料可以被打碎成任意長度的片段。

  1. 資料的格式,要如何知道 request 或 response 是由多少資料組成
  2. 在 request/response 內的資料,要如何編碼(marshaling)與解碼(de-marshaling)

{packet, N}

在 erlang 中,request/response可以用前置的 N(1或2或4) 個 bytes,來表示資料的長度,這也是 gen_tcp:connect 與 gentcp:listen 中 {packet, N} 參數的意義。

當我們利用 {packet, N} 開啟 socket 時,erlang driver 會自動將被打碎的資料片段,接合在一起。

term_to_binary 與 binary_to_term

erlang term 的編碼與解碼,可直接使用 term_to_binary 與 binary_to_term,這樣就不需要處理 http 或 xml 的文字編碼,不只速度快,傳送的資料也比較少。

程式

server 端的程式

start_nano_server() ->
    %% 開啟 tcp server port 2345,{packet, 4} 表示用 4 bytes 的資料長度 header
    %% gen_tcp:listen 會傳回 {ok, Socket} 或 {error, Why}
    {ok, Listen} = gen_tcp:listen(2345, [binary,     {packet, 4}, {reuseaddr, true}, {active, true}]),
    %% 將 Listen 綁定至 listen socket
    %% 在這裡,程式會暫停並等待 tcp client 連線
    {ok, Socket} = gen_tcp:accept(Listen),
    %% 當有 tcp client 連線後,就馬上關閉 Listen,這樣就不會再收到新的連線
    %% 且關閉後,不會影響既有的連線
    gen_tcp:close(Listen),
    %% 處理 socket 資料
    loop(Socket).

loop(Socket) ->
    receive
    {tcp, Socket, Bin} ->
        io:format("Server received binary = ~p~n",[Bin]),
        %% 將收到的資料 unmarshaling
        Str = binary_to_term(Bin),
        io:format("Server (unpacked)  ~p~n",[Str]),

        %% 估算 term
        Reply = lib_misc:string2value(Str),
        io:format("Server replying = ~p~n",[Reply]),

        %% 把結果 marshaling 之後,送進 socket
        gen_tcp:send(Socket, term_to_binary(Reply)),

        %% 等待此 tcp client 發送下一個 term,並處理
        loop(Socket);
    {tcp_closed, Socket} ->
        io:format("Server socket closed~n")
    end.

client 端的程式

nano_client_eval(Str) ->
    %% 開啟 socket,連接到 tcp port 2345
    {ok, Socket} = gen_tcp:connect("localhost", 2345,
            [binary, {packet, 4}]),
    %% 將 term 以 term_to_binary 編碼後,發送到 socket
    ok = gen_tcp:send(Socket, term_to_binary(Str)),

    %% 等待接收結果
    receive
    {tcp,Socket,Bin} ->
        io:format("Client received binary = ~p~n",[Bin]),
        %% 以 binary_to_term 解碼後,列印到畫面上
        Val = binary_to_term(Bin),
        io:format("Client result = ~p~n",[Val]),
        gen_tcp:close(Socket)
    end.

測試,啟動server時,畫面會停在這邊

1> socket_examples:start_nano_server().

啟動 client

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").

server 會收到資料,並估算結果

Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
ok

同時,client也會收到 server 回傳的結果

Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

改進 server

剛剛的 server 只會接受一個 client 連線,接下來嘗試修改server,讓它能接受多個連線。改進的方式有以下兩種

  1. 序列伺服器:一次接受一個連線
  2. 平行伺服器:同時接收多個連線
序列伺服器:一次接受一個連線

原本的程式為

start_nano_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    {ok, Socket} = gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).

將程式改為以下的樣子,在 loop 完成後,繼續呼叫 seq_loop,讓它等候下一個連線。

start_seq_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    seq_loop(Listen).

seq_loop(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).

測試

1> socket_examples:start_seq_server().
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
                           112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,
                           48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
                           112,108,101,40,91,50,43,51,42,52,44,32,50,48,43,50,
                           48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 20+20])"
Server replying = {14,40}
Server socket closed

client 的部份

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").
Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok
2> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 20+20])").
Client received binary = <<131,104,2,97,14,97,40>>
Client result = {14,40}
ok
平行伺服器:同時接收多個連線

在每一次 gen_tcp:accept 一產生新的連線後,就馬上 spawn 產生一個新的 process。

start_parallel_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).
註記
  1. 建立 socket 的行程(呼叫 gen_tcp:accept 或是 gen_tcp:connect )被稱為該 socket 的控制行程,來自 socket 的所有訊息都會送到控制行程中,如果控制行程死亡,socket 就會被關閉。可使用 gen_tcp:controlling_process(Socket, NewPid) 將控制行程換成 NewPid
  2. 平行伺服器可建立數千個連線,我們可能想限制同時連線的數量,可以使用一個 counter 記錄連線數。
  3. 接受連線後,最好明確設定 socket 選項:
     {ok, Socket} = gen_tcp:accept(Listen),
     inet:setopts(Socket, [{packet, 4}, binary, {nodelay,true}, {active, true}]),
     loop(Socket).
  4. 在 Erlang R11B-3,數個 erlang process 可對相同的 listen socket 呼叫 gen_tcp:accept/1,這可調整平行伺服器,用一些 pre-spawned processes pool,全部都等待 gen_tcp:accept/1。

控制 socket

erlang socket 可以三種模式開啟:active, active once, passive。active once 是建立主動 socket,收到訊息後,想接收下一個訊息,必須先重新啟用才行。

方式是在 gen_tcp:connect(Address, Port, Options) 或是 gen_tcp_listen(Port, Options) 的 Options 中使用 {active, true | false | once} 的設定。

主動與被動 socket 的差異:當 Socket 收到訊息時

  1. active socket 收到資料時,會送出 {tcp, Socket, Data} 給控制行程,控制行程無法控制這些訊息的流入。惡劣的客戶端就可能會送出數千個訊息給伺服器。
  2. passive socket 的控制行程必須呼叫 gen_tcp:recv(Socket, N),才能從 socket 接收 N bytes 的訊息,如果 N = 0,全部有效的 bytes data 都會被送出來。因此伺服器就能自行選擇何時呼叫 gen_tcp:recv ,這樣才能控制訊息流。

我們可用三種方式,撰寫 server 接收資料的迴圈

  1. active 訊息接收 - nonblocking
  2. passive 訊息接收 - blocking
  3. 混合訊息接收 - 部份 blocking
active 訊息接收 - nonblocking

process 無法控制進入 server 迴圈的訊息流,如果 client 端產生資料的速度比 server 消化還快,系統就會受到 message flooding,存放訊息的 mailbox 就有可能會 overflow,造成系統 crash。

因為無法阻塞 client,只有在我們確信能夠應付 client 的資料量時,才能使用 nonblocking server。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, true}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data
        {tcp_closed, Socket} ->
            ...
    end.
passive 訊息接收 - blocking

server loop 只會在想要接收訊息時,才會呼叫 gen_tcp:recv。客戶端會被阻塞,直到伺服器呼叫 recv 為止。

注意:OS有做一些緩衝處理,即使尚未呼叫 recv,OS也會允許客戶端在被阻塞之前,可以送少量資料進來。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, false}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket, N) of
        {ok, B} ->
            ... do something with the data
            loop(Socket);
        {error, closed} ->
            ...
    end.
混合訊息接收 - 部份 blocking

只使用 passive mode 並不是最正確的作法,因為在 passive mode,只能等待一個 socket 的資料,對於必須等待多個 socket 資料的 server 來說,這是行不通的。

我們可以在開啟 Socket 時,使用 {active, once} ,這時候,Socket 只會對一個訊息主動。在控制行程收到一個訊息後,必須主動呼叫 inet:setopts 才能再次讓下一個訊息被接收,在此之前,系統會阻塞訊息。

使用 {active,once} ,使用者可實現 traffic shaping ,且避免 server 被過度積極湧入的訊息淹沒。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, once}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data

            %% when you are ready to receive next message
            inet:setopts(Socket, [{active, once}]),
            loop(Socket);
        {tcp_closed, Socket} ->
            ...
    end.

連線來源

要知道 client 端的資訊,可以呼叫 inet:peername(Socket)

@spec inet:peername(Socket) -> {ok, {IP_Address, Port} | {error, Why}}

IP_Address 中
{N1, N2, N3, N4} 代表 IPv4
{K1, K2, K3, K4, K5, K6, K7, K8} 代表 IPv6

所有 Ni 與 Ki 都是介於 0 ~ 255 的整數

Socket 的錯誤處理

因為每一個 Socket 都有控制行程,當控制行程死亡,Socket 就會自動關閉。

error_test() ->
    %% 產生 server process
    spawn(fun() -> error_test_server() end),
    %% 先暫停兩秒,讓 server process 啟動
    lib_misc:sleep(2000),
    %% client 連線到 server
    {ok,Socket} = gen_tcp:connect("localhost",4321,[binary, {packet, 2}]),
    io:format("connected to:~p~n",[Socket]),
    %% 發送訊息 123
    gen_tcp:send(Socket, <<"123">>),
    receive
        %% 接收所有的 response 訊息
        Any ->
            io:format("Any=~p~n",[Any])
    end.

error_test_server() ->
    {ok, Listen} = gen_tcp:listen(4321, [binary,{packet,2}]),
    {ok, Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).

error_test_server_loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            io:format("received:~p~n",[Data]),
            %% <<"123">> 會讓這一行當掉,server 的控制行程會 crash
            %% 因此會讓客戶端收到 {tcp_closed, Socket} 訊息
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

測試

2> socket_examples:error_test().
connected to:#Port<0.594>
received:<<"123">>
Any={tcp_closed,#Port<0.594>}
ok
3>
=ERROR REPORT==== 11-Feb-2014::15:50:16 ===
Error in process <0.35.0> with exit value: {badarg,[{erlang,atom_to_list,[<<3 bytes>>],[]},{socket_examples,error_test_server_loop,1,[{file,"d:/projectcase/erlang/erlangotp/src/socket_examples.erl"},{line,117}]}]}

UDP server and client

UDP datagram 是不可靠的,順序可能被調換、可能會遺失、也可能會重複,且是 connectionless 的,客戶端不需要建立連線,就可以送訊息。

UDP 相當適合「大量客戶端,傳送小訊息到server」的應用情境。

UDP 比 TCP 簡單,因為 server 不需要管理與維護連線。

UDP server 一般的形式如下,server不會收到 socket 關閉的訊息:

server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} ->
            BinReply = ...,
            gen_udp:send(Socket, Host, Port, BinReply),
            loop(Socket)
    end.

client 必須要有 after timeout 機制,因為可能永遠等不到回應的訊息。

client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    ok = gen_udp:send(Socket, "localhost", 4000, Request),
    Value = receive
            {udp, Socket, _, _, Bin} ->
                {ok, Bin}
            after 2000 ->
                error
        end,
    gen_udp:close(Socket),
    Value.

UDP 階乘 server

範例程式

-module(udp_test).
-export([start_server/0, client/1]).

start_server() ->
    %% 產生 server process
    spawn(fun() -> server(4000) end).

%% The server           
server(Port) ->
    %% 開啟 udp server,接收 binary 資料
    {ok, Socket} = gen_udp:open(Port, [binary]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} = Msg ->
            io:format("server received:~p~n",[Msg]),
            %% 將 binary 轉換為 erlang term
            N = binary_to_term(Bin),
            %% 運算階乘
            Fac = fac(N),
            %% 把結果回傳給 client
            gen_udp:send(Socket, Host, Port, term_to_binary(Fac)),
            %% 處理下一個訊息
            loop(Socket)
    end.

fac(0) -> 1;
fac(N) -> N * fac(N-1).

%% The client
client(N) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    io:format("client opened socket=~p~n",[Socket]),
    %% 以 term_to_binary 將 term 轉換為 binary
    ok = gen_udp:send(Socket, "localhost", 4000, 
                      term_to_binary(N)),
    Value = receive
                {udp, Socket, _, _, Bin} = Msg ->
                    io:format("client received:~p~n",[Msg]),
                    binary_to_term(Bin)
            after 2000 ->
                    0
            end,
    gen_udp:close(Socket),
    Value.

測試,server 的部份

1> udp_test:start_server().
server opened socket:#Port<0.516>
<0.33.0>
2> server received:{udp,#Port<0.516>,{127,0,0,1},54201,<<131,97,40>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},54202,<<131,97,20>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},60708,<<131,97,10>>}

client 的部份

1> udp_test:client(40).
client opened socket=#Port<0.516>
client received:{udp,#Port<0.516>,
                     {127,0,0,1},
                     4000,
                     <<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242,
                       199,132,27,232,234,142>>}
815915283247897734345611269596115894272000000000
2> udp_test:client(20).
client opened socket=#Port<0.527>
client received:{udp,#Port<0.527>,
                     {127,0,0,1},
                     4000,
                     <<131,110,8,0,0,0,180,130,124,103,195,33>>}
2432902008176640000
3> udp_test:client(10).
client opened socket=#Port<0.528>
client received:{udp,#Port<0.528>,{127,0,0,1},4000,<<131,98,0,55,95,0>>}
3628800

補充說明

要注意,因為 UDP 是 connectionless 的協定,server無法藉由拒絕讀取資料而阻塞客戶端。

大型的 UDP packet 可能會被切成片段,以利在網路上傳輸,切割會發生在 router 所接受的 MTU(maximum transfer unit) 比 UDP packet 還小的時候 。一般會建議在 UDP 裡,一開始先使用較小的封包,然後慢慢增加資料量,並量測 throughput,如果 throughput 突然下降,就表示封包太大了。

UDP packet 有可能會被傳送兩次,所以在寫 RPC code 時要小心,不然可能就會執行兩次,傳回兩次。要避免發生這個問題時,可以加上 make_ref。

client(Request) ->
    {ok, Socket} -> gen_udp:open(0, [binary]),

    %% 產生唯一的識別參考
    Ref = make_ref(),
    B1 = term_to_binary(Ref, Request),
    ok = gen_udp:send(Socket, "localhost", 4000, B1),
    wait_for_ref(Socket, Ref).

wait_for_ref(Socket, Ref) ->
    receive
        {udp, Socket, _, _, Bin} ->
            case binary_to_term(Bin) of
                {Ref, Val} ->
                    Val;
                {_SomeoOtherRef, _} ->
                    wait_for_ref(Socket, Ref)
        end;
    after 1000 ->
        ...
    end.

廣播到多台機器

我們需要兩個 ports,一個用來送出廣播,一個用來傾聽廣播。

-module(broadcast).
-compile(export_all).

% 將IoList廣播到LAN的所有機器
% 負責廣播的 process 會開啟 port 5010
send(IoList) ->
    case inet:ifget("eth0", [broadaddr]) of
        {ok, [{broadaddr, Ip}]} ->
            {ok, S} =  gen_udp:open(5010, [{broadcast, true}]),
            gen_udp:send(S, Ip, 6000, IoList),
            gen_udp:close(S);
        _ ->
            io:format("Bad interface name, or\n"
                          "broadcasting not supported\n")
    end.

% 因為 windows 的 network interface 並沒有辦法直接取得 broadaddr
% 就直接把 broadcast address 寫在程式裡面
sendwindows(IoList) ->
    {ok, S} =  gen_udp:open(5010, [{broadcast, true}]),
    gen_udp:send(S, "192.168.1.255", 6000, IoList),
    gen_udp:close(S).

% 負責接收廣播的 process 會開啟 port 6000,並等待接收訊息
listen() ->
    {ok, _} = gen_udp:open(6000),
    loop().

loop() ->
    receive
        Any ->
            io:format("received:~p~n", [Any]),
            loop()
    end.

測試:server的部份

1> broadcast:listen().
received:{udp,#Port<0.516>,{192,168,1,57},5010,"{test"}

client

1> broadcast:sendwindows([123, "test"]).
ok

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

2014/4/1

erlang - interfacing technology

想要把 erlang 跟 C 或 Python 程式連接起來,或是想要在 erlang 裡面執行 shell script,作法是在「獨立的作業系統process」中,執行外部程式,並利用「byte oriented」的communication channel 和該行程通訊。



erlang 利用 port 控制通訊,「負責建立一個 port」的行程,稱為該 port 的 connected process,所有送往外部的訊息都必須貼上connected process的 PID,而外部程式的所有訊息都會被送到 connected process。

port 的作用就像是一個 erlang process,可送訊息給它,可以註冊,如果外部程式 crash,離開訊號就會送到 connected process,如果 connected process 死亡,外部程式就會被 kill。

Port

建立 port

Port = open_port(PortName, PortSettings)

對 Port 發送訊息

Port ! {PidC, {command, Data}}

將 connected process 由 PidC 改為 Pid1

Port ! {PidC, {connect, Pid1}}

關閉 port

Port ! {PidC, close}

connected process 可以收到外部程式送來的訊息,類似這樣:

receive
    {Port, {data, Data}} ->
        ...

open_port

open_port可接受很多Opt設定值,以下列出常見的設定:

@spec open_port(PortName, [Opt]) -> Port

PortName 可以是下列其中之一

  1. {spawn, Command}
    啟動一個外部程式,Command 是外部程式的名稱,除非是 linked-in driver,否則它會是在 erlang 工作空間外部的地方執行
  2. {fd, In, Out}

Opt 可以是下列其中之一

  1. {packet, N}
  2. stream
  3. {line, Max}
  4. {cd, Dir}
  5. {env, Env}

連接外部的 C 程式

如果要撰寫 erlang 程式,呼叫以下的 C 函式

// example1.c
int twice(int x){
  return 2*x;
}

int sum(int x, int y){
  return x+y;
}

在 erlang 中,希望將 example1定義為模組,且用以下方式呼叫,實作的細節隱藏在 example1 中

X1 = example1:twice(23),
Y1 = example1:sum(45, 32),

定義 port 與 外部程式 之間的協定

我們要先定義一個簡單的協定,並分別以 erlang, C 實作。

  1. 所有封包一開始都是 2 bytes 的 Len,後面接著 Len 個 bytes 資料
  2. 呼叫 twice(N),在協定中轉換為 [1,N],1代表 twice,N為參數
  3. 呼叫 sum(N,M), 在協定中轉換為 [2,N,M]
  4. 回傳值的長度為 1 byte

範例

  1. port 送出 [0,3,2,45,32] 給外部程式,0,3 表示封包長度為 3,2 表示呼叫 sum,45 與 32是 sum 的參數
  2. 外部程式從 stdin 讀取這五個位元,呼叫 sum 函數,寫出位元組序列「0,1,77」到 stdout,0,1 表示封包長度為 1,資料內容為 77

C 語言外部程式,實作協定

  1. example1.c: 包含了 twice 與 sum 兩個函式
  2. example1_driver.c: 會終結 byte 串流協定,且呼叫 example1.c 內的函式
  3. erl_comm.c: 具有讀寫記憶體緩衝區的函式

example1_drive.c 執行一個無窮迴圈,持續從 stdin 讀取資料,並把結果寫入 stdout

// example1_drive.c
#include <stdio.h>
typedef unsigned char byte;

int read_cmd(byte *buff);
int write_cmd(byte *buff, int len);

int main() {
  int fn, arg1, arg2, result;
  byte buff[100];

  while (read_cmd(buff) > 0) {
    fn = buff[0];

    if (fn == 1) {
      arg1 = buff[1];
      result = twice(arg1);
    } else if (fn == 2) {
      arg1 = buff[1];
      arg2 = buff[2];
      /* debug -- you can print to stderr to debug
     fprintf(stderr,"calling sum %i %i\n",arg1,arg2); */
      result = sum(arg1, arg2);
    }

    buff[0] = result;
    write_cmd(buff, 1);
  }
}

erl_comm.c,負責在 stdin/stdout 讀寫 2 bytes 開頭的封包。

/* erl_comm.c */
#include <unistd.h>

typedef unsigned char byte;

int read_cmd(byte *buf);
int write_cmd(byte *buf, int len);
int read_exact(byte *buf, int len);
int write_exact(byte *buf, int len);

int read_cmd(byte *buf)
{
  int len;

  if (read_exact(buf, 2) != 2)
    return(-1);
  len = (buf[0] << 8) | buf[1];
  return read_exact(buf, len);
}

int write_cmd(byte *buf, int len)
{
  byte li;

  li = (len >> 8) & 0xff;
  write_exact(&li, 1);

  li = len & 0xff;
  write_exact(&li, 1);

  return write_exact(buf, len);
}

int read_exact(byte *buf, int len)
{
  int i, got=0;

  do {
    if ((i = read(0, buf+got, len-got)) <= 0)
      return(i);
    got += i;
  } while (got<len);

  return(len);
}

int write_exact(byte *buf, int len)
{
  int i, wrote = 0;

  do {
    if ((i = write(1, buf+wrote, len-wrote)) <= 0)
      return (i);
    wrote += i;
  } while (wrote<len);

  return (len);
}

erlang 程式

-module(example1).
-export([start/0, stop/0]).
-export([twice/1, sum/2]).

start() ->
    spawn(fun() ->
          register(example1, self()),
          process_flag(trap_exit, true),
          Port = open_port({spawn, "./example1"}, [{packet, 2}]),
          loop(Port)
      end).

stop() ->
    % 發送訊息,讓 example1 停止,關閉 port 與 外部程式
    example1 ! stop.

twice(X) -> call_port({twice, X}).
sum(X,Y) -> call_port({sum, X, Y}).

call_port(Msg) ->
    % 以訊息方式發送 API request 給 example1
    example1 ! {call, self(), Msg},
    receive
    % 等待接收結果
    {example1, Result} ->
        Result
    end.

loop(Port) ->
    receive
    {call, Caller, Msg} ->
        % 對 Port 發送訊息, 資料內容是將呼叫的參數,轉換為 list
        % self() 為 connected process 的 PID
        Port ! {self(), {command, encode(Msg)}}, 
        receive
        % 收到外部程式送來的訊息
        {Port, {data, Data}} ->
            % 將結果解碼後,發送給 Caller
            Caller ! {example1, decode(Data)}
        end,
        loop(Port);
    stop ->
        % 關閉 port
        Port ! {self(), close},
        receive
        % 收到外部程式 送來關閉的訊息
        {Port, closed} ->
            % 送出 exit signal
            exit(normal)
        end;
    % 收到 exit signal
    {'EXIT', Port, Reason} ->
        exit({port_terminated,Reason})
    end.

encode({twice, X}) -> [1, X];  
encode({sum, X, Y}) -> [2, X, Y]. 

decode([Int]) -> Int.

編譯與測試

編譯
gcc -o example1 example1.c erl_comm.c example1_driver.c
erlc -W *.erl

測試

1> example1:start().
<0.34.0>
2> example1:sum(45,32).
77
3> example1:twice(10).
20
4> example1:twice(14).
28

注意

  1. 此範例並沒有統一 erlang 與 c 對整數的定義。直接假設兩個都是用單一個byte來當作整數,並忽略精確度、正負號的問題。
  2. 必須要先啟動負責界面的driver程式,也就是要先執行 example1:start(),然後才能執行此程式。

附註

erlang 跟外部程式之間傳遞資料,其資料內容的結構必須由 programmer 自行處理,這跟 socket programming 一樣, socket 在兩個程式之間提供 byte streaming 的傳輸,至於建構在 socket 上面的 app 要如何使用,就要由 app 自行決定。

erlang 有幾個函式庫可簡化界面銜接的問題。

  1. http://www.erlang.org/doc/pdf/erl_interface.pdf
    ei 是一組 C 函式與巨集,可編解碼 erlang 外部格式。在 erlang 端,一個 erlang 程式使用 term_to_binary 將 erlang terms 序列化,在 C 語言端, ei 的函式可用來解碼此 binary 資料。相反地,ei 可用來建構二元資料,而 erlang 端就以 binary_to_term 將 binary 資料解碼。
  2. http://www.erlang.org/doc/pdf/ic.pdf
    erlang IDL 編譯器 ic,這是 erlang 對 OMG IDL 編譯器的實作。
  3. http://www.erlang.org/doc/pdf/jinterface.pdf
    Jinterface 是處理 java 跟 erlang 之間的介面,它可以將 erlang 型別完整地對應到 java 物件,為 erlang terms 編碼解碼,連結到 erlang process等等

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World