OTP 函式庫中包含了完整的 web server、FTP server、CORBA ORB,OTP 的 behavior 將常用的行為模式包裝起來,可想成 「用 callback 模組進行參數化」的應用框架。
行為解決了問題的「非功能」部份, callback 解決了功能的部份
問題中的非功能部份(ex: 如何動態更新程式碼)對所有應用來說都是一樣的。
以下會
- 以 erlang 寫一個小 client-server 程式
- 將程式一般化
- 進入真正的程式碼
發展通用伺服器的過程
以下程式會慢慢地將程式中,非功能(一般化)與功能的部份區分開來。
server1: 基本伺服器, 用 callback module 將它參數化
server1 是一個基本的 server, 可以用 callback module 將它參數化,換句話說,就是把 callback 的程式跟 server 的程式分開。
%% server1.erl
-module(server1).
-export([start/2, rpc/2]).
start(Name, Mod) ->
register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).
rpc(Name, Request) ->
Name ! {self(), Request},
receive
{Name, Response} -> Response
end.
loop(Name, Mod, State) ->
receive
{From, Request} ->
{Response, State1} = Mod:handle(Request, State),
From ! {Name, Response},
loop(Name, Mod, State1)
end.
以下是 name_server 的 callback module,一般 OTP 程式的習慣寫法,就是把 client 跟 server 端程式,放在同一個 module 裡面。
-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server1, [rpc/2]).
%% client apis,這是讓 client 端呼叫的程式介面
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name) -> rpc(name_server, {whereis, Name}).
%% callback routines,這是讓 server 呼叫的
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
測試
1> server1:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}
注意:name_server callback 沒有共時性、沒有生成、接收、送出、register,這表示我們可以寫 client-server module,而不需要知道下面的共時模型是什麼,這是所有伺服器的基本模型。
server2: 具有 Transaction 的伺服器
下面是一個會讓客戶當機的 server,如果查詢導致 Server 例外,客戶端就會當機。
-module(server2).
-export([start/2, rpc/2]).
start(Name, Mod) ->
register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).
rpc(Name, Request) ->
Name ! {self(), Request},
receive
%% client 收到 server 回傳的 crash,就跳出程式
{Name, crash} -> exit(rpc);
{Name, ok, Response} -> Response
end.
loop(Name, Mod, OldState) ->
receive
{From, Request} ->
try Mod:handle(Request, OldState) of
{Response, NewState} ->
From ! {Name, ok, Response},
loop(Name, Mod, NewState)
catch
_:Why ->
log_the_error(Name, Request, Why),
%% 因為程式出錯了,發送 crash 訊息給 client
From ! {Name, crash},
%% 再次 loop 等待處理下一個訊息
loop(Name, Mod, OldState)
end
end.
log_the_error(Name, Request, Why) ->
io:format("Server ~p request ~p ~n"
"caused exception ~p~n",
[Name, Request, Why]).
如果例外發生在處理器函數內,它以 State 的原始值進行迴圈
,如果函數成功,它會以「處理器函數所提供的」NewState值進行迴圈,當處理器發生錯誤時,伺服器的狀態會維持不變。
name_server 要改成使用 server2 的話,只需要修改 -import 的地方
-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server2, [rpc/2]).
%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name) -> rpc(name_server, {whereis, Name}).
%% callback routines
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
測試
1> server2:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}
4> name_server:whereis(jane).
error
server3: 具有程式碼熱抽換功能的伺服器
如果送給伺服器一個程式碼熱抽換 swap_code 的訊息,就會改變 callback module,變成呼叫新模組
-module(server3).
-export([start/2, rpc/2, swap_code/2]).
start(Name, Mod) ->
register(Name,
spawn(fun() -> loop(Name,Mod,Mod:init()) end)).
swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).
rpc(Name, Request) ->
Name ! {self(), Request},
receive
{Name, Response} -> Response
end.
loop(Name, Mod, OldState) ->
receive
{From, {swap_code, NewCallBackMod}} ->
From ! {Name, ack},
loop(Name, NewCallBackMod, OldState);
{From, Request} ->
{Response, NewState} = Mod:handle(Request, OldState),
From ! {Name, Response},
loop(Name, Mod, NewState)
end.
因為 name_server 所註冊綁定的 process name 不能在程式運作過程中修改掉,但我們可以動態地把 name_server process 運作的 module 換掉,達到抽換程式碼的功能。
-module(name_server1).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server3, [rpc/2]).
%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name) -> rpc(name_server, {whereis, Name}).
%% callback routines
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
測試
1> server3:start(name_server, name_server1).
true
2> name_server1:add(tom, "home").
ok
3> name_server1:add(jane, "home").
ok
如果我們測試到一半,想要增加「找出 name_server 儲存的」所有名稱,由於目前 name_server1 並沒有這個函數,所以我們就撰寫一個新的 new_name_server,增加了 all_names 與 delete 函數。
-module(new_name_server).
-export([init/0, add/2, all_names/0, delete/1, whereis/1, handle/2]).
-import(server3, [rpc/2]).
%% interface
all_names() -> rpc(name_server, allNames).
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
delete(Name) -> rpc(name_server, {delete, Name}).
whereis(Name) -> rpc(name_server, {whereis, Name}).
%% callback routines
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle(allNames, Dict) -> {dict:fetch_keys(Dict), Dict};
handle({delete, Name}, Dict) -> {ok, dict:erase(Name, Dict)};
handle({whereis, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
在 console 就可以直接用 swap_code 把 name_server 執行的 module 換成 new_name_server
4> server3:swap_code(name_server, new_name_server).
ack
5> new_name_server:all_names().
[jane,tom]
以往認為伺服器本身有記錄狀態,發送訊息就會改變狀態,如果要抽換程式邏輯,必須要把 server 停下來,改變程式,然後重新啟動,但實際上,可以透過訊息的方式,讓 server 的迴圈執行新版本的 server module。
Server 4: 具有 Transaction 與 程式碼熱抽換 合併
-module(server4).
-export([start/2, rpc/2, swap_code/2]).
start(Name, Mod) ->
register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).
swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).
rpc(Name, Request) ->
Name ! {self(), Request},
receive
{Name, crash} -> exit(rpc);
{Name, ok, Response} -> Response
end.
loop(Name, Mod, OldState) ->
receive
{From, {swap_code, NewCallbackMod}} ->
From ! {Name, ok, ack},
loop(Name, NewCallbackMod, OldState);
{From, Request} ->
try Mod:handle(Request, OldState) of
{Response, NewState} ->
From ! {Name, ok, Response},
loop(Name, Mod, NewState)
catch
_: Why ->
log_the_error(Name, Request, Why),
From ! {Name, crash},
loop(Name, Mod, OldState)
end
end.
log_the_error(Name, Request, Why) ->
io:format("Server ~p request ~p ~n"
"caused exception ~p~n",
[Name, Request, Why]).
server 5
此程式不做任何事,直到我們告訴它 {become, F},就會變成伺服器 F。
-module(server5).
-export([start/0, rpc/2]).
start() -> spawn(fun() -> wait() end).
wait() ->
receive
{become, F} -> F()
end.
rpc(Pid, Q) ->
Pid ! {self(), Q},
receive
{Pid, Reply} -> Reply
end.
這是計算階層的 server
-module(fac_server).
-export([loop/0]).
loop() ->
receive
{From, {fac, N}} ->
From ! {self(), fac(N)},
loop();
{become, Something} ->
Something()
end.
fac(0) -> 1;
fac(N) -> N * fac(N-1).
測試
1> Pid = server5:start().
<0.33.0>
2> Pid ! {become, fun fac_server:loop/0}
.
{become,#Fun<fac_server.loop.0>}
3> server5:rpc(Pid, {fac, 10}).
3628800
OTP 的 gen_server 就像是最後的 server5,是個一般化,可變成任何一種 server 的 server library,它提供了所有非功能的行為與錯誤處理。
使用 gen_server
三個步驟寫出一個 gen_server callback module
- 為 callback module 取名
- 寫 client 使用的介面函數
- 寫 callback module 中需要的六個 callback funciton
範例
- 為 callback module 取名
my_bank 寫 client 使用的介面函數
start() 啟動銀行
stop() 關閉銀行
new_account(Who) 建立新銀行帳號
deposit(Who, Amount) 存款
withdraw(Who, Amount) 提款這些都是呼叫 gen_server 的函數
start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). stop() -> gen_server:call(?MODULE, stop). new_account(Who) -> gen_server:call(?MODULE, {new, Who}). deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}). withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).
2.1 gen_server:start_link({local, Name}, Mode, ...) 會啟動一個本地伺服器,如果第一個參數填為 {global,GlobalName} ,則此伺服器為 global server
2.2 ?MODULE 會展開成 my_bank
2.3 gen_server:call(?MODULE, Term) 用來對 server 進行遠端呼叫 rpc寫 callback module 中需要的六個 callback funcitons
callbak module 中要撰寫六個 callback functions
init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3
以下為一個 gen_server 程式,最簡單的樣板
-module().
%% 編譯時,如果沒有定義適當的callback function,就會產生警告
-behaviour(gen_server).
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% 呼叫 gen_server:start_link(Name, CallBackMod, StartArgs, Opts) 啟動 server
%% 第一個呼叫的是 Mod:init(StartArgs)
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% 啟動時會被呼叫的函數,回傳值裡面的 State 會在其他函數中使用
init([]) -> {ok, State}.
%% 這是rpc 遠端呼叫時 server 的處理函數,Reply 會被送回 client 端
handle_call(_Request, _From, State) -> {reply, Reply, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.
以下為 my_bank 實際上用 ets 實作的程式
-module(my_bank).
-behaviour(gen_server).
-export([start/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-compile(export_all).
%% client functions
start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() -> gen_server:call(?MODULE, stop).
new_account(Who) -> gen_server:call(?MODULE, {new, Who}).
deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}).
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).
%% 6 server callback functions
init([]) -> {ok, ets:new(?MODULE,[])}.
handle_call({new,Who}, _From, Tab) ->
%% 先找看看帳號有沒有存在,舊帳號就不能再 ets:insert
Reply = case ets:lookup(Tab, Who) of
[] -> ets:insert(Tab, {Who,0}),
{welcome, Who};
[_] -> {Who, you_already_are_a_customer}
end,
{reply, Reply, Tab};
%% Who 存錢 X,最後得到餘額
handle_call({add,Who,X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
[{Who,Balance}] ->
NewBalance = Balance + X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance}
end,
{reply, Reply, Tab};
%% Who 提款 X,最後得到餘額
handle_call({remove,Who, X}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] -> not_a_customer;
%% 扣錢之前,要先判斷一下夠不夠,餘額不能變成負值
[{Who,Balance}] when X =< Balance ->
NewBalance = Balance - X,
ets:insert(Tab, {Who, NewBalance}),
{thanks, Who, your_balance_is, NewBalance};
[{Who,Balance}] ->
{sorry,Who,you_only_have,Balance,in_the_bank}
end,
{reply, Reply, Tab};
handle_call(stop, _From, Tab) ->
%% server 收到 stop,會回傳以下訊息,然後停止 server
%% 第二個 normal 會變成 terminate 的第一個參數 _Reason
%% 第三個參數 stopped 會變成 my_bank:stop() 的回傳值
{stop, normal, stopped, Tab}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.
測試
1> my_bank:start().
{ok,<0.33.0>}
2> my_bank:deposit("tom", 10).
not_a_customer
3> my_bank:new_account("tom").
{welcome,"tom"}
4> my_bank:new_account("tom").
{"tom",you_already_are_a_customer}
5> my_bank:deposit("tom", 10).
{thanks,"tom",your_balance_is,10}
6> my_bank:deposit("tom", 15).
{thanks,"tom",your_balance_is,25}
7> my_bank:withdraw("tom", 5).
{thanks,"tom",your_balance_is,20}
8> my_bank:withdraw("tom", 35).
{sorry,"tom",you_only_have,20,in_the_bank}
gen_server callback functions 的細節
啟動 server
呼叫 gen_server:start_link(Name,Mod,InitArgs,Opts) 就會啟動 server,建立一個名稱為 Name 的 server,callback module 為 Mod,Opts 控制這個 server 的行為:可指定訊息的記錄、除錯函數等等。在呼叫 Mod:init(InitArgs) 之後,此伺服器就被啟動完成@spec init(Args) -> {ok, State} | {ok, State, Timeout} | ignore | {stop, Reason}
當 init 回傳 {ok, State},就代表啟動成功,且初始狀態為 State
呼叫 server 時,客戶端會呼叫 gen_server:call(Name, Request),進而呼叫 callback function: handle_call/3
@spec handle_call(Request, From, State) -> {reply, Reply, State} | {reply, Reply, State, Timeout} | {noreply, State} | {noreply, State, Timeout} | {stop, Reason, Reply, State} | {stop, Reason, State}
Request 是 gen_server:call 的第二個參數
From 是 client 端發出 Request 的 process PID
State 是server目前的狀態通常會回傳 {reply, Reply, NewState} ,Reply 會回傳給 client 端,變成 gen_server:call 的回傳值,而 NewState 是 server 的下一個狀態
gen_server:cast(Name,Name) 這是不具有回傳值的呼叫,單純的發送訊息給 server
@spec handle_cast(Msg, State) -> {noreply, NewState} | {noreply, NewState, Timeout} | {stop, Reason, NewState}
通常會回傳 {noreply, NewState} 改變 server 的狀態
gen_server:handle_info(Info, State) 是用來處理「自發訊息」的
例如當 server process 被連結到另一個行程,且會捕捉離開訊息,那就可能會收到 {'EXIT', Pid, What} 訊息。或是系統中任何需要知道此 server 的 PID,都可以發送訊息過來,這樣的訊息最後會變成 Info 的值。@spec handle_info(Info, State) -> {noreply, State} | {noreply, State, Timeout} | {stop, Reason, State}
server 終結的理由很多,但最後都會呼叫 terminate(Reason, NewState)
5.1 handle_call, handle_cast, handle_info 可能會傳回 {stop, Reason, NewState}
5.2 server可能會以 {'EXIT', reason } 當機@spec terminate(_Reason, State) -> void()
可以在這裡把資料放進 disk,或是產生新訊息傳給其他 processes,或是直接丟棄
程式碼更新時,會呼叫 code_change
@spec code_change(OldVsn, State, Extra) -> {ok, NewState}
在 emacs 提供的 gen_server template
%%%-------------------------------------------------------------------
%%% File : gen_server_template.erl
%%% Author : my name <yourname@localhost.localdomain>
%%% Description :
%%%
%%% Created : 1 Jan 2014 by my name <yourname@localhost.localdomain>
%%%-------------------------------------------------------------------
-module().
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {}).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
參考
Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World
沒有留言:
張貼留言