2019/10/14

ZeroMQ 4 Reliable Request-Reply Patterns


This chapter discusses reliability and builds a set of reliable messaging patterns on top of ZeroMQ's core request-reply pattern:


  • The Lazy Pirate pattern: reliable request-reply from the client side
  • The Simple Pirate pattern: reliable request-reply using load balancing
  • The Paranoid Pirate pattern: reliable request-reply with heartbeating
  • The Majordomo pattern: service-oriented reliable queuing
  • The Titanic pattern: disk-based/disconnected reliable queuing
  • The Binary Star pattern: primary-backup server failover
  • The Freelance pattern: brokerless reliable request-reply

What is "Reliability"?


Reliability is best defined in terms of failure: if we can handle a well-defined and understood set of failures, we are reliable with respect to those failures.


  1. Application code can crash, exit, freeze, stop responding to input, run too slowly, or exhaust all memory.
  2. System code (e.g., a broker) can crash for some reason. System code should be more robust than application code, but it can still exhaust memory when slow clients cause messages to pile up in queues.
  3. Message queues can overflow. The usual defense in system code against slow clients is to simply discard messages, which means "lost" messages.
  4. Networks can fail. ZeroMQ reconnects automatically, but messages can be lost in the meantime.
  5. Hardware can fail.
  6. Networks can fail in exotic ways, e.g., some ports on a switch die so that parts of the network become unreachable.
  7. Entire data centers can be struck by lightning, earthquakes, fire, or power or cooling failures.

Handling some of these failures to make software fully reliable is beyond the scope of ZeroMQ.
The first five failure types already cover 99.9% of real-world cases.


Designing Reliability


"Keep things working properly when code freezes or crashes."


  • Request-reply:
    If the server dies while processing a request, the client notices because it gets no reply; it can then retry against another server. If the client dies, that can be handled by the client's developer.

  • Pub-sub:
    If a client dies, the server never knows, because pub-sub clients send nothing back to the server. A client can, however, notify the server over a separate channel. A server crash is out of scope here. Subscribers can also detect that they are running too slowly and take action, e.g., alert an operator.

  • Pipeline:
    If a worker dies, the ventilator never knows; a pipeline only flows one way. The downstream collector, however, can detect that a task was not completed and send a message back asking the ventilator to resend it. If the ventilator or collector dies, the client can time out and resend all the tasks.


This chapter focuses only on request-reply.


In the basic request-reply pattern, a REQ client socket does a blocking send/receive to a REP server socket. If the server crashes while processing a request, or the request or reply is lost in transit, the client blocks forever.


Because ZeroMQ can reconnect peers and load-balance messages automatically, request-reply still performs better than plain TCP. Between threads (no network involved) it is very reliable; across processes on one machine, or over a real network, it is not reliable enough on its own.
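A minimal sketch of the blocking problem (not from the guide's code): pyzmq's RCVTIMEO socket option is a lighter-weight alternative to the polling loop that Lazy Pirate uses below. The port number and timeout here are assumptions, and nothing is listening, so the receive simply times out.

```python
import zmq

# A plain REQ socket blocks in recv() forever when the reply is lost.
# RCVTIMEO (milliseconds) makes recv() raise zmq.Again instead; it is a
# lighter-weight alternative to the zmq_poll approach used by Lazy Pirate.
context = zmq.Context()
client = context.socket(zmq.REQ)
client.setsockopt(zmq.RCVTIMEO, 500)    # give up waiting after 0.5 s
client.setsockopt(zmq.LINGER, 0)        # don't block in context.term()
client.connect("tcp://localhost:5599")  # assumption: nothing bound here
client.send(b"hello")

got_reply = True
try:
    client.recv()
except zmq.Again:                       # timed out: the "server" never answered
    got_reply = False

client.close()
context.term()
```

RCVTIMEO only detects the timeout; after it fires, the REQ socket is still stuck mid-cycle and must be closed and reopened, exactly as the Lazy Pirate client below does.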


We can design reliable request-reply (RRR) patterns, which we call the Pirate patterns.


There are three ways to connect clients and servers, and each needs a different approach to reliability:


  1. Multiple clients talking to a single server. Use case: clients connect to one well-known server. Failures to handle: server crashes/restarts, network disconnects.
  2. Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Failures to handle: worker crashes/restarts, worker busy-looping/overload, queue crashes/restarts, network disconnects.
  3. Multiple clients talking to multiple servers directly, with no intermediary proxies. Use case: distributed services such as DNS. Failures to handle: service crashes/restarts, service busy-looping/overload, network disconnects.

Lazy Pirate pattern, Client-Side Reliability


  • Poll the REQ socket and receive from it only when a reply has actually arrived.
  • If no reply arrives within a timeout, resend the request.
  • If there is still no reply after several requests, abandon the transaction.

Using a REQ socket out of its strict send/receive order raises an "EFSM" error, because REQ is implemented as a finite-state machine that enforces that send/receive cycle.


The Pirate pattern, however, resends the request when no reply arrives, which would trigger exactly this error. The brute-force workaround is to close and reopen the REQ socket after an error.



lpclient.py


#
#  Lazy Pirate client
#  Use zmq_poll to do a safe request-reply
#  To run, start lpserver and then randomly kill/restart it
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server...")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)

        else:
            print("W: No response from server, retrying...")
            # Socket is confused. Close and remove it; one retry used up.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                # Out of retries; assume the server is offline and unrecoverable
                print("E: Server seems to be offline, abandoning")
                break

            # Retries remain: create a new REQ socket and resend the request
            print("I: Reconnecting and resending (%s)" % request)
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()

lpserver.py


#
#  Lazy Pirate server
#  Binds REP socket to tcp://*:5555
#  Like hwserver except:
#   - echoes request as-is
#   - randomly runs slowly, or exits to simulate a crash.
from __future__ import print_function

from random import randint
import time
import zmq

context = zmq.Context(1)
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

cycles = 0
while True:
    request = server.recv()
    cycles += 1

    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating CPU overload")
        time.sleep(2)

    print("I: Normal request (%s)" % request)
    time.sleep(1) # Do some heavy work
    server.send(request)

server.close()
context.term()

Sample run:


$ python lpserver.py
I: Normal request (b'1')
I: Normal request (b'2')
I: Normal request (b'3')
I: Normal request (b'4')
I: Simulating a crash

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')
I: Server replied OK (b'3')
I: Sending (b'4')
I: Server replied OK (b'4')
I: Sending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
E: Server seems to be offline, abandoning

The client sends each message in sequence and checks that replies come back in the same order: no request or reply is lost, and no reply arrives more than once or out of order. ZeroMQ would deliver messages in order even without the sequence numbers; they are there as a self-test.


The client uses a REQ socket and does the brute-force close/reopen whenever the send/receive cycle is broken. It is tempting to use a DEALER instead, but a DEALER would have to manage the envelopes that REQ handles for us, and it could still end up receiving replies it did not expect.
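To make that trade-off concrete, here is a hedged sketch (not from the guide's code) of what emulating REQ with a DEALER involves: the empty delimiter frame must be handled by hand, and no state machine stops a resend from racing a late reply. The port number and the one-shot echo server are assumptions for the demo.

```python
import threading
import zmq

def echo_server(url):
    # Assumed helper: a one-shot REP echo server in its own context/thread
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(url)
    rep.send(rep.recv())
    rep.close()
    ctx.term()

url = "tcp://127.0.0.1:5598"             # assumption: a free local port
t = threading.Thread(target=echo_server, args=(url,))
t.start()

ctx = zmq.Context()
dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.RCVTIMEO, 5000)    # avoid blocking forever in a demo
dealer.connect(url)

# DEALER does not add the empty delimiter frame that REQ adds automatically,
# so we must prepend it ourselves and strip it again from the reply.
dealer.send_multipart([b"", b"ping"])
empty, reply = dealer.recv_multipart()
assert empty == b""

t.join()
dealer.close()
ctx.term()
```

Unlike REQ, nothing here prevents sending a second request before the first reply arrives, which is exactly how a resend can later collect a stale reply.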


Although there may be many clients and one server, failures are handled entirely on the client side.


Pros


  1. Simple to implement and understand
  2. Works easily with existing client and server application code
  3. ZeroMQ automatically reconnects the actual connections

Cons


  1. No failover to backup or alternate servers

Simple Pirate pattern, Basic Reliable Queueing


The Simple Pirate pattern extends Lazy Pirate with a queue proxy, which lets the client talk transparently to multiple servers (workers).


In all the Pirate patterns, workers are stateless. If the application requires shared state, such as a shared database, it manages that itself; the messaging framework does not need to know about it. Workers can connect and disconnect at any time without notifying clients.



The queue proxy is implemented with the load balancing broker from Chapter 3, plus whatever is needed to handle dead or blocked workers.


Because the clients already have a retry mechanism, the same approach works unchanged with the load balancing pattern.


spqueue.py: simple pirate queue


#
#  Simple Pirate queue
#  This is identical to the LRU pattern, with no reliability mechanisms
#  at all. It depends on the client for recovery. Runs forever.

import zmq

LRU_READY = b"\x01"  # bytes, so it compares correctly against received frames

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER) # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []

while True:
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)

        # Everything after the second (delimiter) frame is reply
        reply = msg[2:]

        # Forward message to client if it's not a READY
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)

    if socks.get(frontend) == zmq.POLLIN:
        #  Get client request, route to first available worker
        client, empty, request = frontend.recv_multipart()
        worker = workers.pop(0)
        backend.send_multipart([worker, b"", client, b"", request])

spworker.py: simple pirate worker


#
#  Simple Pirate worker
#  Connects REQ socket to tcp://localhost:5556
#  Implements worker part of LRU queueing

from random import randint
import time
import zmq

LRU_READY = "\x01"

context = zmq.Context(1)
worker = context.socket(zmq.REQ)

identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")

print( "I: (%s) worker ready" % identity)
worker.send_string(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break

    cycles += 1
    if cycles > 3 and randint(0, 5) == 0:
        print( "I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print( "I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print( "I: (%s) normal reply" % identity)
    time.sleep(1) # Do some heavy work
    worker.send_multipart(msg)

For testing, start several workers, one Lazy Pirate client, and the queue, in any order:


$ python spworker.py
I: (AF11-C837) worker ready
I: (AF11-C837) normal reply
I: (AF11-C837) normal reply

$ python spqueue.py

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')

Paranoid Pirate pattern, Robust Reliable Queueing



The Simple Pirate pattern merely combines two existing patterns, but it still has two weaknesses:


  1. It is not robust against a queue crash/restart. The client recovers, but the workers do not: ZeroMQ reconnects the workers' sockets automatically, but because the queue is brand new, the workers never re-signal READY to it, so it knows nothing about them. We add heartbeating from queue to worker so the worker can detect that the queue has gone away.

  2. The queue cannot detect worker failure. If a worker dies while idle, the queue cannot remove it from its worker queue until it sends that worker a request and hears nothing back, leaving one client stuck in a wait/retry cycle. Not fatal, but not nice. We add heartbeating from worker to queue so the queue can detect a lost worker at any point.


Where the worker previously used a REQ socket, the Paranoid Pirate worker switches to a DEALER socket, which lets it send and receive at any time rather than in the strict lockstep that REQ enforces.


ppqueue.py: the load balancing pattern, extended with heartbeating of the workers


#
##  Paranoid Pirate queue

from collections import OrderedDict
import time

import zmq

HEARTBEAT_LIVENESS = 3     # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0   # Seconds

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        t = time.time()
        expired = []
        for address,worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print( "W: Idle worker expired: %s" % address )
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = WorkerQueue()

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        frames = backend.recv_multipart()
        if not frames:
            break

        address = frames[0]
        workers.ready(Worker(address))

        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print( "E: Invalid message from worker: %s" % msg )
        else:
            frontend.send_multipart(msg)

        # Send heartbeats to idle workers if it's time
        if time.time() >= heartbeat_at:
            for worker in workers.queue:
                msg = [worker, PPP_HEARTBEAT]
                backend.send_multipart(msg)
            heartbeat_at = time.time() + HEARTBEAT_INTERVAL
    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)


    workers.purge()

ppworker.py


#
##  Paranoid Pirate worker

from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat

def worker_socket(context, poller):
    """Helper function that returns a new configured socket
       connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER) # DEALER
    identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt_string(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker


context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break # Interrupted

        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print( "I: Simulating a crash" )
                break
            if cycles > 3 and randint(0, 5) == 0:
                print( "I: Simulating CPU overload" )
                time.sleep(3)
            print( "I: Normal reply" )
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print( "I: Queue heartbeat" )
            liveness = HEARTBEAT_LIVENESS
        else:
            print( "E: Invalid message: %s" % frames )
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print( "W: Heartbeat failure, can't reach queue" )
            print( "W: Reconnecting in %0.2fs..." % interval )
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print( "I: Worker heartbeat" )
        worker.send(PPP_HEARTBEAT)

Notes:


  1. The code includes failure simulation; remove it for real use.
  2. The worker has a reconnect mechanism similar to the Lazy Pirate client's, with two differences: (a) it does exponential back-off, and (b) it retries indefinitely.

You can stop and restart the queue, clients, and workers in any order; the client never receives an out-of-order reply.


Run everything with a script:


python ppqueue.py &
for i in 1 2 3 4; do
    python ppworker.py &
    sleep 1
done
python lpclient.py &

or start each piece in its own terminal:


$ python ppqueue.py

$ python ppworker.py
I: Queue heartbeat
I: Normal reply
I: Worker heartbeat
I: Queue heartbeat

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')

Heartbeating


Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not specific to ZeroMQ: TCP's own timeout is very long (around 30 minutes), so a peer can be dead or unreachable long before either side notices.


Here are the three most common approaches to heartbeating.


Shrugging It Off

Many applications simply do no heartbeating at all, but skipping it invites the following problems:


  • An application that uses a ROUTER socket to track peers will leak memory as peers disconnect and reconnect, becoming slower and slower.
  • SUB- or DEALER-based data recipients cannot tell good silence (no data to send) from bad silence (the other end died). A recipient that knows a peer has died could, for example, switch to a backup route.
  • Some networks kill TCP connections that stay silent too long, so keep-alives are needed to keep the connection in active use.
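For the last point, libzmq exposes the operating system's TCP keep-alive knobs as socket options (available in libzmq 3.x and later); a sketch with made-up values:

```python
import zmq

# TCP keep-alives stop some networks from killing silent connections, but
# they only probe the transport; application-level heartbeating is still
# needed to detect a wedged peer, as the rest of this section argues.
ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.setsockopt(zmq.TCP_KEEPALIVE, 1)         # enable keep-alive probes
sock.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 60)   # idle seconds before probing
sock.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 10)  # seconds between probes

keepalive_on = sock.getsockopt(zmq.TCP_KEEPALIVE)
sock.close()
ctx.term()
```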

One-Way Heartbeats

A second option: each node sends a heartbeat message to its peers roughly every second. When a node hears nothing from a peer within some timeout, it treats that peer as dead. This works, but misbehaves in some situations.


For pub-sub sockets this is usable, and in fact the only approach that works at all: a SUB socket cannot send anything back to a PUB socket, but the PUB side can send "I'm alive" messages to its subscribers.


As an optimization, send heartbeats only when there is no real data to send. Heartbeats can also run at a slower rate, as long as the receiver can still detect failures promptly enough.


Problems you may hit with this design:


  1. It can be inaccurate when sending large volumes of data: heartbeats queue up behind the data, so a delayed heartbeat can trigger a false timeout and a false disconnection verdict. The fix is to treat any received data as a heartbeat.

  2. The pub-sub pattern drops messages for disconnected recipients, but PUSH and DEALER queue them, so if you keep heartbeating a dead peer, it will receive a pile of stale heartbeats when it comes back online.

  3. It assumes the timeout is the same fixed length everywhere, which in practice is often not the case.


Ping-Pong Heartbeat

A third option is a ping-pong dialog: one peer sends a ping command, and the other replies with a pong command. Neither command carries a payload, and pings and pongs are not correlated with each other. Because roles are usually asymmetric, it is typically the client that pings the server.


This works for all ROUTER-based brokers, and it combines well with the second approach: treat any incoming data as a pong, and send a ping only when no data has arrived for a while.
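A hedged sketch of the ping-pong dialog, assuming single-frame b"PING"/b"PONG" control commands and a made-up port; these are not part of any zguide protocol:

```python
import threading
import zmq

def pong_server(url, pings):
    # Assumed helper: replies PONG to each PING, then exits
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(url)
    for _ in range(pings):
        if rep.recv() == b"PING":
            rep.send(b"PONG")
    rep.close()
    ctx.term()

url = "tcp://127.0.0.1:5597"
t = threading.Thread(target=pong_server, args=(url, 3))
t.start()

ctx = zmq.Context()
req = ctx.socket(zmq.REQ)
req.connect(url)
poller = zmq.Poller()
poller.register(req, zmq.POLLIN)

peer_alive = True
for _ in range(3):
    req.send(b"PING")
    if dict(poller.poll(2000)).get(req) == zmq.POLLIN:
        req.recv()            # any incoming traffic counts as a pong
    else:
        peer_alive = False    # missed pong: treat the peer as dead
        break

t.join()
req.close()
ctx.term()
```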


Heartbeating for Paranoid Pirate

Paranoid Pirate uses the second approach, although the third would have been simpler. The heartbeats flow asynchronously in both directions, and either peer can detect that the other has died.


Here is how the worker handles heartbeats coming from the queue:


  • Calculate a "liveness": how many missed heartbeats to tolerate before treating the queue as dead. 3 is typical, and it counts down by 1 for every missed heartbeat.
  • Wait in the zmq_poll loop for at most one heartbeat interval (1 second) at a time.
  • If any message arrives from the queue, reset liveness back to 3.
  • If no message arrives in time, count down liveness.
  • When liveness reaches zero, treat the queue as dead.
  • If the queue is dead, destroy the socket, create a new one, and reconnect.
  • To avoid opening and closing too many sockets, wait for a while before reconnecting, doubling the delay each time up to a maximum of 32 seconds.

And here is how the worker sends heartbeats to the queue:


  • Since the worker talks to a single peer (the queue), a single variable records when the next heartbeat is due.
  • In the zmq_poll loop, whenever that time passes, send a heartbeat to the queue.

The essential worker heartbeat code, in C:


#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect
#define INTERVAL_MAX       32000    //  After exponential backoff

…
//  If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Receive any message from queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else
    if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        …
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send heartbeat to queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}

The queue does the same thing, additionally recording an expiration time for each worker and purging workers whose expiry has passed. Some tips for implementing your own heartbeating:


  • Use zmq_poll or a reactor as the core of your application's main task.
  • Start by getting heartbeating working between the peers, test it by simulating failures, and only then implement the rest of the message flow.
  • Use simple tracing, e.g., printing to the console, to follow messages between peers; the dump method that zmsg provides helps here. Number your messages incrementally so you can check for gaps.
  • In a real application, heartbeating must be configurable, usually negotiated between peers. Some peers want aggressive heartbeating as low as every 10 ms; others only need a heartbeat every 30 seconds.
  • If different peers use different heartbeat intervals, the poll timeout should be the shortest of them; never use an infinite timeout.
  • Do heartbeating on the same socket you use for messages, so the heartbeats also act as a keep-alive for the network connection.
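The tracing tip about numbering messages can be sketched as a tiny gap check (the helper name is made up):

```python
def find_gaps(seq):
    """Return sequence numbers missing from a sorted trace of messages."""
    expected = set(range(seq[0], seq[-1] + 1))
    return sorted(expected - set(seq))

# A trace where messages 4 and 7 were lost somewhere between the peers
assert find_gaps([1, 2, 3, 5, 6, 8]) == [4, 7]
assert find_gaps([1, 2, 3]) == []   # clean run: no gaps
```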

Contracts and Protocols


Because its heartbeating differs, Paranoid Pirate is not interoperable with Simple Pirate. Interoperability requires a contract, which is to say a protocol.


http://rfc.zeromq.org/ hosts protocol contracts for ZeroMQ implementations, such as the Pirate Pattern Protocol (PPP).


When implementing PPP for real, note the following:


  1. The READY command should carry a protocol version number.
  2. As it stands, READY and HEARTBEAT cannot be reliably distinguished from requests and replies. To tell them apart, add a "message type" part to the message structure.
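A sketch of that second fix: an explicit "message type" frame means control messages and replies can never be confused (the frame values and helper are assumptions, not part of PPP):

```python
# Hypothetical message-type constants; PPP itself only defines READY/HEARTBEAT
MSG_READY, MSG_HEARTBEAT, MSG_REPLY = b"\x01", b"\x02", b"\x03"

def classify(frames):
    """Split a multipart message into (kind, body) using the type frame."""
    kind, body = frames[0], frames[1:]
    if kind == MSG_READY:
        return ("ready", body)
    if kind == MSG_HEARTBEAT:
        return ("heartbeat", body)
    if kind == MSG_REPLY:
        return ("reply", body)
    return ("invalid", body)

assert classify([b"\x02"]) == ("heartbeat", [])
assert classify([b"\x03", b"Hello"]) == ("reply", [b"Hello"])
```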

Majordomo pattern: Service Oriented Reliable Queueing



The Majordomo Protocol (MDP) extends PPP with a "service name": clients address each request to a named service, and workers register for the services they provide.


Adding service names turns the Paranoid Pirate queue into a service-oriented broker.


There are two contracts here: (1) MDP, which defines the distributed architecture, and (2) an API contract that defines how user applications talk to the framework.


Majordomo has two halves: a client side and a worker side.


MDP.py: Majordomo Protocol definitions


"""Majordomo Protocol definitions"""
#  This is the version of MDP/Client we implement
C_CLIENT = b"MDPC01"

#  This is the version of MDP/Worker we implement
W_WORKER = b"MDPW01"

#  MDP/Server commands, as strings
W_READY         =   b"\001"
W_REQUEST       =   b"\002"
W_REPLY         =   b"\003"
W_HEARTBEAT     =   b"\004"
W_DISCONNECT    =   b"\005"

commands = [None, "READY", "REQUEST", "REPLY", "HEARTBEAT", "DISCONNECT"]

mdcliapi.py: Majordomo client API


"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

      Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    retries = 3
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            # If a client socket already exists, close it before reconnecting
            self.poller.unregister(self.client)
            self.client.close()

        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker and get reply by hook or crook.

        Takes ownership of request message and destroys it when sent.
        Returns the reply message or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warn("I: send request to '%s' service: ", service)
            dump(request)
        reply = None

        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # interrupted

            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)

                # Don't try to handle errors, just assert noisily
                # (check that the reply has enough frames)
                assert len(msg) >= 3

                header = msg.pop(0)
                assert MDP.C_CLIENT == header

                reply_service = msg.pop(0)
                assert service == reply_service

                reply = msg
                break
            else:
                # No reply within the timeout; reconnect and retry
                if retries:
                    logging.warn("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warn("W: permanent error, abandoning")
                    break
                retries -= 1

        return reply

    def destroy(self):
        self.ctx.destroy()

mdclient.py: client application


"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            # Send the request to the "echo" service
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print( "%i requests/replies processed" % count )

if __name__ == '__main__':
    main()

mdwrkapi.py: worker api


"""Majordomo Protocol Worker API, Python version

Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

"""

import logging
import time
import zmq

from zhelpers import dump
# MajorDomo protocol constants:
import MDP

class MajorDomoWorker(object):
    """Majordomo Protocol Worker API, Python version

    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """

    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    broker = None
    ctx = None
    service = None

    worker = None # Socket to broker
    heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0 # How many attempts left
    heartbeat = 2500 # Heartbeat delay, msecs
    reconnect = 2500 # Reconnect delay, msecs

    # Internal state
    expect_reply = False # False only at start

    timeout = 2500 # poller timeout
    verbose = False # Print activity to stdout

    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            # If a worker socket already exists, close it before reconnecting
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])

        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        # 1e-3 = 1 * 10^(-3) = 1/1000
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat


    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.

        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        if option:
            msg = [option] + msg

        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)


    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        # expect_reply is False only on the first call, when there is no reply yet
        assert reply is not None or not self.expect_reply

        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            # ex:
            # I: sending b'\x03' to broker
            # ----------------------------------------
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            self.send_to_broker(MDP.W_REPLY, msg=reply)

        self.expect_reply = True

        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # Interrupted

            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    # I: received message from broker:
                    # ----------------------------------------
                    # [000]
                    # [006] MDPW01
                    # [001]
                    # [005] 0x00800041aa
                    # [000]
                    # [011] Hello world
                    dump(msg)

                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                empty = msg.pop(0)
                assert empty == b''

                header = msg.pop(0)
                assert header == MDP.W_WORKER

                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''

                    return msg # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else :
                    logging.error("E: invalid input message: ")
                    dump(msg)

            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warn("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3*self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()

            # Send HEARTBEAT if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3*self.heartbeat

        logging.warn("W: interrupt received, killing worker...")
        return None


    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)

mdworker.py: worker application


"""Majordomo Protocol worker example.

Uses the mdwrk API to hide all MDP aspects

"""

import sys
from mdwrkapi import MajorDomoWorker

def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break # Worker was interrupted
        reply = request # Echo is complex... :-)


if __name__ == '__main__':
    main()

notes about worker api


  • The API is single-threaded. This means the worker does not send heartbeats in the background. That is what we want: if the worker gets stuck, its heartbeats stop and the broker stops sending it requests.
  • The worker does no exponential back-off; it is not worth the extra complexity.
  • The API does no error reporting; the application must check for errors itself.

Like the Simple Pirate and Paranoid Pirate workers, the worker API closes its socket and opens a new one when the peer disconnects and reconnects. Although ZeroMQ reconnects the worker automatically when the broker restarts, that alone does not re-register the worker with the broker. There are two fixes: the simpler one, used here, is for the worker to close its socket and open a new one; the other is for the broker to challenge unknown workers when it receives their heartbeats and ask them to re-register.




mdbroker.py: majordomo broker


"""
Majordomo Protocol broker
A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8

"""

import logging
import sys
import time
from binascii import hexlify

import zmq

# local
import MDP
from zhelpers import dump

class Service(object):
    """a single Service"""
    name = None # Service name
    requests = None # List of client requests
    waiting = None # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []

class Worker(object):
    """a Worker, idle or active"""
    identity = None # hex Identity of worker
    address = None # Address to route to
    service = None # Owning service, if known
    expiry = None # expires at this point, unless heartbeat

    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3*lifetime

class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500 # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None # Our context
    socket = None # Socket for clients & workers
    poller = None # our Poller

    heartbeat_at = None# When to send HEARTBEAT
    services = None # known services
    workers = None # known workers
    waiting = None # idle workers

    verbose = False # Print activity to stdout

    # ---------------------------------------------------------------------


    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {}
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)



    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:
            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    logging.info("I: received message:")
                    # I: received message:
                    # ----------------------------------------
                    # [005] 0x00800041aa
                    # [000]
                    # [006] MDPC01
                    # [004] echo
                    # [011] Hello world
                    dump(msg)

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                # The header tells us whether this came from a client or a worker
                if (MDP.C_CLIENT == header):
                    self.process_client(sender, msg)
                elif (MDP.W_WORKER == header):
                    self.process_worker(sender, msg)
                else:
                    logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            # dict views are not indexable in Python 3
            self.delete_worker(list(self.workers.values())[0], True)
        self.ctx.destroy(0)


    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2 # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)


    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1 # At least, command

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        worker = self.require_worker(sender)

        if (MDP.W_READY == command):
            assert len(msg) >= 1 # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)

        elif (MDP.W_REPLY == command):
            if (worker_ready):
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0) # empty delimiter frame
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_HEARTBEAT == command):
            if (worker_ready):
                worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_DISCONNECT == command):
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

        if worker.service is not None:
            worker.service.waiting.remove(worker)
        self.workers.pop(worker.identity)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert (address is not None)
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if (worker is None):
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert (name is not None)
        service = self.services.get(name)
        if (service is None):
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if (time.time() > self.heartbeat_at):
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                self.delete_worker(w,False)
                self.waiting.pop(0)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert (service is not None)
        if msg is not None:# Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """

        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Stack routing and protocol envelopes to start of message
        # and routing envelope
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            logging.info("I: sending %r to worker", command)
            # I: sending b'\x02' to worker
            # ----------------------------------------
            # [005] 0x00800041a8
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            dump(msg)

        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()

if __name__ == '__main__':
    main()

The broker is built out of queues: one queue of requests per service, created when the first worker for that service connects, plus, for each service, a queue of waiting workers.


notes about broker


  • The Majordomo Protocol lets us handle both clients and workers on a single socket. This is easier to manage: only one ZeroMQ endpoint is needed, whereas most proxies need two.
  • The broker implements all of MDP/0.1, including disconnecting workers that send invalid commands, and heartbeating.
  • It can be extended to multiple threads, each managing one socket and one set of clients and workers.
  • A primary/failover or live/live broker reliability model is possible because the broker is essentially stateless; clients and workers pick another broker themselves when theirs fails.
  • The examples use 5-second heartbeats. For a real LAN application this should be lower, but any retry interval should be at least 10 seconds, long enough for a service to restart.

Asynchronous Majordomo pattern


The previous Majordomo implementation is simple; the client is just the Simple Pirate client. Because ZeroMQ does not enable Nagle's algorithm, nothing batches small packets for us: sending many tiny messages one round-trip at a time makes poor use of the TCP connection.


First, let's measure the cost of round-tripping with a small program. It runs two tests: one sends each request and waits for the reply, the other sends all requests in a batch and then collects the replies in a batch.


tripping.py


"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""

import sys
import threading
import time

import zmq

from zhelpers import zpipe

def client_task (ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")

    print("Setting up test...\n")
    time.sleep(0.1)

    print("Synchronous round-trip test...\n")
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send(b"hello")
        client.recv()
    print(" %d calls/second\n" % (requests / (time.time() - start)))

    print("Asynchronous round-trip test...\n")
    start = time.time()
    for r in range(requests):
        client.send(b"hello")
    for r in range(requests):
        client.recv()
    print(" %d calls/second\n" % (requests / (time.time() - start)))

    # signal done:
    pipe.send( b"done")

def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")

    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)

def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)

    while True:
        try:
            items = dict(poller.poll())
        except:
            break # Interrupted

        if frontend in items:
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)

def main():
    # Create threads
    ctx = zmq.Context()
    client,pipe = zpipe(ctx)

    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon=True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon=True

    worker_thread.start()
    broker_thread.start()
    client_thread.start()

    # Wait for signal on client pipe
    client.recv()

if __name__ == '__main__':
    main()

Output:


$ python tripping.py
Setting up test...

Synchronous round-trip test...

 2293 calls/second

Asynchronous round-trip test...

 4073 calls/second

The client thread sleeps briefly before starting the test. This works around a ROUTER quirk: a message sent to a peer address that has not connected yet is silently dropped. As the numbers show, the asynchronous batched mode is roughly twice as fast as the synchronous one.
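During development, the silent-drop behavior of ROUTER can be made visible with the zmq.ROUTER_MANDATORY socket option, which turns a send to an unknown peer identity into an error instead of a discard. A minimal sketch (the inproc endpoint name and identity are made up for the demo):

```python
import zmq

ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
# With ROUTER_MANDATORY set, sending to an identity that is not
# connected raises ZMQError instead of silently dropping the message.
router.setsockopt(zmq.ROUTER_MANDATORY, 1)
router.bind("inproc://router-demo")

caught = False
try:
    router.send_multipart([b"NOBODY", b"", b"hello"])
except zmq.ZMQError:
    caught = True

print("unroutable send detected:", caught)
ctx.destroy(0)
```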


Next, let's turn the client into an asynchronous version:


mdcliapi2.py


"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

      Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker
        """
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)

        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warn("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return # interrupted

        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)

            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4

            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header

            service = msg.pop(0)
            return msg
        else:
            logging.warn("W: permanent error, abandoning request")

mdclient2.py


"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
"""

import sys
from mdcliapi2 import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print( "send interrupted, aborting" )
            return

    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print( "%i requests/replies processed" % count )

if __name__ == '__main__':
    main()

  • Uses a DEALER socket instead of REQ, prepending an empty delimiter frame to each request
  • Does not retry requests; the application handles retries itself
  • Splits the synchronous send into separate send and recv methods
  • send is asynchronous and returns immediately after sending
  • recv waits for one response and returns it to the caller
  • The test program sends 100,000 messages

The asynchronous Majordomo pattern still has one weakness: it cannot survive a broker crash, because mdcliapi2.py does not attempt any recovery when replies stop arriving.


To add recovery, we would have to:


  1. Number every request, and verify the numbers on replies.
  2. Track and hold every request until a reply arrives.
  3. On failover, resend every request for which no reply has arrived yet.
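The three steps above can be sketched as client-side bookkeeping. This is a hypothetical helper, not part of mdcliapi2.py:

```python
import itertools

class RequestTracker:
    """Number every request, remember the ones still in flight,
    and replay them after a failover."""

    def __init__(self):
        self._seq = itertools.count(1)
        self.pending = {}  # sequence number -> request body

    def register(self, body):
        # Step 1: tag the request with a sequence-number frame.
        seq = next(self._seq)
        self.pending[seq] = body
        return [str(seq).encode(), body]

    def acknowledge(self, frames):
        # Step 1/2: a reply carries the sequence number back; drop the
        # request from the pending table once matched.
        seq = int(frames[0].decode())
        self.pending.pop(seq, None)
        return frames[1]

    def to_resend(self):
        # Step 3: after reconnecting, everything still pending must be
        # sent again, oldest first.
        return [[str(s).encode(), b] for s, b in sorted(self.pending.items())]

tracker = RequestTracker()
msg = tracker.register(b"Hello world")
tracker.register(b"Second request")
tracker.acknowledge(msg)         # first reply arrives
print(len(tracker.to_resend()))  # one request still outstanding
```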

Service Discovery


We now have a service-oriented broker, but no way to find out whether a particular service exists; we only find out that a request failed. One option is to change the MDP/Client protocol and add a command that asks about a service.


Another approach is to do what email does: return undeliverable requests to the sender. That requires a way to tell replies apart from returned requests.


We take the first approach, adding service discovery on top of MDP: the Majordomo Management Interface (MMI), which the broker above already implements.


  • When a client sends a request to a service whose name starts with mmi., the broker handles it internally
  • This broker handles only one internal service, mmi.service: the service discovery service
  • The request payload is the name of an external service
  • The broker replies with "200" (OK) or "404" (not found)

"""
MMI echo query example
"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    request = b"echo"
    reply = client.send(b"mmi.service", request)

    if reply:
        replycode = reply[0]
        print( "Lookup echo service:", replycode )
    else:
        print( "E: no response from broker, make sure it's running" )

if __name__ == '__main__':
    main()

Idempotent Services


Idempotent means an operation can safely be repeated, e.g., checking the clock. Some idempotent use cases:


  • Stateless task distribution: a pipeline where servers are stateless workers
  • A name service that translates logical addresses to endpoints

Non-idempotent use cases:


  • A logging service: the same log entry must not be recorded twice
  • Any service that affects downstream nodes, e.g., one that sends information on to other nodes; if it handles the same request twice, downstream nodes receive duplicate information
  • Any service that modifies shared data

If a server application is non-idempotent, we have to think carefully about when exactly it can crash. A crash while it is idle, or while it is in the middle of handling a request, is usually fine. The dangerous window is after the work is done, for example after committing a database transaction, but before the reply has been sent.


A network failure causes the same problem: the client concludes the server has crashed and resends the request, and the server handles it a second time.


To handle non-idempotent operations safely, we detect and reject duplicate requests:


  1. The client stamps every request with a unique client identifier and a unique message number.
  2. Before sending a reply, the server stores it, keyed by client ID and message number.
  3. When a request arrives, the server first checks whether it already has a reply for that client ID and message number. If so, it does not process the request again; it just resends the stored reply.
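A minimal sketch of this duplicate-detection scheme; the ReplyCache class and handle function are hypothetical illustrations, not part of the Majordomo code:

```python
class ReplyCache:
    """Server-side duplicate detection: replies are stored under
    (client id, message number) and replayed for repeated requests."""

    def __init__(self):
        self._replies = {}

    def lookup(self, client_id, msg_number):
        # Step 3: if we already answered this request, return the
        # stored reply instead of doing the work again.
        return self._replies.get((client_id, msg_number))

    def store(self, client_id, msg_number, reply):
        # Step 2: remember the reply before sending it.
        self._replies[(client_id, msg_number)] = reply

cache = ReplyCache()

def handle(client_id, msg_number, request):
    cached = cache.lookup(client_id, msg_number)
    if cached is not None:
        return cached            # duplicate: replay, don't reprocess
    reply = request.upper()      # stand-in for the real (non-idempotent) work
    cache.store(client_id, msg_number, reply)
    return reply

first = handle(b"client-1", 1, b"hello")
again = handle(b"client-1", 1, b"hello")   # retried request
print(first == again)  # the duplicate gets the identical stored reply
```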

Titanic pattern: Disconnected Reliability


Majordomo gives us a reliable message broker, with some caveats:


  • The Lazy Pirate client works well with it, whether talking to a server directly or through distributed queue proxies; this assumes workers are stateless and idempotent
  • Adding rust (spinning disks) brings problems of its own, such as slow performance

Rust-based (disk-based) reliability is what you want for asynchronous, disconnected networks. It lifts the Pirate assumption that the client waits for its reply in real time; instead clients and workers connect only briefly, much like email, which means adding state to the broker.


Titanic pattern: write messages to disk so they are guaranteed never to be lost, adding a Titanic layer on top of MDP:


  • Divide and conquer keeps it simple: the broker handles message routing, a worker handles reliability
  • The broker and the Titanic worker can be implemented in different languages
  • It gives us fire-and-forget reliability

If a client needs a reply immediately, it talks to the service directly; if it can afford to wait, it goes through Titanic instead.



Titanic plays both the worker role and the client role. The dialog between a client and Titanic goes like this:


  • Client: please accept this request for me
  • Titanic: OK, done
  • Client: do you have a reply for me?
  • Titanic: Yes, here it is. Or, no, not yet.
  • Client: OK, you can wipe that request record now
  • Titanic: OK, done.

And between Titanic, the broker, and a worker, it goes like this:


  • Titanic: hey broker, is there a coffee service?
  • Broker: yes, there is
  • Titanic: hey coffee service, please handle this request for me
  • Coffee: sure, here you are
  • Titanic: OK, got it

If a worker crashes while handling a request, Titanic retries indefinitely. If a reply gets lost somewhere, Titanic retries. If the request was handled but the client never received the reply, the client asks again. If Titanic itself crashes while handling a request or a reply, the client resends the request. As long as a request has been committed to storage, work is never lost.


The client does its work via the asynchronous Majordomo pattern, then comes back later to collect the reply.


We need a way for clients to fetch their replies, but a client gets a different identity each time it reconnects. The solution:


  • Every request gets a UUID
  • When the client asks for a reply, it must supply the UUID of the original request

In a realistic application, the client would store these UUIDs in a local database.
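A sketch of such local persistence using the standard sqlite3 module; the table name and schema are assumptions, not part of the Titanic protocol:

```python
import sqlite3

# Hypothetical schema: one row per outstanding Titanic request.
db = sqlite3.connect(":memory:")  # a real client would use a file path
db.execute("CREATE TABLE IF NOT EXISTS requests (uuid TEXT PRIMARY KEY)")

def remember(uuid):
    # Called after titanic.request returns 200 plus a UUID.
    with db:
        db.execute("INSERT OR IGNORE INTO requests VALUES (?)", (uuid,))

def forget(uuid):
    # Called after titanic.close confirms the reply was received.
    with db:
        db.execute("DELETE FROM requests WHERE uuid = ?", (uuid,))

def outstanding():
    # On restart, poll titanic.reply for each of these UUIDs.
    return [row[0] for row in db.execute("SELECT uuid FROM requests")]

remember("a8398b5a2419455598a064936336ddf7")
print(outstanding())
forget("a8398b5a2419455598a064936336ddf7")
print(outstanding())  # empty once closed
```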


First, how does the client talk to Titanic? One option is a single service with three request types; the approach used here is three separate services:


  • titanic.request: store a request message, and return a UUID for the request
  • titanic.reply: fetch the reply, if any, for a given UUID
  • titanic.close: confirm that the reply has been received and stored safely

Next we implement Titanic as a multithreaded worker, following the Titanic Service Protocol (TSP):


ticlient.py


"""
Titanic client example
Implements client side of http://rfc.zeromq.org/spec:9

"""

import sys
import time

from mdcliapi import MajorDomoClient

def service_call (session, service, request):
    """Calls a TSP service

    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print( "E: client fatal error, aborting" )
            sys.exit (1)
        elif status == b"500":
            print( "E: server fatal error, aborting" )
            sys.exit (1)
    else:
        sys.exit(0)  # Interrupted or failed

def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)

    #  1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)

    uuid = None

    if reply:
        uuid = reply.pop(0)
        print( "I: request UUID ", uuid )

    #  2. Wait until we get a reply
    while True:
        time.sleep (.1)
        request = [uuid]
        reply = service_call (session, b"titanic.reply", request)

        if reply:
            reply_string = reply[-1]
            print( "Reply:", reply_string )

            #  3. Close request
            request = [uuid]
            reply = service_call (session, b"titanic.close", request)
            break
        else:
            print( "I: no reply yet, trying again..." )
            time.sleep(5)     #  Try again in 5 seconds
    return 0

if __name__ == '__main__':
    main()

titanic.py


"""
Titanic service

Implements server side of http://rfc.zeromq.org/spec:9

"""

import pickle
import os
import sys
import threading
import time
from uuid import uuid4
from pathlib import Path

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient

from zhelpers import zpipe

TITANIC_DIR = ".titanic"

def request_filename (uuid):
    """Returns freshly allocated request filename for given UUID"""
    return os.path.join(TITANIC_DIR, "%s.req" % uuid)

#

def reply_filename (uuid):
    """Returns freshly allocated reply filename for given UUID"""
    return os.path.join(TITANIC_DIR, "%s.rep" % uuid)

# ---------------------------------------------------------------------
# Titanic request service

def titanic_request (pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")

    reply = None

    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)

        # Generate UUID and save message to disk
        uuid = uuid4().hex
        filename = request_filename (uuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)

        # Send UUID through to message queue
        uuidframe = uuid.encode()
        print( "uuid is {}".format(uuidframe) )
        pipe.send( uuidframe )

        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", uuidframe ]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply ():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)

        print("req_filename={}, rep_filename={}".format(req_filename, rep_filename))
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"] # pending
            else:
                reply = [b"400"] # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)
        # should these be protected?  Does zfile_delete ignore files
        # that have already been removed?  That's what we are doing here.
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, uuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename (uuid)
    # print("service_success filename = {}".format(filename))

    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True

    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"

    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename (uuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True

    return False


def main():
    basedir = os.path.join(TITANIC_DIR)
    if not os.path.exists(basedir):
        os.makedirs(basedir)
    queuefile = os.path.join(TITANIC_DIR, 'queue')
    Path(queuefile).touch()

    verbose = '-v' in sys.argv
    ctx = zmq.Context()

    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000 # 1 sec
    client.retries = 1 # only 1 retry

    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()

    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)
    # Main dispatcher loop
    while True:
        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)
        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break;              # Interrupted

        if items:

            # Append UUID to queue, prefixed with '-' for pending
            uuid = request_pipe.recv().decode()
            # print("append to queue: {}".format(uuid))

            with open(os.path.join(TITANIC_DIR, 'queue'), 'a') as f:
                f.write("-%s\n" % uuid)

        # Brute-force dispatcher
        #
        with open(os.path.join(TITANIC_DIR, 'queue'), 'r+b') as f:
            for entry in f.readlines():
                # UUID is prefixed with '-' if still waiting
                entry = entry.decode()
                # print("entry = {}".format(entry))
                if entry[0] == '-':
                    uuid = entry[1:].rstrip() # rstrip '\n' etc.
                    print( "I: processing request %s" % uuid )
                    if service_success(client, uuid):
                        # mark queue entry as processed
                        here = f.tell()
                        f.seek(-1*len(entry), os.SEEK_CUR)
                        f.write(b'+')
                        f.seek(here, os.SEEK_SET)


if __name__ == '__main__':
    main()

Run:


$ python mdbroker.py -v

$ python mdworker.py -v

$ python titanic.py -v
2018-09-26 17:15:42 I: connecting to broker at tcp://localhost:5555...
uuid is b'a8398b5a2419455598a064936336ddf7'
I: processing request a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: send request to 'b'mmi.service'' service:
----------------------------------------
[006] MDPC01
[011] mmi.service
[004] echo
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[011] mmi.service
[003] 200
2018-09-26 17:16:02 I: send request to 'b'echo'' service:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
req_filename=.titanic/a8398b5a2419455598a064936336ddf7.req, rep_filename=.titanic/a8398b5a2419455598a064936336ddf7.rep

$ python ticlient.py -v
2018-09-26 17:16:00 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:00 I: send request to 'b'titanic.request'' service:
----------------------------------------
[006] MDPC01
[015] titanic.request
[004] echo
[011] Hello world
2018-09-26 17:16:02 W: no reply, reconnecting...
2018-09-26 17:16:02 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[015] titanic.request
[003] 200
[032] a8398b5a2419455598a064936336ddf7
I: request UUID  b'a8398b5a2419455598a064936336ddf7'
2018-09-26 17:16:02 I: send request to 'b'titanic.reply'' service:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[003] 200
[011] Hello world
Reply: b'Hello world'
2018-09-26 17:16:02 I: send request to 'b'titanic.close'' service:
----------------------------------------
[006] MDPC01
[013] titanic.close
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.close
[003] 200

  • Since Titanic acts as both a client and a worker, it has both sending and receiving message loops
  • The Titanic broker uses the MMI service discovery protocol to send requests
  • Request data is passed to titanic.request over inproc

  • All data is handled through plain files

  • A database is not recommended; plain files perform better here than a DB

  • To make Titanic more reliable, requests can be replicated to a second server

  • To make Titanic much faster but less reliable, requests and replies can be kept in memory
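The "one file" technique above boils down to appending `-<uuid>` lines for pending requests, then overwriting the `-` with `+` in place once a request succeeds. A minimal standalone sketch (the file name `queue.demo` and the helpers are hypothetical; the real broker uses the file under `TITANIC_DIR`):

```python
import os

QUEUE = "queue.demo"   # hypothetical demo file; the broker uses TITANIC_DIR/queue

def append_pending(uuid):
    """Append a request UUID, prefixed with '-' to mark it pending."""
    with open(QUEUE, "a") as f:
        f.write("-%s\n" % uuid)

def dispatch(process):
    """Flip the leading '-' to '+' in place for every entry process() accepts."""
    with open(QUEUE, "r+b") as f:
        while True:
            entry = f.readline().decode()
            if not entry:
                break
            if entry[0] == "-" and process(entry[1:].rstrip()):
                here = f.tell()
                f.seek(-len(entry), os.SEEK_CUR)
                f.write(b"+")            # overwrite '-' with '+': processed
                f.seek(here, os.SEEK_SET)

append_pending("abc123")
append_pending("def456")
dispatch(lambda uuid: uuid == "abc123")  # pretend only the first one succeeds
with open(QUEUE) as f:
    result = f.read().splitlines()
print(result)                            # ['+abc123', '-def456']
os.remove(QUEUE)
```

Because entries are fixed-width one-byte markers, marking an entry processed never shifts the rest of the file, so the dispatcher can brute-force rescan the queue safely.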


Binary Star pattern: High Availability Pair


The Binary Star pattern joins two servers into a primary-backup high-availability pair, operating in active-standby mode.



When failover occurs, the backup server takes over as active.



How recovery from failover works:


  • Operators restart the primary server and fix whatever problem caused the failover
  • Operators then stop the backup server, causing a brief disruption to applications
  • Once applications have reconnected to the primary server, operators restart the backup server

Recovery is handled manually, because automating it causes problems, for these reasons:


  1. Failover interrupts service to applications, possibly for 10-30 seconds. That is still better than a total outage, but recovery causes a second 10-30 second outage, so it is best done off-peak.
  2. In an emergency, the top priority is getting things working again. Automatic recovery can leave system admins unsure which machine is actually serving.
  3. Automatic recovery can produce network fail-over-and-back situations, leaving operators at different sites to analyze what happened and resolve the problem.

The Binary Star pattern fails back to the primary server when the backup server fails, and this is in fact how recovery is triggered.


There are two ways to shut down a Binary Star pair:


  1. Stop the passive server first, then stop the active server
  2. Stop both servers in either order, within a few seconds of each other

Stopping the active server before the passive one causes applications to disconnect, reconnect, and then disconnect again.


Detailed Requirements for HA architecture

  • Failover is meant for catastrophic system failures such as hardware faults or fire. Ordinary server crashes have simpler remedies.
  • Failover should complete within 60 seconds, preferably within 10.
  • Failover must be automatic, but recovery is manual. Applications should switch to the backup server automatically, but must not switch back to the primary until operators have fixed it and chosen a suitable time to interrupt applications once more.
  • The failover logic in client applications should be easy for developers to use, ideally hidden inside the client API.
  • There must be clear guidance for network architects on avoiding topologies that can cause split-brain syndrome (both servers believing they are active).
  • The two servers may be started in any order.
  • It must be possible to do planned stops and restarts of either server without stopping client applications.
  • Operators must be able to monitor both servers at all times.
  • The two servers must be connected by a high-speed dedicated network link; failover synchronization must use a specific IP route.

The design makes the following assumptions:


  • A single backup server is sufficient; multiple layers of backup are not needed.
  • The primary and backup servers have equal capacity and can each carry the full application load; load is not balanced across other servers.
  • There is enough budget for a redundant backup server that sits idle most of the time.

The following issues are out of scope:


  • Using an active backup server or load balancing. In a Binary Star pair, the backup server is normally inactive and does no useful work.
  • Handling persistent messages or transactions. Unreliable servers or Binary Star pairs are assumed.
  • Automatic discovery of the network. A Binary Star pair needs a well-defined network topology that applications know about (as configuration data).
  • Replicating state or messages between servers. Server-side state must be rebuilt by applications after a failover.

Key terminology in Binary Star:


  • Primary: the server that is initially active
  • Backup: the server that is initially passive
  • Active: the server that accepts client connections; there is at most one active server
  • Passive: the server that takes over if the active one fails. Normally the primary is active and the backup is passive, but the roles swap when failover happens.

To configure a Binary Star pair, you must:


  1. Tell the primary server where the backup server is
  2. Tell the backup server where the primary server is
  3. Optionally, tune the failover response time, which must be the same on both servers

In the examples, the failover timeout is 2000 ms. If you wrap the primary server in a shell script that restarts it automatically, the failover timeout must be longer than the time the primary server needs to restart.


For a client application to work with a Binary Star pair, it must:


  1. Know the addresses of both the primary and backup servers
  2. Try to connect to the primary server first, and to the backup server if that fails
  3. Detect a failed connection, typically using heartbeating
  4. Try to reconnect to the primary, and then to the backup, with a delay between retries at least as long as the server failover timeout
  5. Recreate any state it needs on whichever server it connects to
  6. Retransmit messages lost during a failover, if it needs reliable messaging

Limitations of the Binary Star pattern:


  • A server process cannot belong to more than one Binary Star pair
  • A primary server can have exactly one backup server, no more
  • The passive server does no useful work while passive
  • The backup server must be able to handle the full application load
  • The failover configuration cannot be modified at runtime
  • Client applications must do some work to take advantage of failover

Preventing Split-Brain Syndrome

Split-brain syndrome occurs when different parts of a cluster simultaneously believe they are the active server; it causes applications to stop seeing each other. Binary Star has a mechanism for detecting and eliminating split brain, based on a three-way decision: a server does not decide to become active until it is both receiving application connection requests and unable to see its peer server.


However, it is still possible to design a network where this algorithm misfires. For example, place a Binary Star pair across two buildings, each building holding a set of applications and one server of the pair, with a single network link between the buildings. Breaking that link creates two sets of applications, each with half of the Binary Star pair, and both servers become active.


To prevent split brain, connect the two servers with a dedicated network link; the simplest way is to plug them into the same switch, or better, use a crossover cable directly between them.


Do not split a Binary Star pair across two locations. If you need that, use federation rather than high-availability failover.


A suitably paranoid network configuration uses two private cluster interconnects rather than one, on separate network cards, so that a single network failure cannot take out the link between the servers.


Binary Star Implementation

The primary and backup servers run the same program; the role is chosen at startup.


bstarsrv.py: binary star server


# Binary Star Server

from argparse import ArgumentParser
import time

from zhelpers import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass

fsm_states = {
    # currently PRIMARY:
    #   on PEER_BACKUP, become ACTIVE
    #   on PEER_ACTIVE, become PASSIVE
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
        },
    # currently BACKUP:
    #   on PEER_ACTIVE, become PASSIVE
    #   on CLIENT_REQUEST, reject; state unchanged
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
        },
    # currently ACTIVE:
    #   on PEER_ACTIVE, fatal error (dual masters)
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
        },
    # currently PASSIVE:
    #   on PEER_PRIMARY, become ACTIVE
    #   on PEER_BACKUP, become ACTIVE
    #   on PEER_PASSIVE, fatal error (dual slaves)
    #   on CLIENT_REQUEST, stay PASSIVE (may trigger failover, checked later)
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
        }
    }


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    # state == False marks an invalid FSM transition
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()

    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)

    fsm = BStarState(0, 0, 0)

    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
        fsm.state = STATE_BACKUP

    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)

    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg

        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break

        # Time to publish our state?
        if int(time.time() * 1000) >= send_state_at:
            statepub.send_string("%d" % fsm.state)
            # Schedule the next state broadcast
            send_state_at = int(time.time() * 1000) + HEARTBEAT

if __name__ == '__main__':
    main()

bstarcli.py: binary star client


from time import sleep
import zmq


REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()

    # The client connects to the primary server first
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)

    sequence = 0
    while True:
        client.send_string("%s" % sequence)

        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                # Timeout waiting for a reply means the server failed,
                # so connect to the next server and resend the request
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)

if __name__ == '__main__':
    main()

Sample output:


$ python bstarsrv.py -p
I: Primary master, waiting for backup (slave)
I: connected to backup (slave), ready as master
^C

$ python bstarsrv.py -b
I: Backup slave, waiting for primary (master)
I: connected to primary (master), ready as slave

$ python bstarcli.py
I: server replied OK (0)
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
I: server replied OK (4)
I: server replied OK (5)
W: no response from server, failing over
I: connecting to server at tcp://localhost:5002..
I: server replied OK (6)
I: server replied OK (7)
I: server replied OK (8)
I: server replied OK (9)
I: server replied OK (10)
I: server replied OK (11)
I: server replied OK (12)

Killing the primary server triggers failover; restarting the primary and then killing the backup triggers recovery.


Binary Star is implemented as a finite state machine. Events are the peer's state: "Peer Active" means the other server has told us it is active; "Client Request" means we have received a client request; "Client Vote" means we have received a client request AND the peer has been inactive for two heartbeats.
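The transitions described above can be condensed into a small, self-contained sketch (same semantics as the `fsm_states` table in bstarsrv.py, with the abnormal cases raising instead of returning a False state):

```python
STATE_PRIMARY, STATE_BACKUP, STATE_ACTIVE, STATE_PASSIVE = 1, 2, 3, 4
PEER_PRIMARY, PEER_BACKUP, PEER_ACTIVE, PEER_PASSIVE, CLIENT_REQUEST = 1, 2, 3, 4, 5

# (state, event) -> new state, for the unconditional transitions
TRANSITIONS = {
    (STATE_PRIMARY, PEER_BACKUP):  STATE_ACTIVE,
    (STATE_PRIMARY, PEER_ACTIVE):  STATE_PASSIVE,
    (STATE_BACKUP,  PEER_ACTIVE):  STATE_PASSIVE,
    (STATE_PASSIVE, PEER_PRIMARY): STATE_ACTIVE,
    (STATE_PASSIVE, PEER_BACKUP):  STATE_ACTIVE,
}

def next_state(state, event, peer_dead=False):
    if (state, event) in TRANSITIONS:
        return TRANSITIONS[(state, event)]
    if state == STATE_ACTIVE and event == PEER_ACTIVE:
        raise RuntimeError("dual masters")     # split brain, abort
    if state == STATE_PASSIVE and event == PEER_PASSIVE:
        raise RuntimeError("dual slaves")      # nobody would answer clients
    if state == STATE_PASSIVE and event == CLIENT_REQUEST and peer_dead:
        return STATE_ACTIVE                    # the "Client Vote" failover
    return state                               # everything else: no change

# Primary sees the backup come up: it becomes active
assert next_state(STATE_PRIMARY, PEER_BACKUP) == STATE_ACTIVE
# Passive server fails over on a client request once the peer looks dead
assert next_state(STATE_PASSIVE, CLIENT_REQUEST, peer_dead=True) == STATE_ACTIVE
```

Note that a client request is what actually triggers failover: the passive server only promotes itself when a client "votes" for it and the peer has gone silent.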


The servers exchange state over PUB-SUB sockets; no other socket combination works here. PUSH and DEALER block when no peer is ready to receive a message. PAIR does not reconnect after a peer disconnects and restarts. ROUTER needs the peer's address before it can send messages to it.



Binary Star Reactor

Binary Star is useful enough to package as a reusable reactor class. The reactor runs and calls our code whenever there is a message to process, which is far more convenient than copy/pasting the Binary Star code into every server.


bstar.py: Binary Star core class


"""
Binary Star server

"""

import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

# States we can be in at any point in time
STATE_PRIMARY = 1          # Primary, waiting for peer to connect
STATE_BACKUP = 2           # Backup, waiting for peer to connect
STATE_ACTIVE = 3           # Active - accepting connections
STATE_PASSIVE = 4          # Passive - not accepting connections

# Events, which start with the states our peer can be in
PEER_PRIMARY = 1           # HA peer is pending primary
PEER_BACKUP = 2            # HA peer is pending backup
PEER_ACTIVE = 3            # HA peer is active
PEER_PASSIVE = 4           # HA peer is passive
CLIENT_REQUEST = 5         # Client makes request

# We send state information every this often
# If peer doesn't respond in two heartbeats, it is 'dead'
HEARTBEAT = 1000          # In msecs


class FSMError(Exception):
    """Exception class for invalid state"""
    pass


class BinaryStar(object):
    def __init__(self, primary, local, remote):
        # initialize the Binary Star
        self.ctx = zmq.Context()  # Our private context
        self.loop = IOLoop.instance()  # Reactor loop
        self.state = STATE_PRIMARY if primary else STATE_BACKUP

        self.event = None  # Current event
        self.peer_expiry = 0  # When peer is considered 'dead'
        self.voter_callback = None  # Voting socket handler
        self.master_callback = None  # Call when become master
        self.slave_callback = None  # Call when become slave

        # Create publisher for state going to peer
        self.statepub = self.ctx.socket(zmq.PUB)
        self.statepub.bind(local)

        # Create subscriber for state coming from peer
        self.statesub = self.ctx.socket(zmq.SUB)
        self.statesub.setsockopt_string(zmq.SUBSCRIBE, u'')
        self.statesub.connect(remote)

        # wrap statesub in ZMQStream for event triggers
        self.statesub = ZMQStream(self.statesub, self.loop)

        # setup basic reactor events
        self.heartbeat = PeriodicCallback(self.send_state,
                                          HEARTBEAT, self.loop)
        self.statesub.on_recv(self.recv_state)

    def update_peer_expiry(self):
        """Update peer expiry time to be 2 heartbeats from now."""
        self.peer_expiry = time.time() + 2e-3 * HEARTBEAT

    def start(self):
        self.update_peer_expiry()
        self.heartbeat.start()
        return self.loop.start()

    def execute_fsm(self):
        """Binary Star finite state machine (applies event to state)

        returns True if connections should be accepted, False otherwise.
        """
        accept = True
        if self.state == STATE_PRIMARY:
            # Primary server is waiting for peer to connect
            # Accepts CLIENT_REQUEST events in this state
            if self.event == PEER_BACKUP:
                print("I: connected to backup (slave), ready as master")
                self.state = STATE_ACTIVE
                if self.master_callback:
                    self.loop.add_callback(self.master_callback)
            elif self.event == PEER_ACTIVE:
                print("I: connected to backup (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                if time.time() >= self.peer_expiry:
                    print("I: request from client, ready as master")
                    self.state = STATE_ACTIVE
                    if self.master_callback:
                        self.loop.add_callback(self.master_callback)
                else:
                    # don't respond to clients yet - we don't know if
                    # the backup is currently Active as a result of
                    # a successful failover
                    accept = False
        elif self.state == STATE_BACKUP:
            # Backup server is waiting for peer to connect
            # Rejects CLIENT_REQUEST events in this state
            if self.event == PEER_ACTIVE:
                print("I: connected to primary (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                accept = False
        elif self.state == STATE_ACTIVE:
            # Server is active
            # Accepts CLIENT_REQUEST events in this state
            # The only way out of ACTIVE is death
            if self.event == PEER_ACTIVE:
                # Two masters would mean split-brain
                print("E: fatal error - dual masters, aborting")
                raise FSMError("Dual Masters")
        elif self.state == STATE_PASSIVE:
            # Server is passive
            # CLIENT_REQUEST events can trigger failover if peer looks dead
            if self.event == PEER_PRIMARY:
                # Peer is restarting - become active, peer will go passive
                print("I: primary (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_BACKUP:
                # Peer is restarting - become active, peer will go passive
                print("I: backup (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_PASSIVE:
                # Two passives would mean cluster would be non-responsive
                print("E: fatal error - dual slaves, aborting")
                raise FSMError("Dual slaves")
            elif self.event == CLIENT_REQUEST:
                # Peer becomes master if timeout has passed
                # It's the client request that triggers the failover
                assert self.peer_expiry > 0
                if time.time() >= self.peer_expiry:
                    # If peer is dead, switch to the active state
                    print("I: failover successful, ready as master")
                    self.state = STATE_ACTIVE
                else:
                    # If peer is alive, reject connections
                    accept = False
            # Call state change handler if necessary
            if self.state == STATE_ACTIVE and self.master_callback:
                self.loop.add_callback(self.master_callback)
        return accept

    # ---------------------------------------------------------------------
    # Reactor event handlers...

    def send_state(self):
        """Publish our state to peer"""
        self.statepub.send_string("%d" % self.state)

    def recv_state(self, msg):
        """Receive state from peer, execute finite state machine"""
        state = msg[0]
        if state:
            self.event = int(state)
            self.update_peer_expiry()
        self.execute_fsm()

    def voter_ready(self, msg):
        """Application wants to speak to us, see if it's possible"""
        # If server can accept input now, call appl handler
        self.event = CLIENT_REQUEST
        if self.execute_fsm():
            print("CLIENT REQUEST")
            self.voter_callback(self.voter_socket, msg)
        else:
            # Message will be ignored
            pass

    # -------------------------------------------------------------------------
    #

    def register_voter(self, endpoint, type, handler):
        """Create socket, bind to local endpoint, and register as reader for
        voting. The socket will only be available if the Binary Star state
        machine allows it. Input on the socket will act as a "vote" in the
        Binary Star scheme.  We require exactly one voter per bstar instance.

        handler will always be called with two arguments: (socket,msg)
        where socket is the one we are creating here, and msg is the message
        that triggered the POLLIN event.
        """
        assert self.voter_callback is None

        socket = self.ctx.socket(type)
        socket.bind(endpoint)
        self.voter_socket = socket
        self.voter_callback = handler

        stream = ZMQStream(socket, self.loop)
        stream.on_recv(self.voter_ready)

bstarsrv2.py: Binary Star server


"""
Binary Star server, using bstar reactor

"""

import sys

import zmq

from bstar import BinaryStar


def echo(socket, msg):
    """Echo service"""
    socket.send_multipart(msg)


def main():
    # Arguments can be either of:
    #     -p  primary server, at tcp://localhost:5001
    #     -b  backup server, at tcp://localhost:5002
    if '-p' in sys.argv:
        star = BinaryStar(True, "tcp://*:5003", "tcp://localhost:5004")
        star.register_voter("tcp://*:5001", zmq.ROUTER, echo)
    elif '-b' in sys.argv:
        star = BinaryStar(False, "tcp://*:5004", "tcp://localhost:5003")
        star.register_voter("tcp://*:5002", zmq.ROUTER, echo)
    else:
        print("Usage: bstarsrv2.py { -p | -b }\n")
        return

    star.start()

if __name__ == '__main__':
    main()

bstarsrv2 hits a tornado error at startup; downgrading tornado from 5.1 to 4.5 (e.g. `pip install "tornado<5"`) may fix it, but this is untested and unconfirmed.


Freelance pattern: Brokerless Reliability


The Freelance pattern is a distributed peer-to-peer architecture.


The use case is a name resolution service.


A recurring ZeroMQ architecture question: how do we know which endpoint to connect to? Can we avoid hard-coding it, without resorting to configuration files?


A ZeroMQ name service would do the following:


  • Resolve a logical name into a bind endpoint and a connect endpoint. In practice, resolving each name to multiple bind endpoints and multiple connect endpoints is even better.
  • Manage multiple parallel environments, e.g. "test" versus "production", without changing code
  • Be reliable, because applications cannot join the network while the name service is down
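A purely hypothetical sketch of what such a service resolves: each (environment, logical name) pair maps to bind and connect endpoint lists. All names and addresses below are made up for illustration:

```python
# Hypothetical name table: (environment, logical name) -> endpoint lists.
NAMES = {
    ("production", "echo"): {
        "bind":    ["tcp://*:5555"],
        "connect": ["tcp://192.168.1.10:5555"],
    },
    ("test", "echo"): {
        "bind":    ["tcp://*:6555"],
        "connect": ["tcp://localhost:6555"],
    },
}

def resolve(env, name, kind="connect"):
    """Return the endpoint list for a logical name in a given environment."""
    return NAMES[(env, name)][kind]

print(resolve("test", "echo"))   # ['tcp://localhost:6555']
```

Switching an application from "test" to "production" then changes only the environment key, not the code.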

The name service could sit behind a service-oriented Majordomo broker; it would then be the only global network endpoint, and clients would only need to be configured with the name service's endpoint.



The failures to handle are server crashes and restarts, server busy-looping, server overload, and network issues. To be reliable, we need a pool of name servers; in practice, two are enough.


In this architecture, many clients connect directly to a small number of servers, and the servers bind to their addresses, unlike in Majordomo. Clients can be implemented in several ways:


  • REQ sockets with the Lazy Pirate pattern. Easy to implement, but needs extra code to stop the client from endlessly retrying dead servers.
  • DEALER sockets, blasting out requests (load balanced across all connected servers) until a reply arrives.
  • ROUTER sockets, so clients can address specific servers. But how does the client learn the identity of the server sockets? Either the server pings the client first, or the server uses a hard-coded, fixed identity.

Model 1: Simple Retry and Failover

The first model rewrites Lazy Pirate to work with multiple server endpoints.


flserver1.py: Freelance server


#
# Freelance server - Model 1
# Trivial echo service
import sys
import zmq

if len(sys.argv) < 2:
    print( "I: Syntax: %s <endpoint>" % sys.argv[0] )
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print( "I: Echo service is ready at %s" % endpoint )
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0) # Terminate immediately

flclient1.py


#
# Freelance Client - Model 1
# Uses REQ socket to query one or more services

import sys
import time

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3   # Before we abandon

def try_request(ctx, endpoint, request):
    print( "I: Trying echo service at %s..." % endpoint )
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = ''
    poll.unregister(client)
    client.close()
    return reply

context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print( "I: syntax: %s <endpoint> ..." % sys.argv[0] )
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print( "W: No response from %s, retrying" % endpoint )
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print( "W: No response from %s" % endpoint )

if reply:
    print( "Service is running OK" )

Sample output:


$python flserver1.py tcp://*:5555
I: Echo service is ready at tcp://*:5555

$python flserver1.py tcp://*:5556
I: Echo service is ready at tcp://*:5556

$python flclient1.py tcp://localhost:5555 tcp://localhost:5556
I: Trying echo service at tcp://localhost:5555...
Service is running OK

This is essentially Lazy Pirate, with the client needing just one successful reply, but there are two cases to note:


  • With a single server, the client retries several times, exactly as in Lazy Pirate
  • With multiple servers, the client tries each server at most once, stopping when it gets a reply or after all servers have been tried

This fixes the main weakness of Lazy Pirate: its inability to fail over to backup or alternate servers.


However, this design is not usable in production: with several connections, if the primary name server is down, every request suffers the full timeout before failing over.


Model 2: Brutal Shotgun Massacre

The second model switches to a DEALER socket, to make sure we get a reply back in the shortest possible time.


The client now works as follows:


  • Connect to all servers at setup time
  • When it has a request, blast it to all servers
  • Wait for the first reply and ignore the rest

In practice the client gets multiple replies back, though it cannot know how many; both requests and replies can be lost.


So we number each request with a sequence number, and ignore replies whose sequence number does not match.


flserver2.py


#
# Freelance server - Model 2
# Does some work, replies OK, with message sequencing


import sys
import zmq

if len(sys.argv) < 2:
    print( "I: Syntax: %s <endpoint>" % sys.argv[0] )
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print( "I: Service is ready at %s" % endpoint )
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    address = request[0]
    reply = [address, b"OK"]
    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early

flclient2.py


#
# Freelance Client - Model 2
# Uses DEALER socket to blast one or more services
import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms

class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)   # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print( "I: Connected to %s" % endpoint )

    def request(self, *request):
        # Prefix request with sequence number and empty envelope
        self.sequence += 1
        msg = [b'', str(self.sequence).encode() ] + list(request)


        # Blast the request to all connected servers
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each one
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        reply = None
        endtime = time.time() + GLOBAL_TIMEOUT / 1000
        while time.time() < endtime:
            socks = dict(poll.poll((endtime - time.time()) * 1000))
            if socks.get(self.socket) == zmq.POLLIN:
                reply = self.socket.recv_multipart()
                assert len(reply) == 3
                sequence = int(reply[1].decode())
                if sequence == self.sequence:
                    break
        return reply

if len(sys.argv) == 1:
    print( "I: Usage: %s <endpoint> ..." % sys.argv[0] )
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

start = time.time()
for requests in range(10000):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print( "E: Name service not available, aborting" )
        break
print( "Average round trip cost: {} usec".format(1e6 * (time.time() - start) / 10000) )
client.destroy()

Sample output:


$python flserver2.py tcp://*:5555
I: Service is ready at tcp://*:5555

$python flclient2.py tcp://localhost:5555
I: Connected to tcp://localhost:5555
Average round trip cost: 0.024947671890258788 usec

Points to note about the client implementation:


  • The client is wrapped in a class-based API
  • The client abandons the request if it cannot find any responsive server within a few seconds
  • The client builds a valid REP envelope by adding an empty frame

The client performs 10,000 name resolution requests and measures the average cost.


Advantages:


  • Simple and easy to implement
  • Fast failover: it works as long as at least one server is up

Disadvantages:


  • It creates redundant network traffic
  • Servers cannot be prioritized
  • A server can handle only one request at a time

Model 3: Complex and Nasty

The third model switches to a ROUTER socket, so requests can be sent to specific servers and dead servers can be avoided.


Between two ROUTER sockets, both ends need an identity, and an identity only becomes known after the first message is received. The only workaround is to use hard-coded identities in one direction.


We use the connection endpoint as the identity.


ZeroMQ identities work like this: the server's ROUTER socket sets its identity before binding; when a client connects, the two sockets exchange identities before either sends a real message.


A client ROUTER socket with no explicit identity sends a null identity to the server; the server then generates a random UUID for that client and uses it when sending back.
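The identity mechanics can be shown in a tiny sketch (a DEALER stands in for the client side here for brevity, and the endpoint/identity strings are made up): a socket that sets its identity before connecting shows up under that identity in the ROUTER's envelope, instead of under a generated UUID.

```python
import zmq

ctx = zmq.Context.instance()
server = ctx.socket(zmq.ROUTER)
server.bind("inproc://identity-demo")

client = ctx.socket(zmq.DEALER)
# Hard-code an identity before connecting; without this, the ROUTER
# would label the connection with a random generated identity instead.
client.setsockopt(zmq.IDENTITY, b"client-endpoint-1")
client.connect("inproc://identity-demo")
client.send(b"hello")

# ROUTER prefixes incoming messages with the sender's identity frame
ident, msg = server.recv_multipart()
print(ident, msg)      # b'client-endpoint-1' b'hello'

client.close()
server.close()
ctx.term()
```

The Freelance server does the reverse: it sets its ROUTER identity to its own endpoint string before binding, so clients can route to it by endpoint name without any prior handshake.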


The client cannot route a message to the server immediately after zmq_connect(), only some short, random time later. This creates a problem: we do not know whether the server is actually there, although if it is online, the identity exchange completes within a few milliseconds.


We need to know which servers are available to connect to. In the Freelance pattern, unlike broker-based patterns, servers start out silent, waiting to be spoken to.


The solution is a shotgun approach: try sending ping-pong heartbeats to all servers.


The figure "How Freelance client and server exchange ping-pong commands and request-reply commands" illustrates this protocol


flclient3.py


"""
Freelance client - Model 3

Uses flcliapi class to encapsulate Freelance pattern
"""

import time

from flcliapi import FreelanceClient

def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect ("tcp://localhost:5555")
    client.connect ("tcp://localhost:5556")
    client.connect ("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print( "E: name service not available, aborting" )
            return

    print( "Average round trip cost: {} usec".format(1e6*(time.time() - start) / requests) )

if __name__ == '__main__':
    main()

flcliapi.py


"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000    # msecs
# PING interval for servers we think are alive
PING_INTERVAL  = 2000    # msecs
# Server considered dead if silent for this long
SERVER_TTL     = 6000    # msecs


def flciapi_agent(peer):
    """This is the thread that handles our real flcliapi class
    """
    pass

# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to flciapi agent
    agent = None    # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx,peer))
        self.agent.daemon = True
        self.agent.start()


    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", endpoint.encode()])
        time.sleep(0.1) # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = [b"REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != b"FAILED":
            return reply


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None         # Server identity/endpoint
    alive = True            # 1 if known to be alive
    ping_at = 0             # Next ping at this time
    expires = 0             # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3*PING_INTERVAL
        self.expires = time.time() + 1e-3*SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint.encode(), b'PING'])
            self.ping_at = time.time() + 1e-3*PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless

# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    router = None           # Socket to talk to servers
    servers = None          # Servers we've connected to
    actives = None          # Servers we know are alive
    sequence = 0            # Number of requests ever sent
    request = None          # Current request if any
    reply = None            # Current reply if any
    expires = 0             # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message (self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            endpoint = msg.pop(0).decode()
            print( "I: connecting to %s...\n" % endpoint )
            self.router.connect(endpoint)
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3*PING_INTERVAL
            server.expires = time.time() + 1e-3*SERVER_TTL
        elif command == b"REQUEST":
            assert not self.request    # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.request = [str(self.sequence).encode(), b''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

    def router_message (self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0).decode()
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = 1

        server.ping_at = time.time() + 1e-3*PING_INTERVAL
        server.expires = time.time() + 1e-3*SERVER_TTL

        # Frame 1 may be sequence number for reply
        sequence = reply.pop(0)
        if int(sequence) == self.sequence:
            self.sequence += 1
            reply = [b"OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if (agent.request and tickless > agent.expires):
            tickless = agent.expires
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            items = dict(poller.poll(max(0, 1000 * (tickless - time.time()))))
        except:
            break              # Context has been shut down

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if (agent.request):
            if (time.time() >= agent.expires):
                # Request expired, kill it
                agent.pipe.send(b"FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = 0
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint.encode()] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Disconnect and delete any expired servers
        # Send heartbeats to idle servers if needed
        for server in agent.servers.values():
            server.ping(agent.router)

flserver3.py


"""Freelance server - Model 3

Uses a ROUTER-to-ROUTER socket but just one thread
"""

import sys

import zmq

from zhelpers import dump

def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print( "I: service is ready at", bind_endpoint )

    while True:
        try:
            request = server.recv_multipart()
        except:
            break # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print( "W: interrupted" )

if __name__ == '__main__':
    main()

Sample run


$ python flserver3.py -v
I: service is ready at tcp://*:5555
----------------------------------------
[005] 0x00800041aa
[001] 0
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 1
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 2
[002] OK


$ python flclient3.py
I: connecting to tcp://localhost:5555...

I: connecting to tcp://localhost:5556...

I: connecting to tcp://localhost:5557...

Average round trip cost: 652.7282238006592 usec

  • Multithreaded API: the client API has two parts, a synchronous flcliapi class that runs in the application thread, and an asynchronous agent that runs in the background. ZeroMQ makes it easy to build multithreaded apps: flcliapi and the agent class talk to each other over an inproc socket, and all ZeroMQ-related work is hidden inside the API. The agent acts like a mini-broker, talking to all servers in the background; when we send a request, it dispatches to a server it believes is available.
  • Tickless poll timer: polling with a fixed tick interval in the poll loop still costs CPU and power; here the agent uses a tickless timer, computing the poll delay from the next scheduled timeout.

References


ØMQ - The Guide
