2014/3/27

erlang - distributed programming

分散式程式

為什麼要開發分散式程式

  1. 效能:將程式不同的部份,指派給不同的電腦運算
  2. 可靠度:容錯
  3. 規模:服務更多的使用者
  4. 本質分散的應用:網路遊戲或聊天這些天生就分散的應用

分散式模型

  1. 分散式 erlang:以erlang 的 process 為基礎,寫出來的程式,不同 process 運作的程式,可以將 process 分配到不同機器節點上,程式不需要修改。
  2. 以 socket 為基礎的分散式應用:分散式erlang必須要在信賴的區域網路內運作,在非信賴的公用網路上,必須要以 TCP/IP 網路為基礎,撰寫網路分散式程式

開發的步驟

  1. 在單一erlang 節點中,撰寫並測試程式
  2. 在同一台電腦,兩個 erlang 節點中,測試程式
  3. 在兩台電腦的 erlang 節點中,測試程式
  4. 兩台電腦在不同 domain,不同區域網路上,測試程式

最後一個步驟是很重要的,在遠端網路的環境中運作程式,就需要一併確認防火牆是不是有開放連線。

Sample: key-value cache server

在單一erlang 節點中,撰寫並測試程式

server 的程式界面規格

  1. @spec kvs:start() -> true
    啟動 server,註冊名稱為 kvs
  2. @spec kvs:store(Key, Value) -> true
    儲存 Key - Value
  3. @spec kvs:lookup(Key) -> {ok, Value} | undefined
    尋找 key

因為這只是一個簡單的範例,所以是用 process dictionary 做出來的,使用 process dictionary 要注意以下條件:盡量不要使用 process dictionary,可能會導致一些 bug,難以除錯,只有一個地方可以使用,就是拿來儲存「只寫入一次」的變數,如果一個 key 得到的一個值僅此一次,且不會再變動,那麼就可以放入 process dictionary。

-module(kvs).
-export([start/0, store/2, lookup/1]).

start() -> register(kvs, spawn(fun() -> loop() end)).

store(Key, Value) -> rpc({store, Key, Value}).

lookup(Key) -> rpc({lookup, Key}).

rpc(Q) ->
    kvs ! {self(), Q},
    receive
        {kvs, Reply} ->
            Reply
    end.

loop() ->
    receive
        {From, {store, Key, Value}} ->
            put(Key, {ok, Value}),
            From ! {kvs, true},
            loop();
        {From, {lookup, Key}} ->
            From ! {kvs, get(Key)},
            loop()
    end.

測試

1> kvs:start().
true
2> kvs:store({location, joe}, "Island").
true
3> kvs:store(weather, raining).
true
4> kvs:lookup(weather).
{ok,raining}
5> kvs:lookup({location, joe}).
{ok,"Island"}
6> kvs:lookup({location, mary}).
undefined
7> kvs:store(weather, sunny).
true
8> kvs:lookup(weather).
{ok,sunny}

在同一台電腦,兩個 erlang 節點中,測試程式

以 erl -sname kvs_server 啟動 kvs_server 節點

>erl -sname kvs_server
Eshell V5.10.4  (abort with ^G)
(kvs_server@yaoclNB)1> kvs:start().
true

以 erl -sname kvs_client 啟動 kvs_client 節點,然後用 rpc:call(kvs_server@yaoclNB, kvs, store, [weather, fine]) 進行遠端呼叫。

>erl -sname kvs_client
Eshell V5.10.4  (abort with ^G)
(kvs_client@yaoclNB)1> rpc:call(kvs_server@yaoclNB, kvs, store, [weather, fine]).
true
(kvs_client@yaoclNB)2> rpc:call(kvs_server@yaoclNB, kvs, lookup, [weather]).
{ok,fine}

在 kvs_server 節點,kvs:lookup 確實可以查詢出,遠端呼叫填寫進去的 weather - fine 的資料。

(kvs_server@yaoclNB)2> kvs:lookup(weather).
{ok,fine}

在 LAN 的兩台電腦的 erlang 節點中,測試程式

以 erl -name kvs_server -setcookie abc 啟動 kvs_server 節點,因為要遠端使用,所以是以 -name 要求 erlang 以長的網域名稱建立此 erlang 節點。-setcookie 是 erlang 節點之間的安全檢查,當此 cookie 值不同時,這兩個 erlang 節點就無法互相進行遠端呼叫。

[root@git temp]# erl -name kvs_server -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(kvs_server@git.maxkit.com.tw)1>

如果執行上面的指令後產生以下這個錯誤訊息。

{error_logger,{{2014,1,27},{15,3,21}},"Can't set long node name!\nPlease check your configuration\n",[]}

這時候,就要去檢查機器的 hostname,此 hostname 必須要包含網域資料,這樣才能正確以長的網域名稱建立此 erlang 節點。

以 CentOS 來說,必須要修改 /etc/sysconfig/network 檔案,並以 hostname 指令設定。

>vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=git.maxkit.com.tw
GATEWAY=192.168.1.1
>hostname git.maxkit.com.tw

完整的測試過程如下

kvs_server 的部份

[root@git temp]# erl -name kvs_server -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(kvs_server@git.maxkit.com.tw)1> kvs:start().
true

kvs_client 的部份

[root@koko temp]# erl -name kvs_client -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(kvs_client@koko.maxkit.com.tw)1> rpc:call('kvs_server@git.maxkit.com.tw', kvs, store, [weather, fine]).
true
(kvs_client@koko.maxkit.com.tw)2> rpc:call('kvs_server@git.maxkit.com.tw', kvs, lookup, [weather]).
{ok,fine}

-setcookie 是 erlang 節點之間的安全檢查,當此 cookie 值不同時,這兩個 erlang 節點就無法互相進行遠端呼叫。

kvs_server 的 cookie 為 abc

[root@git temp]# erl -name kvs_server -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(kvs_server@git.maxkit.com.tw)1> kvs:start().
true
(kvs_server@git.maxkit.com.tw)2>
=ERROR REPORT==== 27-Jan-2014::15:16:20 ===
** Connection attempt from disallowed node 'kvs_client@koko.maxkit.com.tw' **

kvs_client 的 cookie 為 def,在遠端呼叫 rpc:call 時,就會得到 {badrpc,nodedown} 的錯誤訊息,且 kvs_server 會出現 dissallowed 連線的錯誤訊息。

[root@koko temp]# erl -name kvs_client -setcookie def
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(kvs_client@koko.maxkit.com.tw)1> rpc:call('kvs_server@git.maxkit.com.tw', kvs, store, [weather, fine]).
{badrpc,nodedown}

在兩個遠端遠端機器中,進行 erlang 分散式運算,有下列步驟。

  1. 啟動 erlang 時要使用 -name,如果是單一機器,則使用 -sname 短機器名稱就可以了。
  2. 確認兩個 erlang nodes 有相同的 cookie,使用 -setcookie。erl 預設會讀取 $HOME/.erlang_cookie 檔案,作為預設的 cookie value,因此在單一機器上,運作兩個 nodes,就不需要加上 -setcookie,直接修改 .erlang_cookie 檔案內容,也可以讓兩個 erlang nodes 環境 cookie 一樣。
  3. 確認 fully qualified hostname 可以被 DNS 解析,如果不能修改 DNS entries,則直接修改機器上的 hosts 檔案,CentOS 為 /etc/hosts,Windows 則在 c:\windows\system32\hosts
  4. 確認兩個系統有相同的程式碼版本,有幾種方式可達到此要求
    (a) 自行複製 kvs.erl
    (b) NFS 共享的磁碟
    (c) 設定程式碼伺服器:erl_prim_loader 模組
    (d) 使用 nl(Mod) 命令,這會在所有連結的電腦上載入 Mod 模組,但前提是要先用 net_admin:ping(Node) 的方式,將 erlang 節點連結在一起

兩台電腦在不同 domain,不同區域網路上,測試程式

基本上就跟在 LAN 的兩台電腦的 erlang 節點中一樣,但因為區域網路之間各有防火牆保護,要進行分散式 erlang 就要採取下列步驟。

  1. 網路要開放 TCP 與 UDP Port 4369,因為 epmd(erlang port mapper daemon) 會使用到這兩個 port
  2. 選擇一個 port 或是一個範圍的 port,進行分散式 erlang,確保這些 port 是開放的。如果這個 port 範圍是 Min 與 Max,就用以下指令啟動 erlang node
    >erl -name ... -setcookie ... -kernel inet_dist_listen_min Min inet_dist_listen_max Max
    如果只有開放一個 Port,就讓上面指令中的 Min=Max 即可。

分散式應用的相關函數

erlang node是一個有自己的位址空間,自己的行程集合,完整的vm的系統。

每個節點都有一個單一的 cookie,在一組 erlang node 之間,cookie 必須要一樣。互連且有相同 cookie 的節點,集合起來就是一個 erlang cluster。

分散式應用的相關BIF如下

  1. @spec spawn(Node, Fun) -> Pid
    在 Node 節點上生成 process
  2. @spec spawn(Node, Mod, Fun, ArgList) -> Pid
    在 Node 節點上生成 process
  3. @spec spawn_link(Node, Fun) -> Pid
  4. @spec spawn_link(Node, Mod, Fun, ArgList) -> Pid
  5. @spec disconnect_node(Node) -> bool() | ignored
    強迫節點斷線
  6. @spec monitor_node(Node, Flag) -> true
    當 Flag 為 true,會打開監控功能,如果 Node 加入或離開「連接的 erlang 節點」集合時,負責估算此 BIF 的行程,將會收到 {nodeup, Node} 與 {nodedown, Node} 訊息
  7. @spec node() -> Node
    本地節點的名稱,如果不是分散式節點,會傳回 nonode@nohost
  8. @spec node(Arg) -> Node
    Arg 為 PID、Ref或Port,會傳回 Arg 所在的節點
  9. @spec nodes() -> [Node]
    傳出連結的所有其他節點的 list
  10. @spec is_alive() -> bool()
    如果是系統的一部分,就傳出 true
  11. {RegName, Node}!Msg
    會送出 Msg 到 Node 節點註冊的行程 RegName

分散式應用相關的函式庫

  1. rpc
    call(Node, Mod, Fun, ArgList) -> Result | {badrpc, Reason}
  2. global
    提供註冊分散式系統名稱與鎖的函數

遠端生成的範例

-module(dist_demo).
-export([rpc/4, start/1]).

start(Node) ->
    spawn(Node, fun() -> loop() end).

rpc(ServerPid, M, F, A) ->
    ServerPid ! {rpc, self(), M, F, A},
    receive
        {SenderPid, Response} ->
            Response
    end.

loop() ->
    receive
        {rpc, SenderPid, M, F, A} ->
            SenderPid ! {self(), (catch apply(M, F, A))},
            loop()
    end.

測試
啟動 server 節點

[root@git temp]# erl -name server -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(server@git.maxkit.com.tw)1>

啟動 client 節點

[root@koko temp]# erl -name client -setcookie abc
Erlang R16B03 (erts-5.10.4) [source] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V5.10.4  (abort with ^G)
(client@koko.maxkit.com.tw)1>

在 client 節點呼叫遠端生成程式,產生 server 節點的 process,並投過訊息傳遞,遠端估算 erlang:node(),然後取得結果。

(client@koko.maxkit.com.tw)1> Pid = dist_demo:start('server@git.maxkit.com.tw').
<6016.43.0>
(client@koko.maxkit.com.tw)2> dist_demo:rpc(Pid, erlang, node, []).
'server@git.maxkit.com.tw'
(client@koko.maxkit.com.tw)3>

設定 erlang cookie 的方法

  1. 將相同的 cookie 值儲存到 $HOME/.erlang.cookie
  2. 啟動 erl 時,以 -setcookie 指定
  3. erlang:set_cookie(node(), C)

以 socket 為基礎的分散式應用

分散式 erlang 適合寫「彼此信任」的叢集應用,不適合開放式的「不信任」環境。當我們有所有機器的控制權,我們可以用分散式 erlang 統一控管所有的機器。

當不同人管理不同機器,想控制哪些程式可以在他們的機器執行時,就要使用受限制版本的 spawn。

lib_chan

lib_chan 是作者 Joe Armstrong 提供的一個 module,可讓使用者手動控制那個行程可以在這個機器上生成。

lib_chan 需要自己編譯,請到 書本的網頁 取得原始程式碼,需要的程式有 lib_chan, lib_chan_auth, lib_chan_cs, lib_chan_mm, lib_md5。

  1. @spec start_server() -> true
    在本地主機上,啟動一個伺服器,該伺服器的行為由 $HOME/.erlang/lib_chan.conf 決定
  2. @spec start_server(Conf) -> true
    在本地主機上,啟動一個伺服器,該伺服器的行為由 Conf 決定
    configuration file 內容包含下列兩項
    (a) {port, NNNN}:對 port number: NNNN 進行監聽
    (b) {service, S, password, P, mfa, SomeMod, SomeFunc, SomeArgsS}:定義一個服務 S,密碼為 P,如果服務開始,就會以 SomeMod:SomeFunc(MM, ArgsC, SomeArgsS) 建立行程,處理來自客戶端的訊息。MM 是一個 proxy 行程的 PID,此代理行程用來傳送訊息給客戶端,參數 ArgsC 是來自客戶端的連線呼叫

  3. @spec connect(Host, Port, S, P, ArgsC) -> {ok, Pid} | {error, Why}
    對主機 Host 開啟 Port,啟動密碼為 P 的伺服器 S,如果密碼正確,會傳回 {ok, Pid},此 Pid 為「負責送訊息到伺服器」的代理行程PID

當客戶端呼叫 connect/5,會產生兩個 proxy process,一個在客戶端,一個在伺服器端。proxy process 負責處理 TCP 封包資料轉換,攔截控制行程的離開訊號,關閉 socket。

範例

configuration file: conf

%% conf
{port, 1234}.
{service, nameServer, password, "ABXy45", mfa, mod_name_server, start_me_up, notUsed}.

當客戶端呼叫
connect(Host, 1234, nameServer, "ABXy45", nil)
伺服器就會生成
mod_name_server:start_me_up(MM, nil, notUsed)
其中 MM 是 proxy process 的 PID,使用它來對客戶端溝通

%% mod_name_server.erl
-module(mod_name_server).
-export([start_me_up/3]).

start_me_up(MM, _ArgsC, _ArgS) ->
    loop(MM).

loop(MM) ->
    receive
        {chan, MM, {store, K, V}} ->
            kvs:store(K, V),
            loop(MM);
        {chan, MM, {lookup, K}} ->
            MM ! {send, kvs:lookup(K)},
            loop(MM);
        {chan_closed, MM} ->
            true
    end.

mod_name_server 遵循下面的協定:

  1. 如果客戶端送出訊息 {send, X} 給伺服器,它會出現在 mod_name_server,訊息格式類似 {chan, MM, X} (MM 是伺服器 proxy process 的 PID)
  2. 如果客戶端終結或用來溝通的 socket 關閉,伺服器會收到 {chan_closed, MM} 的訊息
  3. 如果伺服器想送訊息 X 到客戶端,它會呼叫 MM!{send, X}
  4. 如果伺服器想手動關閉連線,可執行 MM!close

此協定為 middle-man protocol,客戶端與伺服器端都要遵守

測試

1> kvs:start().
true
2> lib_chan:start_server().
lib_chan starting:"lib_chan.conf"
ConfigData=[{port,1234},
            {service,nameServer,password,"ABXy45",mfa,mod_name_server,start_me_up,notUsed}
true

在另一個 erl

1> {ok, Pid} = lib_chan:connect("localhost", 1234, nameServer, "ABXy45", "").
{ok,<0.41.0>}
2> lib_chan:cast(Pid, {store, joe, "writing a book"}).
{send,{store,joe,"writing a book"}}
3> lib_chan:rpc(Pid, {lookup, joe}).
{ok,"writing a book"}
4> lib_chan:rpc(Pid, {lookup, jim}).
undefined

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World