This chapter looks at reliability and builds a set of reliable messaging patterns on top of ZMQ's core request-reply pattern:
- The Lazy Pirate pattern: reliable request-reply from the client side
- The Simple Pirate pattern: reliable request-reply using load balancing
- The Paranoid Pirate pattern: reliable request-reply with heartbeating
- The Majordomo pattern: service-oriented reliable queuing
- The Titanic pattern: disk-based/disconnected reliable queuing
- The Binary Star pattern: primary-backup server failover
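- The Freelance pattern: brokerless reliable request-reply
What is "Reliability"?
Reliability is defined in terms of failure: a system that can handle a well-defined and well-understood set of failures is reliable with respect to those failures.
- Application code can crash, exit, freeze, stop responding to input, run too slowly, or exhaust memory.
- System code (for example a broker) can crash for some reason. System code should be more stable than application code, but slow clients can still cause queued messages to exhaust memory.
- Message queues can overflow. System code usually deals with slow clients by simply discarding messages, which produces "lost" messages.
- Networks can fail. ZMQ reconnects automatically, but messages can be lost in the meantime.
- Hardware can fail.
- Networks can fail in odd ways, for example some ports on a switch die and parts of the network stop responding.
- Entire data centers can be taken out by lightning, earthquakes, fire, or power or cooling failures.
Making software reliable against every one of these failures goes beyond the scope of ZMQ.
The first five cases on this list already account for 99.9% of what happens in practice.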
Designing Reliability
"Keep things working properly when code freezes or crashes."
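Request-reply:
If the server dies while processing a request, the client notices because no reply arrives, and it can retry against another server. If the client dies, handling that is left to the client's developer.
Pub-sub:
If the client dies, the server does not know, because a pub-sub client sends nothing back to the server. The client can, however, notify the server over a separate channel. A server crash cannot be handled here. Subscribers that are running too slowly can take action, such as alerting an operator.
Pipeline:
If a worker dies, the ventilator does not know. A pipeline only works in one direction, but the downstream collector can detect that a task was never completed and send a message back to the ventilator asking it to resend that task. If the ventilator or collector dies, the client waiting on the work will time out and can resend all the tasks.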
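This chapter focuses on request-reply only.
In the basic request-reply pattern, a REQ client socket does a blocking send/receive to a REP server socket. If the server crashes while processing a request, or if the request or the reply is lost, the client waits forever.
Because ZMQ can reconnect peers and load balance messages, request-reply behaves better than plain TCP. It is still not dependable across a network or between separate processes. It is only fully stable between threads, where there is no network.
A reliable request-reply (RRR) pattern can be designed on top of this; these designs are called the Pirate patterns.
There are three ways to connect clients to servers, and each one needs its own approach to reliability:
- Multiple clients talking to a single server. Use case: clients connect to one well-known server. Failures to handle: server crashes and restarts, and network disconnects.
- Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Failures to handle: worker crashes and restarts, worker busy looping or overload, queue crashes and restarts, and network disconnects.
- Multiple clients talking directly to multiple servers, with no proxies in between. Use case: distributed services such as DNS. Failures to handle: service crashes and restarts, service busy looping or overload, and network disconnects.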
Lazy Pirate pattern, Client-Side Reliability
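- Poll the REQ socket and receive a reply only when one has actually arrived.
- If no reply arrives within the timeout, resend the request.
- If there is still no reply after several attempts, abandon the transaction.
Using a REQ socket outside the strict send/receive order raises an "EFSM" error, because the REQ socket is implemented as a finite-state machine that enforces that order.
In the Pirate pattern, resending a request without having received a reply would trigger this error. The brute-force fix is to close the REQ socket and open a new one before resending.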
lpclient.py
#
# Lazy Pirate client
# Use zmq_poll to do a safe request-reply
# To run, start lpserver and then randomly kill/restart it
from __future__ import print_function
import zmq

REQUEST_TIMEOUT = 2500  # msecs
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)
print("I: Connecting to server...")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)
poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)
    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)
        else:
            print("W: No response from server, retrying...")
            # Socket is confused. Close and remove it,
            # then use up one retry.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                # All retries used: treat the server as permanently offline
                print("E: Server seems to be offline, abandoning")
                break
            # Retries remain: create a new REQ socket and resend the request
            print("I: Reconnecting and resending (%s)" % request)
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
lpserver.py
#
# Lazy Pirate server
# Binds REP socket to tcp://*:5555
# Like hwserver except:
# - echoes request as-is
# - randomly runs slowly, or exits to simulate a crash.
from __future__ import print_function
from random import randint
import time
import zmq

context = zmq.Context(1)
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

cycles = 0
while True:
    request = server.recv()
    cycles += 1
    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating CPU overload")
        time.sleep(2)
    print("I: Normal request (%s)" % request)
    time.sleep(1)  # Do some heavy work
    server.send(request)

server.close()
context.term()
Output
$ python lpserver.py
I: Normal request (b'1')
I: Normal request (b'2')
I: Normal request (b'3')
I: Normal request (b'4')
I: Simulating a crash
$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')
I: Server replied OK (b'3')
I: Sending (b'4')
I: Server replied OK (b'4')
I: Sending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
E: Server seems to be offline, abandoning
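The client numbers each message in sequence and checks that the replies come back in exactly that order: no request or reply is lost, and no reply arrives more than once. The sequence numbers are not needed in a production application; they are only there to show that the design works.
The client uses a REQ socket and does the brute-force close/reopen because a REQ socket enforces the strict send/receive cycle. Switching to DEALER is not recommended: you would have to emulate what REQ does with envelopes, and you could get back replies you did not expect.
Handling failures only at the client is enough when a set of clients talks to a single server.
Pros
- Simple to understand and implement.
- Works easily with existing client and server application code.
- ZeroMQ reconnects automatically.
Cons
- No failover to backup or alternate servers.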
Simple Pirate pattern, Basic Reliable Queueing
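The Simple Pirate pattern extends the Lazy Pirate pattern with a queue proxy, so that clients can reach multiple servers (workers).
In all the Pirate patterns, workers are stateless. If the application needs shared state (for example a shared database), it handles that itself; it does not belong in the messaging framework. Workers can come and go without clients being told.
The queue proxy is built from the load balancing broker in Chapter 3. The open question is what it needs in order to cope with dead or blocked workers.
Because clients already have a retry mechanism, the load balancing pattern works as it is.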
spqueue.py: simple pirate queue
#
# Simple Pirate queue
# This is identical to the LRU pattern, with no reliability mechanisms
# at all. It depends on the client for recovery. Runs forever.
import zmq

LRU_READY = b"\x01"  # bytes, so it compares equal to received frames

context = zmq.Context(1)
frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []
while True:
    # Poll the frontend only when at least one worker is available
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())
    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)
        # Everything after the second (delimiter) frame is reply
        reply = msg[2:]
        # Forward message to client if it's not a READY
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)
    if socks.get(frontend) == zmq.POLLIN:
        # Get client request, route to first available worker
        # msg = frontend.recv_multipart()
        # request = [workers.pop(0), b''] + msg
        # backend.send_multipart( request )
        client, empty, request = frontend.recv_multipart()
        worker = workers.pop(0)
        backend.send_multipart([worker, b"", client, b"", request])
spworker.py: simple pirate worker
#
# Simple Pirate worker
# Connects REQ socket to tcp://localhost:5556
# Implements worker part of LRU queueing
from random import randint
import time
import zmq

LRU_READY = b"\x01"

context = zmq.Context(1)
worker = context.socket(zmq.REQ)
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")
print("I: (%s) worker ready" % identity)
worker.send(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break
    cycles += 1
    # Simulate various problems, after a few cycles
    if cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print("I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print("I: (%s) normal reply" % identity)
    time.sleep(1)  # Do some heavy work
    worker.send_multipart(msg)
To test, start several workers, a Lazy Pirate client, and the queue. The start order does not matter.
$ python spworker.py
I: (AF11-C837) worker ready
I: (AF11-C837) normal reply
I: (AF11-C837) normal reply
$ python spqueue.py
$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')
Paranoid Pirate pattern, Robust Reliable Queueing
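The Simple Pirate pattern simply combines two existing patterns, and it still has two problems:
- When the queue crashes and restarts, clients recover but workers do not. ZeroMQ reconnects the workers' sockets automatically, but this is a new queue and the workers have never signaled ready to it. The fix is heartbeating from queue to worker, so that a worker can detect that the queue has gone away.
- The queue cannot detect worker failure. If a worker crashes while idle, the queue cannot remove it from its worker queue until it has sent it a request, and the client then goes through its wait/retry cycle. This is not a serious problem, but it is not nice. Heartbeating from worker to queue lets the queue detect a lost worker.
The workers used a REQ socket before. For the Paranoid Pirate worker this is switched to a DEALER socket.
ppqueue.py: the load balancing pattern extended with heartbeating of workers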
#
## Paranoid Pirate queue
from collections import OrderedDict
import time
import zmq

HEARTBEAT_LIVENESS = 3  # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0  # Seconds

# Paranoid Pirate Protocol constants
PPP_READY = b"\x01"  # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS


class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        t = time.time()
        expired = []
        for address, worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print("W: Idle worker expired: %s" % address)
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address


context = zmq.Context(1)
frontend = context.socket(zmq.ROUTER)  # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555")  # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = WorkerQueue()
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        frames = backend.recv_multipart()
        if not frames:
            break
        address = frames[0]
        workers.ready(Worker(address))
        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print("E: Invalid message from worker: %s" % msg)
        else:
            frontend.send_multipart(msg)
        # Send heartbeats to idle workers if it's time
        if time.time() >= heartbeat_at:
            for worker in workers.queue:
                msg = [worker, PPP_HEARTBEAT]
                backend.send_multipart(msg)
            heartbeat_at = time.time() + HEARTBEAT_INTERVAL
    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break
        frames.insert(0, workers.next())
        backend.send_multipart(frames)
    workers.purge()
ppworker.py
#
## Paranoid Pirate worker
from random import randint
import time
import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

# Paranoid Pirate Protocol constants
PPP_READY = b"\x01"  # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


def worker_socket(context, poller):
    """Helper function that returns a new configured socket
    connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER)  # DEALER
    identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt_string(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker


context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))
    # Handle worker activity on backend
    if socks.get(worker) == zmq.POLLIN:
        # Get message
        # - 3-part envelope + content -> request
        # - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break  # Interrupted
        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating a crash")
                break
            if cycles > 3 and randint(0, 5) == 0:
                print("I: Simulating CPU overload")
                time.sleep(3)
            print("I: Normal reply")
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print("I: Queue heartbeat")
            liveness = HEARTBEAT_LIVENESS
        else:
            print("E: Invalid message: %s" % frames)
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print("W: Heartbeat failure, can't reach queue")
            print("W: Reconnecting in %0.2fs..." % interval)
            time.sleep(interval)
            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print("I: Worker heartbeat")
        worker.send(PPP_HEARTBEAT)
Comments:
- The code includes failure simulation; remove it for real use.
- Workers use a reconnect strategy similar to the Lazy Pirate client, with two differences: (a) exponential back-off and (b) indefinite retries.
The queue, client, and workers can be stopped and restarted at will; the client never receives an out-of-order reply.
Everything can be started from a script:
python ppqueue.py &
for i in 1 2 3 4; do
    python ppworker.py &
    sleep 1
done
python lpclient.py &
Or start each program in its own terminal:
$ python ppqueue.py
$ python ppworker.py
I: Queue heartbeat
I: Normal reply
I: Worker heartbeat
I: Queue heartbeat
$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')
Heartbeating
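Heartbeating solves the problem of each peer knowing whether the other is still connected. This is not specific to ZeroMQ: TCP has a long timeout (around 30 minutes), so a peer may be unable to learn the real state of a connection.
There are three common approaches to heartbeating.
Shrugging It Off
Many applications do no heartbeating at all. Without it, the following problems can appear:
- When a ROUTER socket is used to track peers, peers that disconnect and reconnect make the application leak memory and get slower and slower.
- A SUB- or DEALER-based data recipient cannot tell good silence (there is no data) from bad silence (the other end has died). A recipient that knows the other end has died can switch to a backup route.
- A TCP connection that stays silent is dropped automatically on some networks. Sending keep-alive traffic keeps the connection in use and therefore alive.
One-Way Heartbeats
The second option: every node sends a heartbeat message to its peers once a second. When a node receives nothing from a peer within the timeout, it treats that peer as disconnected. This approach misbehaves in some situations.
For pub-sub sockets this approach works, and it is the only one available: a SUB socket cannot send data to a PUB, but the PUB can send "I'm alive" messages to its subscribers.
As an optimization, send heartbeats only when there is no data to send. Heartbeats can also be sent more slowly, as long as the recipient can still detect a failure.
Problems that can come up with this design:
- It can go wrong when large amounts of data are being sent, because heartbeats queue up behind the data. A delayed heartbeat causes a timeout, and the peer is wrongly judged to be disconnected. The fix is to treat any received data as a heartbeat.
- Pub-sub drops messages for recipients that have disconnected, but PUSH and DEALER sockets queue them. Heartbeats sent to a disconnected peer pile up in the queue, and the peer receives all of them at once when it reconnects.
- The design assumes that the timeout is the same everywhere, which is not necessarily true in practice.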
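The first fix above (treat any received data as a heartbeat) can be sketched without sockets. This is a minimal illustration under stated assumptions: `PeerMonitor`, its method names, and the injectable `clock` parameter are hypothetical names made up for this example and are not part of ZeroMQ or pyzmq.

```python
import time


class PeerMonitor:
    """Tracks when a peer was last heard from (hypothetical helper).

    Any incoming message, data or heartbeat, refreshes the deadline, so a
    heartbeat stuck behind a burst of data cannot cause a false timeout.
    """

    def __init__(self, interval=1.0, liveness=3, clock=time.monotonic):
        self.timeout = interval * liveness  # seconds of silence tolerated
        self.clock = clock
        self.last_seen = clock()

    def on_message(self, frames):
        # The content does not matter: receiving anything proves liveness.
        self.last_seen = self.clock()

    def is_alive(self):
        return self.clock() - self.last_seen < self.timeout
```

In a poll loop, `on_message` would be called for every message received from the peer, and `is_alive` would be checked after each poll timeout.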
Ping-Pong Heartbeat
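The third option is a ping-pong dialog: one peer sends a ping and the other replies with a pong. Pings and pongs are independent and not correlated with each other. Usually the client sends the ping and the server answers with the pong.
This works for all ROUTER-based brokers. The optimization from the previous approach applies here too: treat any incoming data as a pong, and send a ping only when no other data is being sent.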
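The ping-pong rules above can be sketched as a small, socket-free state tracker. This is a hedged sketch, not a reference implementation: the `PING` and `PONG` frame values, the `PingPongClient` class, and `server_reply` are all hypothetical names chosen for this example.

```python
import time

PING = b"PING"  # hypothetical command frames, not defined by ZeroMQ
PONG = b"PONG"


class PingPongClient:
    """Client-side bookkeeping for a ping-pong heartbeat (hypothetical)."""

    def __init__(self, ping_interval=1.0, timeout=3.0, clock=time.monotonic):
        self.ping_interval = ping_interval
        self.timeout = timeout
        self.clock = clock
        self.last_sent = clock()
        self.last_heard = clock()

    def on_send(self):
        # Call after sending anything, data or ping.
        self.last_sent = self.clock()

    def on_receive(self, frames):
        # Any incoming message counts as a pong.
        self.last_heard = self.clock()

    def should_ping(self):
        # Ping only when nothing else was sent during the last interval.
        return self.clock() - self.last_sent >= self.ping_interval

    def peer_is_dead(self):
        return self.clock() - self.last_heard >= self.timeout


def server_reply(frames):
    """Server side: answer a ping with a pong, ignore everything else."""
    if frames == [PING]:
        return [PONG]
    return None
```

The client would call `should_ping()` after each poll timeout and send `[PING]` when it returns true; the server needs no per-client state for heartbeating at all.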
Heartbeating for Paranoid Pirate
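Paranoid Pirate uses the second approach, although the third one would probably have been simpler. The heartbeat messages flow asynchronously in both directions, so either peer can detect that the other has gone.
How the worker handles heartbeats received from the queue:
- It calculates a "liveness": how many heartbeats it can still miss before deciding that the queue is disconnected. It usually starts at 3 and is decremented for each missed heartbeat.
- It waits in the zmq_poll loop for one heartbeat interval, 1 s, each time.
- If any message arrives from the queue during that time, it resets the liveness to 3.
- If no message arrives, it counts the liveness down.
- When the liveness reaches 0, it treats the queue as disconnected.
- If the queue is disconnected, it destroys the socket, creates a new one, and reconnects.
- To avoid opening and closing too many sockets, it waits for an interval before reconnecting and doubles that interval each time, up to a maximum of 32 seconds.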
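The reconnect delay in the last item grows as a capped doubling sequence. The sketch below mirrors the `if interval < INTERVAL_MAX: interval *= 2` logic in ppworker.py; the generator name `reconnect_delays` is a hypothetical helper introduced only for this illustration.

```python
from itertools import islice

INTERVAL_INIT = 1   # seconds, same value as in ppworker.py
INTERVAL_MAX = 32   # seconds


def reconnect_delays(initial=INTERVAL_INIT, maximum=INTERVAL_MAX):
    """Yield the wait before each reconnect attempt, doubling up to the cap."""
    interval = initial
    while True:
        yield interval
        if interval < maximum:
            interval *= 2


print(list(islice(reconnect_delays(), 8)))  # [1, 2, 4, 8, 16, 32, 32, 32]
```

Note that the worker resets the interval to `INTERVAL_INIT` as soon as it hears from the queue again, so the long delays apply only while the queue stays unreachable.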
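How the worker handles heartbeats sent to the queue:
- Because it talks to only one peer, the queue, a single variable holds the time at which the next heartbeat is due.
- In the zmq_poll loop, whenever that time has passed, it sends a heartbeat to the queue.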
The worker heartbeat code (C):
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect
#define INTERVAL_MAX        32000   //  After exponential backoff
…
//  If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
while (true) {
    zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
    if (items [0].revents & ZMQ_POLLIN) {
        //  Receive any message from queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else
    if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        …
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send heartbeat to queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}
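The queue does the same, but it also has to keep an expiration time for each worker.
Tips for implementing heartbeating:
- Use zmq_poll or a reactor as the core of the application's main task.
- Build the heartbeating between the two peers first, test it by simulating failures, and only then implement the message flow.
- Use simple tracing (for example printing to the console) to follow the messages between peers. The dump method that zmsg provides helps, as does numbering messages incrementally to check for gaps.
- In a real application, heartbeating must be configurable and is usually negotiated between the peers. Some peers need aggressive heartbeating, as low as 10 ms, while others need only 30 s.
- If different peers have different heartbeat intervals, the poll timeout must be the shortest of them. Do not use an infinite timeout.
- Do heartbeating on the same socket that carries the messages, so that heartbeats also act as a keep-alive for the network connection.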
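The rule about per-peer intervals can be expressed in a few lines. This is only a sketch: the function name `poll_timeout_ms`, the peer names, and the fallback value are assumptions for illustration; the returned value is what would be passed to the poller as its timeout.

```python
def poll_timeout_ms(heartbeat_intervals_ms, default_ms=1000):
    """Return a poll timeout in msecs: the shortest configured heartbeat
    interval, or a finite default when no peers are known (never infinite)."""
    if not heartbeat_intervals_ms:
        return default_ms
    return min(heartbeat_intervals_ms.values())


# Hypothetical peers: one wants aggressive heartbeating, one is relaxed.
peers = {"fast-peer": 10, "slow-peer": 30000}
timeout = poll_timeout_ms(peers)  # 10
```

Polling with the shortest interval guarantees that the loop wakes up often enough to serve the most demanding peer, while slower peers are simply checked against their own deadlines on each pass.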
Contracts and Protocols
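Because their heartbeat mechanisms differ, Paranoid Pirate is not interoperable with Simple Pirate. Interoperability rests on a contract, in other words a protocol.
http://rfc.zeromq.org/ publishes protocol contracts that ZeroMQ applications can implement, for example the Pirate Pattern Protocol (PPP).
Turning PPP into a real protocol takes some more work:
- The READY command should carry a protocol version number.
- READY and HEARTBEAT cannot be cleanly told apart from requests and replies. To distinguish them, add a "message type" part to the message structure.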
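One possible shape for such a message structure is sketched below. Everything here is an assumption made for illustration: the `PPP01` version tag, the type values for requests and replies, and the `encode`/`decode` helpers are hypothetical and are not taken from the published PPP specification.

```python
PPP_VERSION = b"PPP01"   # hypothetical version tag
TYPE_READY = b"\x01"
TYPE_HEARTBEAT = b"\x02"
TYPE_REQUEST = b"\x03"   # hypothetical: explicit type for requests
TYPE_REPLY = b"\x04"     # hypothetical: explicit type for replies
KNOWN_TYPES = {TYPE_READY, TYPE_HEARTBEAT, TYPE_REQUEST, TYPE_REPLY}


def encode(msg_type, body=()):
    """Build a multipart message: [version, type, *body]."""
    return [PPP_VERSION, msg_type, *body]


def decode(frames):
    """Validate version and type, then return (msg_type, body)."""
    if len(frames) < 2:
        raise ValueError("message too short")
    version, msg_type, *body = frames
    if version != PPP_VERSION:
        raise ValueError("unsupported protocol version: %r" % version)
    if msg_type not in KNOWN_TYPES:
        raise ValueError("unknown message type: %r" % msg_type)
    return msg_type, body
```

With an explicit type frame, a one-frame reply body can no longer be mistaken for a READY or HEARTBEAT command, and a peer speaking a different version is rejected up front.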
Majordomo pattern: Service Oriented Reliable Queueing
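The Majordomo Protocol (MDP) extends PPP with a "service name": clients address each request to a named service, and workers register for the service they provide.
Adding service names to the Paranoid Pirate queue turns it into a service-oriented broker.
There are two contracts: (1) MDP itself, which describes the distributed architecture, and (2) the API, which defines how user applications talk to the framework.
Majordomo has two sides: a client side and a worker side.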
MDP.py: Majordomo Protocol definitions
"""Majordomo Protocol definitions"""
# This is the version of MDP/Client we implement
C_CLIENT = b"MDPC01"
# This is the version of MDP/Worker we implement
W_WORKER = b"MDPW01"
# MDP/Server commands, as strings
W_READY = b"\001"
W_REQUEST = b"\002"
W_REPLY = b"\003"
W_HEARTBEAT = b"\004"
W_DISCONNECT = b"\005"
commands = [None, "READY", "REQUEST", "REPLY", "HEARTBEAT", "DISCONNECT"]
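To make the frame layout concrete, the sketch below builds the multipart messages that the client and worker APIs in this section send. The constants are copied from MDP.py; the helper names `client_request` and `worker_command` are hypothetical and exist only in this example. The layouts follow the `send` method of mdcliapi.py and the `send_to_broker` method of mdwrkapi.py.

```python
# Constants copied from MDP.py so the example is self-contained
C_CLIENT = b"MDPC01"
W_WORKER = b"MDPW01"
W_READY = b"\001"
W_REPLY = b"\003"


def client_request(service, body):
    """Frames a client hands to its REQ socket: [header, service, *body].
    The REQ socket itself adds the empty delimiter frame."""
    return [C_CLIENT, service] + list(body)


def worker_command(command, option=None, body=()):
    """Frames a worker sends on its DEALER socket:
    [empty, header, command, (option), *body]."""
    frames = [b"", W_WORKER, command]
    if option:
        frames.append(option)
    return frames + list(body)


print(client_request(b"echo", [b"Hello world"]))
print(worker_command(W_READY, b"echo"))
```

A worker registers with `worker_command(W_READY, b"echo")` and later returns results with `W_REPLY`, passing the client address and an empty delimiter as the first two body frames.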
mdcliapi.py: Majordomo client API
"""Majordomo Protocol Client API, Python version.
Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""
import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.
    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500  # msecs
    retries = 3
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            # A client socket already exists: close it before reconnecting
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker and get reply by hook or crook.
        Takes ownership of request message and destroys it when sent.
        Returns the reply message or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        reply = None
        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # interrupted
            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)
                # Don't try to handle errors, just assert noisily
                # Check the number of frames in the reply
                assert len(msg) >= 3
                header = msg.pop(0)
                assert MDP.C_CLIENT == header
                reply_service = msg.pop(0)
                assert service == reply_service
                reply = msg
                break
            else:
                # No reply within the timeout: use up one retry
                retries -= 1
                if retries:
                    logging.warning("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warning("W: permanent error, abandoning")
        return reply

    def destroy(self):
        self.ctx.destroy()
mdclient.py: client application
"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
"""
import sys

from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            # Send the request to the echo service
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()
mdwrkapi.py: worker API
"""Majordomo Protocol Worker API, Python version
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
"""
import logging
import time

import zmq

from zhelpers import dump
# MajorDomo protocol constants:
import MDP


class MajorDomoWorker(object):
    """Majordomo Protocol Worker API, Python version
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """
    HEARTBEAT_LIVENESS = 3  # 3-5 is reasonable
    broker = None
    ctx = None
    service = None
    worker = None  # Socket to broker
    heartbeat_at = 0  # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0  # How many attempts left
    heartbeat = 2500  # Heartbeat delay, msecs
    reconnect = 2500  # Reconnect delay, msecs
    # Internal state
    expect_reply = False  # False only at start
    timeout = 2500  # poller timeout
    verbose = False  # Print activity to stdout
    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            # A worker socket already exists: close it before reconnecting
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)
        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])
        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        # 1e-3 converts msecs to seconds
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.
        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]
        if option:
            msg = [option] + msg
        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)

    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        # expect_reply is False only on the first call, when reply is None
        assert reply is not None or not self.expect_reply
        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            # ex:
            # I: sending b'\x03' to broker
            # ----------------------------------------
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            self.send_to_broker(MDP.W_REPLY, msg=reply)
        self.expect_reply = True
        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break  # Interrupted
            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    # I: received message from broker:
                    # ----------------------------------------
                    # [000]
                    # [006] MDPW01
                    # [001]
                    # [005] 0x00800041aa
                    # [000]
                    # [011] Hello world
                    dump(msg)
                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)
                assert header == MDP.W_WORKER
                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''
                    return msg  # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else:
                    logging.error("E: invalid input message: ")
                    dump(msg)
            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warning("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3 * self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()
            # Send HEARTBEAT at regular intervals, if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
        logging.warning("W: interrupt received, killing worker...")
        return None

    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)
mdworker.py: worker application
"""Majordomo Protocol worker example.
Uses the mdwrk API to hide all MDP aspects
"""
import sys

from mdwrkapi import MajorDomoWorker


def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break  # Worker was interrupted
        reply = request  # Echo is complex... :-)


if __name__ == '__main__':
    main()
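Notes about the worker API:
- The API is single-threaded, so the worker does not send heartbeats in the background. That is fine: when the worker gets stuck, heartbeats stop and the broker stops sending it requests.
- The worker does no exponential back-off. It is not needed here and would add too much complexity.
- The API does no error reporting; checking is left to the application.
Like the Simple Pirate and Paranoid Pirate workers, the worker API closes its socket and opens a new one when it decides that its peer has disconnected. ZeroMQ does reconnect the worker automatically when the broker dies and comes back, but that does not re-register the worker with the broker. There are two ways to solve this. The simpler one is to close the socket and open a fresh one. The other is for the broker to challenge unknown workers when it receives a heartbeat from them and ask them to re-register.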
mdbroker.py: majordomo broker
"""
Majordomo Protocol broker
A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
"""
import logging
import sys
import time
from binascii import hexlify
import zmq
# local
import MDP
from zhelpers import dump
class Service(object):
"""a single Service"""
name = None # Service name
requests = None # List of client requests
waiting = None # List of waiting workers
def __init__(self, name):
self.name = name
self.requests = []
self.waiting = []
class Worker(object):
"""a Worker, idle or active"""
identity = None # hex Identity of worker
address = None # Address to route to
service = None # Owning service, if known
expiry = None # expires at this point, unless heartbeat
def __init__(self, identity, address, lifetime):
self.identity = identity
self.address = address
self.expiry = time.time() + 1e-3*lifetime
class MajorDomoBroker(object):
"""
Majordomo Protocol broker
A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
"""
# We'd normally pull these from config data
INTERNAL_SERVICE_PREFIX = b"mmi."
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
HEARTBEAT_INTERVAL = 2500 # msecs
HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
# ---------------------------------------------------------------------
ctx = None # Our context
socket = None # Socket for clients & workers
poller = None # our Poller
heartbeat_at = None# When to send HEARTBEAT
services = None # known services
workers = None # known workers
waiting = None # idle workers
verbose = False # Print activity to stdout
# ---------------------------------------------------------------------
def __init__(self, verbose=False):
"""Initialize broker state."""
self.verbose = verbose
self.services = {}
self.workers = {}
self.waiting = []
self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.ROUTER)
self.socket.linger = 0
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
# ---------------------------------------------------------------------
def mediate(self):
"""Main broker work happens here"""
while True:
try:
items = self.poller.poll(self.HEARTBEAT_INTERVAL)
except KeyboardInterrupt:
break # Interrupted
if items:
msg = self.socket.recv_multipart()
if self.verbose:
logging.info("I: received message:")
# I: received message:
# ----------------------------------------
# [005] 0x00800041aa
# [000]
# [006] MDPC01
# [004] echo
# [011] Hello world
dump(msg)
sender = msg.pop(0)
empty = msg.pop(0)
assert empty == b''
header = msg.pop(0)
# 由 header 判斷是來自 client 或是 worker
if (MDP.C_CLIENT == header):
self.process_client(sender, msg)
elif (MDP.W_WORKER == header):
self.process_worker(sender, msg)
else:
logging.error("E: invalid message:")
dump(msg)
self.purge_workers()
self.send_heartbeats()
def destroy(self):
"""Disconnect all workers, destroy context."""
while self.workers:
self.delete_worker(self.workers.values()[0], True)
self.ctx.destroy(0)

    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2  # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)

    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1  # At least, command
        command = msg.pop(0)
        worker_ready = hexlify(sender) in self.workers
        worker = self.require_worker(sender)
        if (MDP.W_READY == command):
            assert len(msg) >= 1  # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)
        elif (MDP.W_REPLY == command):
            if (worker_ready):
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0)  # Discard the empty delimiter frame
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)
        elif (MDP.W_HEARTBEAT == command):
            if (worker_ready):
                worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)
        elif (MDP.W_DISCONNECT == command):
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)
        # A busy worker is in neither waiting list, so check before removing
        if worker.service is not None and worker in worker.service.waiting:
            worker.service.waiting.remove(worker)
        if worker in self.waiting:
            self.waiting.remove(worker)
        self.workers.pop(worker.identity)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert (address is not None)
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if (worker is None):
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)
        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert (name is not None)
        service = self.services.get(name)
        if (service is None):
            service = Service(name)
            self.services[name] = service
        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode
        # Insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if (time.time() > self.heartbeat_at):
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)
            self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                # delete_worker() also removes w from self.waiting
                self.delete_worker(w, False)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert (service is not None)
        if msg is not None:  # Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]
        # Stack the protocol envelope, then the routing envelope,
        # at the start of the message
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %r to worker", command)
            # Sample output:
            # I: sending b'\x02' to worker
            # ----------------------------------------
            # [005] 0x00800041a8
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            dump(msg)
        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()


if __name__ == '__main__':
    main()
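The broker is essentially a set of queues, one per service. A service's request queue is created the first time the service is referenced (when a worker registers for it or, in this code, when a client first asks for it), and each service also keeps its own queue of idle workers.
Notes about the broker:
- The Majordomo Protocol lets us handle both clients and workers on a single socket. This is easier to manage, because the broker needs only one ZeroMQ endpoint where most proxies need two.
- The broker implements MDP/0.1, including disconnecting workers that send invalid commands, heartbeating, and the rest.
- It can be extended to run multiple threads, each managing one socket and one set of clients and workers.
- A primary/failover or live/live broker reliability model is easy to build, because the broker is essentially stateless (it only knows which services are present). When a broker fails, clients and workers choose another broker to connect to on their own.
- The example uses 5-second heartbeats. A real LAN application should use a shorter interval, but any retry must wait long enough for a service to restart, at least 10 seconds.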
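The "set of queues" idea can be sketched without any sockets. This is a minimal illustration, not the broker code: `ServiceQueue` and the module-level `dispatch` are hypothetical names, and workers are represented by plain strings.

```python
from collections import deque


class ServiceQueue:
    """Hypothetical stand-in for the broker's per-service state."""

    def __init__(self, name):
        self.name = name
        self.requests = deque()  # client requests waiting for a worker
        self.waiting = deque()   # idle workers, oldest first


def dispatch(service):
    """Pair queued requests with idle workers until one queue runs dry."""
    assignments = []
    while service.requests and service.waiting:
        worker = service.waiting.popleft()
        request = service.requests.popleft()
        assignments.append((worker, request))
    return assignments


echo = ServiceQueue(b"echo")
echo.requests.extend([b"req-1", b"req-2"])
echo.waiting.append("worker-A")

pairs = dispatch(echo)
print(pairs)                # one request was handed to the only idle worker
print(list(echo.requests))  # the other request stays queued
```

The second request stays queued until another worker reports ready, which is the behaviour `dispatch()` in the broker has as well.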
Asynchronous Majordomo pattern
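The Majordomo implementation in the previous section is simple: its client is just Simple Pirate. ZeroMQ disables Nagle's algorithm, but a synchronous client still pays one network round trip per request, so a stream of many small messages performs poorly.
First, measure what round trips cost. The program below runs two tests: one sends a request and waits for its reply each time, the other sends a batch of requests and then reads the replies as a batch.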
tripping.py
"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""
import sys
import threading
import time

import zmq

from zhelpers import zpipe


def client_task(ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")
    print("Setting up test...")
    time.sleep(0.1)
    print("Synchronous round-trip test...")
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send(b"hello")
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))
    print("Asynchronous round-trip test...")
    start = time.time()
    for r in range(requests):
        client.send(b"hello")
    for r in range(requests):
        client.recv()
    print(" %d calls/second" % (requests / (time.time() - start)))
    # signal done:
    pipe.send(b"done")


def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")
    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)


def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll())
        except (KeyboardInterrupt, zmq.ZMQError):
            break  # Interrupted
        if frontend in items:
            # Swap the client identity for the worker identity and forward
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            # Swap the worker identity for the client identity and forward
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)


def main():
    # Create threads
    ctx = zmq.Context()
    client, pipe = zpipe(ctx)
    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon = True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon = True
    worker_thread.start()
    broker_thread.start()
    client_thread.start()
    # Wait for signal on client pipe
    client.recv()


if __name__ == '__main__':
    main()
Output:
$ python tripping.py
Setting up test...
Synchronous round-trip test...
2293 calls/second
Asynchronous round-trip test...
4073 calls/second
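The client thread pauses briefly before it starts. This works around a property of ROUTER sockets: a message sent to the address of a peer that has not connected yet is dropped. The asynchronous, batched approach is clearly faster than the synchronous one (4073 versus 2293 calls/second in this run).
Next, turn the earlier client into an asynchronous version.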
mdcliapi2.py
"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""
import logging

import zmq

import MDP
from zhelpers import dump


class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker"""
        if not isinstance(request, list):
            request = [request]
        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)
        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.info("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return  # interrupted
        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)
            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4
            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header
            service = msg.pop(0)
            return msg
        else:
            # logging.warn() is deprecated; use logging.warning()
            logging.warning("W: permanent error, abandoning request")
mdclient2.py
"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
"""
import sys

from mdcliapi2 import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print("send interrupted, aborting")
            return
    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print("%i requests/replies processed" % count)


if __name__ == '__main__':
    main()
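Differences from the synchronous client:
- It uses a DEALER socket instead of REQ, so it puts an empty delimiter frame in front of every request.
- It does not retry requests; the application handles retries itself.
- The synchronous send is split into separate send and recv methods.
- send is asynchronous and returns immediately after sending.
- recv waits (with a timeout) for one response and returns it to the caller.
- The test program sends 100000 messages.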
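The framing change in the first point can be sketched without any sockets. This is a minimal illustration rather than the mdcliapi2 code itself: `frame_request` and `parse_reply` are hypothetical helper names, and the header value `b"MDPC01"` is assumed from the message dumps shown in this article.

```python
C_CLIENT = b"MDPC01"  # assumed value of MDP.C_CLIENT, as seen in the dumps


def frame_request(service, body):
    """Build the frames a DEALER client sends: delimiter, header, service, body."""
    if not isinstance(body, list):
        body = [body]
    return [b"", C_CLIENT, service] + body


def parse_reply(frames):
    """Strip the envelope from a reply and return (service, body frames)."""
    if len(frames) < 4 or frames[0] != b"" or frames[1] != C_CLIENT:
        raise ValueError("not a valid MDP/Client reply")
    return frames[2], frames[3:]


request = frame_request(b"echo", b"Hello world")
service, body = parse_reply([b"", C_CLIENT, b"echo", b"Hello world"])
print(request)
print(service, body)
```

A REQ socket adds and removes the empty delimiter frame automatically; with DEALER the client has to do it, which is why the asynchronous API handles the frame explicitly.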
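The asynchronous Majordomo pattern has one weakness: it cannot survive a broker crash, because mdcliapi2.py has no reconnect logic.
Adding reconnect would require the following:
- Number every request and check the number on each reply.
- Track and hold the requests that have not been answered yet.
- On failover, have the client resend every request that is still unanswered.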
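The bookkeeping behind these three steps might look like the sketch below. It only tracks state and sends nothing; `PendingRequests` and its methods are hypothetical names, not part of mdcliapi2, and the sequence number is assumed to travel with the request and come back with the reply.

```python
import itertools


class PendingRequests:
    """Hypothetical tracker for requests that have not been answered yet."""

    def __init__(self):
        self._next_seq = itertools.count(1)
        self._pending = {}  # sequence number -> (service, body)

    def track(self, service, body):
        """Assign a sequence number to a request and remember the request."""
        seq = next(self._next_seq)
        self._pending[seq] = (service, body)
        return seq

    def acknowledge(self, seq):
        """Return True if seq matched an outstanding request, False otherwise."""
        return self._pending.pop(seq, None) is not None

    def to_resend(self):
        """Unanswered requests in sending order, e.g. after a failover."""
        return sorted(self._pending.items())


pending = PendingRequests()
first = pending.track(b"echo", b"one")
second = pending.track(b"echo", b"two")

accepted = pending.acknowledge(first)   # reply for the first request arrives
duplicate = pending.acknowledge(first)  # the same reply arriving again is ignored
print(accepted, duplicate, pending.to_resend())
```

After a reconnect, the client would walk `to_resend()` and send each request again under its original sequence number, so that late replies can still be matched.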
Service Discovery
We now have a service-oriented broker, but no way to find out whether a particular service exists; all we learn is that a request failed. One option is to modify the MDP/Client protocol and add a command that asks about a service.
Another option is to do what email does and return undeliverable requests to the sender, but that needs a way to tell returned requests apart from replies.
Here we use the first idea, but layered on top of MDP instead of changing it: service discovery is itself a service, the Majordomo Management Interface (MMI). The broker shown earlier already implements it.
- When a client requests a service whose name starts with mmi., the broker handles the request internally.
- The broker handles only one such service, mmi.service, the service discovery service.
- The payload of the request is the name of an external service.
- The broker replies "200" (OK) or "404" (not found).
"""
MMI echo query example
"""
import sys

from mdcliapi import MajorDomoClient


def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    request = b"echo"
    reply = client.send(b"mmi.service", request)
    if reply:
        replycode = reply[0]
        print("Lookup echo service:", replycode)
    else:
        print("E: no response from broker, make sure it's running")


if __name__ == '__main__':
    main()
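Idempotent Services
An idempotent operation is one that is safe to repeat, such as checking the clock. Idempotent use cases:
- Stateless task distribution: a pipeline where the servers are stateless workers.
- A name service that translates logical addresses into endpoints.
Non-idempotent use cases:
- A logging service, because the same log entry must not be recorded more than once.
- Any service that affects downstream nodes, for example by forwarding information to them. If the service receives the same request twice, the downstream nodes get duplicate information.
- Any service that modifies shared data.
If the server application is not idempotent, crashes need care. A crash while the server is idle or while it is processing a request is usually harmless (database transactions can make related updates happen together or not at all). A crash while the reply is being sent is a problem: the server has already done the work, but the client never finds out.
A network failure causes the same problem: the client assumes the server crashed and resends the request, and the server processes it twice.
To handle non-idempotent operations, detect and reject duplicate requests:
- The client stamps every request with a unique client identifier and a unique message number.
- Before sending a reply, the server stores it, keyed by client ID and message number.
- When a request arrives, the server first checks whether it already holds a reply for that client ID and message number. If it does, it skips the work and just resends the stored reply.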
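The three rules above can be sketched as follows. This is an illustration under simplifying assumptions: replies are kept in an in-memory dict (a real server would need storage that survives a restart), and `DedupServer`, `deposit` and `balance` are hypothetical names.

```python
class DedupServer:
    """Hypothetical wrapper that runs a handler at most once per request key."""

    def __init__(self, handler):
        self.handler = handler
        self.replies = {}  # (client_id, msg_no) -> stored reply

    def handle(self, client_id, msg_no, request):
        key = (client_id, msg_no)
        if key in self.replies:
            # Duplicate request: resend the stored reply, do not redo the work
            return self.replies[key]
        reply = self.handler(request)
        self.replies[key] = reply  # store the reply before it is sent
        return reply


balance = {"total": 0}


def deposit(amount):
    """A non-idempotent operation: every call changes shared state."""
    balance["total"] += amount
    return balance["total"]


server = DedupServer(deposit)
first = server.handle(b"client-1", 1, 10)
retry = server.handle(b"client-1", 1, 10)   # client resends after a lost reply
second = server.handle(b"client-1", 2, 5)
print(first, retry, second, balance["total"])
```

The retry returns the stored reply and leaves the balance untouched, so the deposit is applied only once even though the request was delivered twice.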
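Titanic pattern: Disconnected Reliability
Majordomo is a reliable message broker, so it is tempting to add persistence to disk ("rust", as in spinning rust) to it. There are reasons to hesitate:
- The Lazy Pirate client already works well across a whole range of architectures, from direct client-to-server to distributed queue proxies. It assumes that workers are stateless and idempotent.
- Rust brings problems of its own, such as slow performance.
Rust-based reliability is useful in one case: an asynchronous, disconnected network. It solves a major problem with Pirate, namely that the client has to wait for the reply in real time. When clients and workers are connected only for short periods (as with email), the state has to be kept in the middle, at the broker.
The Titanic pattern writes messages to disk so that they are never lost. Titanic is added as a layer on top of MDP instead of being built into the broker:
- Divide and conquer keeps things simple: the broker handles message routing and a worker handles reliability.
- The broker and the worker can be written in different languages.
- The fire-and-forget technology can evolve independently.
If a client needs its reply immediately, it talks to the service directly; if it can wait, it goes through Titanic.
Titanic acts as both a worker and a client. The dialog between a client and Titanic goes like this:
- client: Please accept this request for me.
- Titanic: OK.
- client: Do you have a reply for me?
- Titanic: Yes, here it is. Or, no, not yet.
- client: OK, you can wipe that request now.
- Titanic: OK, done.
The dialog between Titanic and the broker and worker goes like this:
- Titanic: Hey, broker, is there a coffee service?
- broker: Yes, there is.
- Titanic: Hey, coffee service, please handle this request for me.
- coffee: Sure, here you are.
- Titanic: OK.
If a worker crashes while processing a request, Titanic retries indefinitely. If a reply gets lost, Titanic retries. If the request was processed but the client did not get the reply, the client asks again. If Titanic crashes while processing a request or a reply, the client tries again. As long as requests are committed to storage, work is never lost.
The handshaking can be pipelined: clients can use the asynchronous Majordomo pattern to submit a lot of work and collect the replies later.
Clients need a way to ask for their replies, but a client has a different identity after it reconnects. The solution:
- Every request gets a universally unique ID (UUID).
- When a client asks for a reply, it must specify the UUID of the original request.
In a realistic setup the client would store its request UUIDs safely, for example in a local database.
Next, how does the client talk to Titanic? One option is a single service with three request types. The other is three services:
- titanic.request: store a request message and return a UUID for the request.
- titanic.reply: fetch the reply, if available, for a given request UUID.
- titanic.close: confirm that a reply has been stored and processed.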
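The three services can be sketched as one small in-memory class. This is a simplification: the real Titanic stores requests and replies on disk and runs each service as a Majordomo worker. `TitanicStore` is a hypothetical name, and the status codes (200 OK, 300 pending, 400 unknown) are assumed from the titanic.py listing in this article.

```python
import uuid


class TitanicStore:
    """Hypothetical in-memory model of titanic.request / reply / close."""

    def __init__(self):
        self.requests = {}  # UUID -> request frames
        self.replies = {}   # UUID -> reply frames

    def request(self, frames):
        """titanic.request: store the request and return its UUID."""
        request_id = uuid.uuid4().hex.encode()
        self.requests[request_id] = frames
        return [b"200", request_id]

    def reply(self, request_id):
        """titanic.reply: return the reply if there is one."""
        if request_id in self.replies:
            return [b"200"] + self.replies[request_id]
        if request_id in self.requests:
            return [b"300"]  # pending
        return [b"400"]      # unknown

    def close(self, request_id):
        """titanic.close: forget the request and its reply."""
        self.requests.pop(request_id, None)
        self.replies.pop(request_id, None)
        return [b"200"]


store = TitanicStore()
status, request_id = store.request([b"echo", b"Hello world"])
pending = store.reply(request_id)
# Stands in for the dispatcher storing the reply it got from a worker
store.replies[request_id] = [b"Hello world"]
done = store.reply(request_id)
store.close(request_id)
after_close = store.reply(request_id)
print(status, pending, done, after_close)
```

Because every call is keyed by the request UUID, a client can disconnect after `request` and come back under a new identity to call `reply` and `close`.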
Titanic is implemented next as a multithreaded worker that offers these three services; the protocol is called the Titanic Service Protocol (TSP).
ticlient.py
"""
Titanic client example

Implements client side of http://rfc.zeromq.org/spec:9
"""
import sys
import time

from mdcliapi import MajorDomoClient


def service_call(session, service, request):
    """Calls a TSP service

    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print("E: client fatal error, aborting")
            sys.exit(1)
        elif status == b"500":
            print("E: server fatal error, aborting")
            sys.exit(1)
        # Any other status (such as 300, pending) falls through and returns None
    else:
        sys.exit(0)  # Interrupted or failed


def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)
    # 1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)
    uuid = None
    if reply:
        uuid = reply.pop(0)
        print("I: request UUID ", uuid)
    # 2. Wait until we get a reply
    while True:
        time.sleep(.1)
        request = [uuid]
        reply = service_call(session, b"titanic.reply", request)
        if reply:
            reply_string = reply[-1]
            print("Reply:", reply_string)
            # 3. Close request
            request = [uuid]
            reply = service_call(session, b"titanic.close", request)
            break
        else:
            print("I: no reply yet, trying again...")
            time.sleep(5)  # Try again in 5 seconds
    return 0


if __name__ == '__main__':
    main()
titanic.py
"""
Titanic service

Implements server side of http://rfc.zeromq.org/spec:9
"""
import pickle
import os
import sys
import threading
import time
from uuid import uuid4
from pathlib import Path

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient
from zhelpers import zpipe

TITANIC_DIR = ".titanic"


def request_filename(uuid):
    """Returns the request filename for the given UUID"""
    return os.path.join(TITANIC_DIR, "%s.req" % uuid)


def reply_filename(uuid):
    """Returns the reply filename for the given UUID"""
    return os.path.join(TITANIC_DIR, "%s.rep" % uuid)


# ---------------------------------------------------------------------
# Titanic request service

def titanic_request(pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")
    reply = None
    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit
        # Ensure message directory exists
        os.makedirs(TITANIC_DIR, exist_ok=True)
        # Generate UUID and save message to disk
        uuid = uuid4().hex
        filename = request_filename(uuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)
        # Send UUID through to message queue
        uuidframe = uuid.encode()
        print("uuid is {}".format(uuidframe))
        pipe.send(uuidframe)
        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", uuidframe]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None
    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit
        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)
        print("req_filename={}, rep_filename={}".format(req_filename, rep_filename))
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"]  # pending
            else:
                reply = [b"400"]  # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None
    while True:
        request = worker.recv(reply)
        if not request:
            break  # Interrupted, exit
        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)
        # Remove the request and reply files; skip any that are already gone
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, uuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename(uuid)
    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True
    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"
    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename(uuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True
    return False


def main():
    os.makedirs(TITANIC_DIR, exist_ok=True)
    queuefile = os.path.join(TITANIC_DIR, 'queue')
    Path(queuefile).touch()
    verbose = '-v' in sys.argv
    ctx = zmq.Context()
    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000  # 1 sec
    client.retries = 1  # only 1 retry
    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()
    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)
    # Main dispatcher loop
    while True:
        # Ensure message directory exists
        os.makedirs(TITANIC_DIR, exist_ok=True)
        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break  # Interrupted
        if items:
            # Append UUID to queue, prefixed with '-' for pending
            uuid = request_pipe.recv().decode()
            with open(queuefile, 'a') as f:
                f.write("-%s\n" % uuid)
        # Brute-force dispatcher
        with open(queuefile, 'r+b') as f:
            while True:
                # Remember where this entry starts so it can be marked in place.
                # (Reading everything with readlines() first would leave the
                # file position at end of file and mark the wrong entry.)
                offset = f.tell()
                entry = f.readline()
                if not entry:
                    break
                # UUID is prefixed with '-' if still waiting
                entry = entry.decode()
                if entry[0] == '-':
                    uuid = entry[1:].rstrip()  # rstrip '\n' etc.
                    print("I: processing request %s" % uuid)
                    if service_success(client, uuid):
                        # Mark queue entry as processed: overwrite '-' with '+'
                        here = f.tell()
                        f.seek(offset, os.SEEK_SET)
                        f.write(b'+')
                        f.seek(here, os.SEEK_SET)


if __name__ == '__main__':
    main()
Running the example:
$ python mdbroker.py -v
$ python mdworker.py -v
$ python titanic.py -v
2018-09-26 17:15:42 I: connecting to broker at tcp://localhost:5555...
uuid is b'a8398b5a2419455598a064936336ddf7'
I: processing request a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: send request to 'b'mmi.service'' service:
----------------------------------------
[006] MDPC01
[011] mmi.service
[004] echo
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[011] mmi.service
[003] 200
2018-09-26 17:16:02 I: send request to 'b'echo'' service:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
req_filename=.titanic/a8398b5a2419455598a064936336ddf7.req, rep_filename=.titanic/a8398b5a2419455598a064936336ddf7.rep
$ python ticlient.py -v
2018-09-26 17:16:00 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:00 I: send request to 'b'titanic.request'' service:
----------------------------------------
[006] MDPC01
[015] titanic.request
[004] echo
[011] Hello world
2018-09-26 17:16:02 W: no reply, reconnecting...
2018-09-26 17:16:02 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[015] titanic.request
[003] 200
[032] a8398b5a2419455598a064936336ddf7
I: request UUID b'a8398b5a2419455598a064936336ddf7'
2018-09-26 17:16:02 I: send request to 'b'titanic.reply'' service:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[003] 200
[011] Hello world
Reply: b'Hello world'
2018-09-26 17:16:02 I: send request to 'b'titanic.close'' service:
----------------------------------------
[006] MDPC01
[013] titanic.close
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.close
[003] 200
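- Titanic acts as both a client and a worker, which is why some of its loops begin by sending and others begin by receiving.
- The Titanic broker uses the MMI service discovery protocol to send requests only to services that appear to be running.
- New request data travels over an inproc connection from the titanic.request service to the main dispatcher.
- For better performance, keep all data in a single disk file instead of many small files.
- Storing messages in a database is not recommended; plain files perform better.
- To make Titanic more reliable, replicate requests to a second server.
- To make Titanic much faster and less reliable, keep requests and replies purely in memory.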
Binary Star pattern: High Availability Pair
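The Binary Star pattern puts two servers in a primary-backup high-availability pair that works in active-standby fashion.
After a failover, recovery works as follows:
- The operators restart the primary server and fix whatever caused the problem.
- The operators stop the backup server. This briefly disrupts applications, so they pick a moment when the disruption hurts least.
- Once applications have reconnected to the primary server, the operators restart the backup server.
Recovery is a manual step, because automatic recovery can cause trouble, for these reasons:
- Failover interrupts service to applications, possibly for 10 to 30 seconds, which is still better than a total outage. Recovery causes another 10 to 30 second outage, so it is better done off-peak.
- In an emergency the first priority is to get things fixed. Automatic recovery can leave system administrators unsure which machine is currently doing the work.
- With automatic recovery, the network can fail over and then recover on its own, leaving operators to work out afterwards what happened and why.
That said, the Binary Star pattern does fail back to the primary server if the primary is running again and the backup server fails. This is in fact how recovery is triggered.
A Binary Star pair can be shut down in one of two ways:
- Stop the passive server first, then stop the active server at any later time.
- Stop both servers, in any order, within a few seconds of each other.
Stopping the active server first and the passive server later makes applications disconnect, reconnect, and then disconnect again.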
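Detailed Requirements for HA architecture
- Failover is insurance against catastrophic system failures such as hardware breakdown or fire. Ordinary server crashes can be handled in simpler ways.
- Failover must complete within 60 seconds, and preferably within 10 seconds.
- Failover is automatic, but recovery is manual. Applications must switch to the backup server automatically, but must not switch back to the original primary server until the operators have fixed it and chosen a good moment to interrupt applications once more.
- Failover must be easy for client application developers to use; ideally it is hidden inside the client API.
- There must be clear instructions for network architects on how to avoid designs that could lead to split-brain syndrome, in which both servers believe they are the active one.
- The two servers may be started in any order.
- It must be possible to make planned stops and restarts of either server without stopping client applications.
- Operators must be able to monitor both servers at all times.
- The two servers must be connected by a dedicated high-speed network link, and failover synchronization must be able to use a specific IP route.
We make these assumptions:
- A single backup server is enough; multiple levels of backup are not needed.
- The primary and backup servers have the same capacity and can each carry the full application load. There is no load balancing across servers.
- There is enough budget for a redundant backup server that does nothing most of the time.
We do not attempt to cover these issues:
- Using an active backup server or load balancing. In a Binary Star pair the backup server is normally inactive and does no work.
- Handling persistent messages or transactions. We assume a network of unreliable servers or Binary Star pairs.
- Automatic exploration of the network. The Binary Star pair is defined explicitly in the network and is known to applications (through their configuration data).
- Replicating state or messages between servers. All server-side state has to be rebuilt when a failover happens.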
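The key terms used in Binary Star:
- Primary: the server that is active at the start.
- Backup: the server that is passive at the start.
- Active: the server that accepts client connections. There is at most one active server.
- Passive: the server that takes over if the active server disappears. Normally the primary is active and the backup is passive; after a failover the roles are swapped.
A Binary Star pair needs this configuration:
- Tell the primary server where the backup server is.
- Tell the backup server where the primary server is.
- Optionally, tune the failover response time, which must be the same on both servers.
In the example the failover timeout is 2000 ms. If the primary server is wrapped in a shell script that restarts it automatically, the failover timeout must be longer than the time the primary server needs to restart.
For a client application to work with a Binary Star pair, it must:
- Know the addresses of both the primary and the backup server.
- Connect to the primary server first and, if that fails, to the backup server.
- Detect a failed connection, typically with heartbeating.
- Try to reconnect to the primary first and then to the backup, with a delay between retries that is at least as long as the server failover timeout.
- Be able to recreate all the state it needs on a server.
- Resend messages lost during a failover, if messages need to be reliable.
Limitations of the Binary Star pattern:
- A server process cannot be part of more than one Binary Star pair.
- A primary server can have one backup server and no more.
- The passive server does no useful work most of the time.
- The backup server must be able to carry the full application load.
- The failover configuration cannot be changed at runtime.
- Client applications must do some work to benefit from failover.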
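Preventing Split-Brain Syndrome
Split-brain syndrome occurs when different parts of a cluster each believe they are the active server at the same time. Applications then stop seeing each other. Binary Star detects and eliminates split brain with a three-way decision mechanism: a server decides to become active only when it receives application connection requests and, at the same time, cannot see its peer server.
It is still possible to design a network that fools this algorithm. Suppose a Binary Star pair is split across two buildings, each building has its own set of applications, and a single network link joins the buildings. Cutting that link creates two sets of applications, each with half of the Binary Star pair, and both servers become active.
To prevent split brain, connect the two servers with a dedicated network link. The simplest ways are to plug both into the same switch or to join them directly with a crossover cable.
A Binary Star pair must not be split across two sites. For that situation use federation, not high-availability failover.
A suitably paranoid network configuration uses two private cluster interconnects instead of one, on different network cards, so that a failure of one network path can be told apart from a failure of the peer.
Binary Star Implementation
The primary and backup servers run the same program; the role is chosen at startup.
bstarsrv.py: binary star server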
# Binary Star Server
from argparse import ArgumentParser
import time

import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4
PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5
HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass


fsm_states = {
    # We are the primary, waiting for the peer:
    #   PEER_BACKUP -> become active
    #   PEER_ACTIVE -> become passive
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
    },
    # We are the backup, waiting for the peer:
    #   PEER_ACTIVE    -> become passive
    #   CLIENT_REQUEST -> reject the request, state unchanged
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
    },
    # We are active:
    #   PEER_ACTIVE -> fatal error, two active servers
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
    },
    # We are passive:
    #   PEER_PRIMARY   -> peer is restarting, become active
    #   PEER_BACKUP    -> peer is restarting, become active
    #   PEER_PASSIVE   -> fatal error, two passive servers
    #   CLIENT_REQUEST -> become active only if the peer has expired
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
    }
}


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    # A state of False means this event is an error in the current state
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        # Client vote: take over only if the peer has been silent too long
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    # Exactly one role must be chosen at startup
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()
    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)
    fsm = BStarState(0, 0, 0)
    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        fsm.state = STATE_BACKUP
    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)
    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg
        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break
        # It is time to publish our state
        if int(time.time() * 1000) >= send_state_at:
            statepub.send_string("%d" % fsm.state)
            # Schedule the next state broadcast
            send_state_at = int(time.time() * 1000) + HEARTBEAT


if __name__ == '__main__':
    main()
bstarcli.py: binary star client
from time import sleep

import zmq

REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()
    # The client connects to the primary server first
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)
    sequence = 0
    while True:
        client.send_string("%s" % sequence)
        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                # A receive timeout means the server is in trouble:
                # connect to the other server and resend the request
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)


if __name__ == '__main__':
    main()
Execution results
$ python bstarsrv.py -p
I: Primary master, waiting for backup (slave)
I: connected to backup (slave), ready as master
^C
$ python bstarsrv.py -b
I: Backup slave, waiting for primary (master)
I: connected to primary (master), ready as slave
$ python bstarcli.py
I: server replied OK (0)
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
I: server replied OK (4)
I: server replied OK (5)
W: no response from server, failing over
I: connecting to server at tcp://localhost:5002..
I: server replied OK (6)
I: server replied OK (7)
I: server replied OK (8)
I: server replied OK (9)
I: server replied OK (10)
I: server replied OK (11)
I: server replied OK (12)
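Killing the primary server triggers failover. Then restart the primary and kill the backup, and the pair recovers.
Binary Star is implemented as a finite state machine whose events are the peer's states. "Peer Active" means the other server has told us it is active. "Client Request" means a client request has arrived. "Client Vote" means a client request has arrived and the peer has been inactive for two heartbeats.
The servers use PUB-SUB sockets for state exchange; no other socket combination works here. PUSH and DEALER block when no peer is ready to receive a message. PAIR does not reconnect automatically after the peer disconnects and restarts. ROUTER needs the peer's address before it can send a message.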
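The "Client Vote" rule can be sketched as a pair of small pure functions. This is a simplified illustration under stated assumptions, not the Binary Star code itself: the helper names `peer_expiry_after` and `should_take_over` are hypothetical, and all times are in milliseconds.

```python
HEARTBEAT = 1000  # msecs, the same interval the servers use


def peer_expiry_after(now_ms, heartbeat=HEARTBEAT):
    """Deadline after which the peer counts as dead: two heartbeats from now."""
    return now_ms + 2 * heartbeat


def should_take_over(is_passive, now_ms, peer_expiry_ms):
    """A passive server becomes active only when a client request arrives
    (the "vote") and the peer has been silent past its expiry time."""
    return is_passive and now_ms >= peer_expiry_ms


# Peer state was last received at t = 10000 ms
expiry = peer_expiry_after(10000)
print(should_take_over(True, 11500, expiry))   # False: peer still counts as alive
print(should_take_over(True, 12000, expiry))   # True: two heartbeats were missed
```

The client request is what triggers the check, so a passive server never promotes itself merely because its peer went quiet.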
Binary Star Reactor
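Binary Star can be packaged as a reusable reactor class. The reactor calls our handler whenever there is a message to process, which is more convenient than copying and pasting the Binary Star code into every server.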
bstar.py: Binary Star core class
"""
Binary Star server
"""
import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

# States we can be in at any point in time
STATE_PRIMARY = 1          # Primary, waiting for peer to connect
STATE_BACKUP = 2           # Backup, waiting for peer to connect
STATE_ACTIVE = 3           # Active - accepting connections
STATE_PASSIVE = 4          # Passive - not accepting connections

# Events, which start with the states our peer can be in
PEER_PRIMARY = 1           # HA peer is pending primary
PEER_BACKUP = 2            # HA peer is pending backup
PEER_ACTIVE = 3            # HA peer is active
PEER_PASSIVE = 4           # HA peer is passive
CLIENT_REQUEST = 5         # Client makes request

# We send state information every this often
# If peer doesn't respond in two heartbeats, it is 'dead'
HEARTBEAT = 1000          # In msecs


class FSMError(Exception):
    """Exception class for invalid state"""
    pass


class BinaryStar(object):
    def __init__(self, primary, local, remote):
        # initialize the Binary Star
        self.ctx = zmq.Context()        # Our private context
        self.loop = IOLoop.instance()   # Reactor loop
        self.state = STATE_PRIMARY if primary else STATE_BACKUP

        self.event = None               # Current event
        self.peer_expiry = 0            # When peer is considered 'dead'
        self.voter_callback = None      # Voting socket handler
        self.master_callback = None     # Call when become master
        self.slave_callback = None      # Call when become slave

        # Create publisher for state going to peer
        self.statepub = self.ctx.socket(zmq.PUB)
        self.statepub.bind(local)

        # Create subscriber for state coming from peer
        self.statesub = self.ctx.socket(zmq.SUB)
        self.statesub.setsockopt_string(zmq.SUBSCRIBE, u'')
        self.statesub.connect(remote)

        # wrap statesub in ZMQStream for event triggers
        self.statesub = ZMQStream(self.statesub, self.loop)

        # setup basic reactor events
        self.heartbeat = PeriodicCallback(self.send_state,
                                          HEARTBEAT, self.loop)
        self.statesub.on_recv(self.recv_state)

    def update_peer_expiry(self):
        """Update peer expiry time to be 2 heartbeats from now."""
        self.peer_expiry = time.time() + 2e-3 * HEARTBEAT

    def start(self):
        self.update_peer_expiry()
        self.heartbeat.start()
        return self.loop.start()

    def execute_fsm(self):
        """Binary Star finite state machine (applies event to state)
        returns True if connections should be accepted, False otherwise.
        """
        accept = True
        if self.state == STATE_PRIMARY:
            # Primary server is waiting for peer to connect
            # Accepts CLIENT_REQUEST events in this state
            if self.event == PEER_BACKUP:
                print("I: connected to backup (slave), ready as master")
                self.state = STATE_ACTIVE
                if self.master_callback:
                    self.loop.add_callback(self.master_callback)
            elif self.event == PEER_ACTIVE:
                print("I: connected to backup (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                if time.time() >= self.peer_expiry:
                    print("I: request from client, ready as master")
                    self.state = STATE_ACTIVE
                    if self.master_callback:
                        self.loop.add_callback(self.master_callback)
                else:
                    # don't respond to clients yet - we don't know if
                    # the backup is currently Active as a result of
                    # a successful failover
                    accept = False
        elif self.state == STATE_BACKUP:
            # Backup server is waiting for peer to connect
            # Rejects CLIENT_REQUEST events in this state
            if self.event == PEER_ACTIVE:
                print("I: connected to primary (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                accept = False
        elif self.state == STATE_ACTIVE:
            # Server is active
            # Accepts CLIENT_REQUEST events in this state
            # The only way out of ACTIVE is death
            if self.event == PEER_ACTIVE:
                # Two masters would mean split-brain
                print("E: fatal error - dual masters, aborting")
                raise FSMError("Dual Masters")
        elif self.state == STATE_PASSIVE:
            # Server is passive
            # CLIENT_REQUEST events can trigger failover if peer looks dead
            if self.event == PEER_PRIMARY:
                # Peer is restarting - become active, peer will go passive
                print("I: primary (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_BACKUP:
                # Peer is restarting - become active, peer will go passive
                print("I: backup (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_PASSIVE:
                # Two passives would mean cluster would be non-responsive
                print("E: fatal error - dual slaves, aborting")
                raise FSMError("Dual slaves")
            elif self.event == CLIENT_REQUEST:
                # Peer becomes master if timeout has passed
                # It's the client request that triggers the failover
                assert self.peer_expiry > 0
                if time.time() >= self.peer_expiry:
                    # If peer is dead, switch to the active state
                    print("I: failover successful, ready as master")
                    self.state = STATE_ACTIVE
                else:
                    # If peer is alive, reject connections
                    accept = False
            # Call state change handler if necessary
            if self.state == STATE_ACTIVE and self.master_callback:
                self.loop.add_callback(self.master_callback)
        return accept

    # ---------------------------------------------------------------------
    # Reactor event handlers...

    def send_state(self):
        """Publish our state to peer"""
        self.statepub.send_string("%d" % self.state)

    def recv_state(self, msg):
        """Receive state from peer, execute finite state machine"""
        state = msg[0]
        if state:
            self.event = int(state)
            self.update_peer_expiry()
        self.execute_fsm()

    def voter_ready(self, msg):
        """Application wants to speak to us, see if it's possible"""
        # If server can accept input now, call appl handler
        self.event = CLIENT_REQUEST
        if self.execute_fsm():
            print("CLIENT REQUEST")
            self.voter_callback(self.voter_socket, msg)
        else:
            # Message will be ignored
            pass

    # -------------------------------------------------------------------------
    #

    def register_voter(self, endpoint, type, handler):
        """Create socket, bind to local endpoint, and register as reader for
        voting. The socket will only be available if the Binary Star state
        machine allows it. Input on the socket will act as a "vote" in the
        Binary Star scheme.  We require exactly one voter per bstar instance.

        handler will always be called with two arguments: (socket,msg)
        where socket is the one we are creating here, and msg is the message
        that triggered the POLLIN event.
        """
        assert self.voter_callback is None

        socket = self.ctx.socket(type)
        socket.bind(endpoint)
        self.voter_socket = socket
        self.voter_callback = handler

        stream = ZMQStream(socket, self.loop)
        stream.on_recv(self.voter_ready)
bstarsrv2.py: Binary Star server
"""
Binary Star server, using bstar reactor
"""
import sys

import zmq

from bstar import BinaryStar


def echo(socket, msg):
    """Echo service"""
    socket.send_multipart(msg)


def main():
    # Arguments can be either of:
    #     -p  primary server, at tcp://localhost:5001
    #     -b  backup server, at tcp://localhost:5002
    if '-p' in sys.argv:
        star = BinaryStar(True, "tcp://*:5003", "tcp://localhost:5004")
        star.register_voter("tcp://*:5001", zmq.ROUTER, echo)
    elif '-b' in sys.argv:
        star = BinaryStar(False, "tcp://*:5004", "tcp://localhost:5003")
        star.register_voter("tcp://*:5002", zmq.ROUTER, echo)
    else:
        print("Usage: bstarsrv2.py { -p | -b }\n")
        return

    star.start()


if __name__ == '__main__':
    main()
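bstarsrv2 raised a tornado error on startup here. Tornado may need to be downgraded from 5.1 to 4.5; this was not tested, so it is unconfirmed. One possible cause, also unverified: bstar.py passes the loop as a third argument to PeriodicCallback, and Tornado 5 removed the io_loop argument from that constructor.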
Freelance pattern: Brokerless Reliability
distributed peer-to-peer architecture: Freelance Pattern
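The use case is a name resolution service.
A common architecture question with ZeroMQ: how do we know which endpoint to connect to, without hard-coding it and without using a configuration file?
A ZeroMQ name service has to do the following:
- Resolve a logical name to a bind endpoint and a connect endpoint. In practice it is better if it can provide multiple bind endpoints and multiple connect endpoints.
- Manage multiple parallel environments, for example "test" and "production", without modifying code.
- Be reliable, because applications cannot connect to the network when the name service is unavailable.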
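As a rough illustration of the first two tasks, a name service could keep one table per environment that maps a logical name to its bind and connect endpoints. Everything in this sketch is hypothetical: the table layout, the environment and host names, and the `resolve` helper are assumptions made for the example, not part of ZeroMQ.

```python
# Hypothetical name table: environment -> logical name -> endpoints
NAME_TABLE = {
    "test": {
        "echo": {
            "bind": ["tcp://*:5555"],
            "connect": ["tcp://localhost:5555"],
        },
    },
    "production": {
        "echo": {
            "bind": ["tcp://*:5555", "tcp://*:5556"],
            "connect": ["tcp://server1:5555", "tcp://server2:5556"],
        },
    },
}


def resolve(environment, name, kind):
    """Return the endpoints of the given kind ("bind" or "connect") for a
    logical name, or an empty list if the environment or name is unknown."""
    return NAME_TABLE.get(environment, {}).get(name, {}).get(kind, [])


print(resolve("test", "echo", "connect"))        # ['tcp://localhost:5555']
print(resolve("production", "echo", "connect"))  # two endpoints
```

Switching between "test" and "production" then only changes the environment argument, not the application code.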
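The name service could sit behind a service-oriented Majordomo broker, but it is simpler to expose it as a server that clients connect to directly. The name service then becomes the only global network endpoint, and clients only need to be configured with the name service's endpoint.
The failures to handle are server crashes and restarts, server busy looping, server overload, and network issues. To be reliable we need a pool of name servers. In practice, two are enough.
In this architecture, many clients connect directly to a small number of servers, and the servers bind to their addresses. This differs from Majordomo. There are several ways to implement the client:
- REQ sockets and the Lazy Pirate pattern. Simple to implement, but it needs some extra code, otherwise the client keeps trying to reconnect to dead servers.
- DEALER sockets, blasting out requests (load balanced to all connected servers) until a reply arrives.
- ROUTER sockets. Clients can address specific servers, but how does the client learn the identity of the server sockets? Either the server pings the client first, or the server uses a hard-coded, fixed identity that the client knows.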
Model 1: Simple Retry and Failover
Rewrite Lazy Pirate so that it works with multiple server endpoints.
flserver1.py: Freelance server
#
#  Freelance server - Model 1
#  Trivial echo service
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Echo service is ready at %s" % endpoint)
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0)  # Terminate immediately
flclient1.py
#
#  Freelance Client - Model 1
#  Uses REQ socket to query one or more services
import sys

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3   # Before we abandon


def try_request(ctx, endpoint, request):
    print("I: Trying echo service at %s..." % endpoint)
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = ''
    poll.unregister(client)
    client.close()
    return reply


context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print("I: syntax: %s <endpoint> ..." % sys.argv[0])
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s, retrying" % endpoint)
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print("W: No response from %s" % endpoint)

if reply:
    print("Service is running OK")
Execution results
$ python flserver1.py tcp://*:5555
I: Echo service is ready at tcp://*:5555
$ python flserver1.py tcp://*:5556
I: Echo service is ready at tcp://*:5556
$ python flclient1.py tcp://localhost:5555 tcp://localhost:5556
I: Trying echo service at tcp://localhost:5555...
Service is running OK
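The basic approach is Lazy Pirate and the client only needs one successful reply, but it behaves in two different ways:
- With a single server, the client retries several times, just like Lazy Pirate.
- With multiple servers, the client tries each server once, until it gets a reply or has tried all the servers.
This fixes Lazy Pirate's main weakness: it could not fail over to backup or alternate servers.
However, this design does not work well in production. If we open many sockets and the primary name server is down, every one of them has to sit through the timeout.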
Model 2: Brutal Shotgun Massacre
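Switch to a DEALER socket. The goal is to get a reply within the shortest possible time.
The client now works as follows:
- After setup, connect to all servers.
- When there is a request, send it to every server.
- Wait for the first reply and ignore the others.
The client can receive several replies, and there is no guarantee how many. Both requests and replies can be lost.
So add a sequence number to each request and ignore replies whose number does not match.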
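The filtering rule can be sketched without any sockets. This is a minimal illustration, assuming each reply is a list of three frames (empty delimiter, sequence number, body); the helper name `first_matching_reply` is hypothetical.

```python
def first_matching_reply(replies, expected_sequence):
    """Return the first reply whose sequence frame matches, else None.

    Each reply is assumed to be [empty delimiter, sequence, body].
    """
    for frames in replies:
        if len(frames) != 3:
            continue  # Malformed reply, ignore it
        try:
            sequence = int(frames[1].decode())
        except ValueError:
            continue  # Sequence frame is not a number, ignore it
        if sequence == expected_sequence:
            return frames
    return None


# Two servers answer request 7; a late answer to request 6 arrives first
replies = [
    [b"", b"6", b"OK"],
    [b"", b"7", b"OK"],
    [b"", b"7", b"OK"],
]
print(first_matching_reply(replies, 7))  # [b'', b'7', b'OK']
```

Late duplicates of request 7 are dropped the same way once the client has moved on to request 8.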
flserver2.py
#
#  Freelance server - Model 2
#  Does some work, replies OK, with message sequencing
import sys

import zmq

if len(sys.argv) < 2:
    print("I: Syntax: %s <endpoint>" % sys.argv[0])
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print("I: Service is ready at %s" % endpoint)
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    # REP has already stripped the empty envelope frame,
    # so frame 0 is the client's sequence number
    sequence = request[0]
    reply = [sequence, b"OK"]
    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early
flclient2.py
#
#  Freelance Client - Model 2
#  Uses DEALER socket to blast one or more services
import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms


class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)  # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print("I: Connected to %s" % endpoint)

    def request(self, *request):
        # Prefix request with sequence number and empty envelope
        self.sequence += 1
        msg = [b'', str(self.sequence).encode()] + list(request)

        # Blast the request to all connected servers
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each one
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        reply = None
        endtime = time.time() + GLOBAL_TIMEOUT / 1000
        while time.time() < endtime:
            # Clamp at zero: a negative timeout would make poll() block forever
            timeout = max(0, (endtime - time.time()) * 1000)
            socks = dict(poll.poll(timeout))
            if socks.get(self.socket) == zmq.POLLIN:
                reply = self.socket.recv_multipart()
                assert len(reply) == 3
                sequence = int(reply[1].decode())
                if sequence == self.sequence:
                    break
                # Stale reply to an earlier request: discard it so that a
                # timeout returns None instead of the wrong reply
                reply = None
        return reply


if len(sys.argv) == 1:
    print("I: Usage: %s <endpoint> ..." % sys.argv[0])
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

requests = 10000
start = time.time()
for _ in range(requests):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print("E: Name service not available, aborting")
        break

# Elapsed seconds converted to average microseconds per request
print("Average round trip cost: {} usec".format(1e6 * (time.time() - start) / requests))
client.destroy()
Execution results
$ python flserver2.py tcp://*:5555
I: Service is ready at tcp://*:5555
$ python flclient2.py tcp://localhost:5555
I: Connected to tcp://localhost:5555
Average round trip cost: 249.47671890258788 usec
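Notes on the client implementation:
- The client is wrapped in a class-based API.
- The client gives up if it cannot find any responsive server within a few seconds.
- The client has to build a valid REP envelope, that is, add an empty frame to the front of the message.
The client performs 10,000 name resolution requests and measures the average cost.
Pros:
- Simple and easy to implement.
- It does failover and works fast, as long as at least one server is running.
Cons:
- It generates redundant network traffic.
- Servers cannot be prioritized.
- A server can only handle one request at a time.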
Model 3: Complex and Nasty
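Switch to a ROUTER socket, so that requests go to specific servers and dead servers can be avoided.
In a ROUTER-to-ROUTER connection, each side needs the other's identity before it can send to it, and a peer's identity only becomes known once the first message from it has arrived. The only way out is to use hard-coded identities in one direction.
Here the connection endpoint is used as the identity.
ZeroMQ identities work as follows: the server ROUTER socket sets its identity before it binds. When a client connects, the two sides exchange identities before any real message is sent.
The client ROUTER socket has not set an identity, so it sends a null identity to the server. The server generates a random UUID to refer to that client for its own use, and sends its own identity (the endpoint string) to the client.
So the client cannot route a message to the server immediately after zmq_connect(), only after some unpredictable delay. This creates a problem: we do not know whether the server exists at all, although if it is online the handshake completes within a few milliseconds.
We need to know which servers are connected and usable. In the Freelance pattern, unlike the broker-based patterns, servers stay silent until someone talks to them.
The solution borrows the shotgun approach: send a ping-pong heartbeat to every server.
The protocol spec describes how a Freelance client and server exchange ping-pong commands and request-reply commands.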
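On the client side, the two kinds of exchange can be told apart by the control frame. The sketch below is an assumption-laden illustration, not the protocol spec: it assumes a message arrives as [server endpoint, control, ...], where control is b"PONG" or a decimal sequence number, and the helper name `classify_reply` is hypothetical.

```python
def classify_reply(frames, expected_sequence):
    """Classify a message received on the client's ROUTER socket.

    frames is assumed to be [server endpoint, control, ...], where control
    is b"PONG" for a heartbeat answer or a decimal sequence number for a
    reply. Returns (endpoint, kind), kind being "pong", "reply" or "stale".
    """
    endpoint = frames[0].decode()
    control = frames[1]
    if control == b"PONG":
        return endpoint, "pong"
    if control.isdigit() and int(control) == expected_sequence:
        return endpoint, "reply"
    return endpoint, "stale"


server = b"tcp://localhost:5555"
print(classify_reply([server, b"PONG"], 3))      # ('tcp://localhost:5555', 'pong')
print(classify_reply([server, b"3", b"OK"], 3))  # ('tcp://localhost:5555', 'reply')
print(classify_reply([server, b"2", b"OK"], 3))  # ('tcp://localhost:5555', 'stale')
```

Any of the three kinds proves that the server is alive, so a caller would refresh that server's expiry time in every case and hand only a "reply" back to the application.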
flclient3.py
"""
Freelance client - Model 3
Uses flcliapi class to encapsulate Freelance pattern
"""
import time

from flcliapi import FreelanceClient


def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect("tcp://localhost:5555")
    client.connect("tcp://localhost:5556")
    client.connect("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print("E: name service not available, aborting")
            return

    print("Average round trip cost: {} usec".format(1e6 * (time.time() - start) / requests))


if __name__ == '__main__':
    main()
flcliapi.py
"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services
"""
import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000    # msecs
# PING interval for servers we think are alive
PING_INTERVAL = 2000    # msecs
# Server considered dead if silent for this long
SERVER_TTL = 6000    # msecs


def flciapi_agent(peer):
    """This is the thread that handles our real flcliapi class
    """
    pass


# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to flciapi agent
    agent = None    # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx, peer))
        self.agent.daemon = True
        self.agent.start()

    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", endpoint.encode()])
        time.sleep(0.1)  # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = [b"REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != b"FAILED":
            return reply


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None         # Server identity/endpoint
    alive = True            # True if known to be alive
    ping_at = 0             # Next ping at this time
    expires = 0             # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3*PING_INTERVAL
        self.expires = time.time() + 1e-3*SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint.encode(), b'PING'])
            self.ping_at = time.time() + 1e-3*PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless


# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    router = None           # Socket to talk to servers
    servers = None          # Servers we've connected to
    actives = None          # Servers we know are alive
    sequence = 0            # Number of requests ever sent
    request = None          # Current request if any
    reply = None            # Current reply if any
    expires = 0             # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message(self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            endpoint = msg.pop(0).decode()
            print("I: connecting to %s..." % endpoint)
            self.router.connect(endpoint)
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3*PING_INTERVAL
            server.expires = time.time() + 1e-3*SERVER_TTL
        elif command == b"REQUEST":
            assert not self.request    # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.request = [str(self.sequence).encode(), b''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

    def router_message(self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0).decode()
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = True

        server.ping_at = time.time() + 1e-3*PING_INTERVAL
        server.expires = time.time() + 1e-3*SERVER_TTL

        # Frame 1 is either PONG or the sequence number of a reply;
        # int(b"PONG") would raise ValueError, so check for PONG first
        sequence = reply.pop(0)
        if sequence != b"PONG" and int(sequence) == self.sequence:
            self.sequence += 1
            reply = [b"OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if agent.request and tickless > agent.expires:
            tickless = agent.expires
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            # Clamp at zero: a negative timeout would make poll() block forever
            items = dict(poller.poll(max(0, 1000 * (tickless - time.time()))))
        except zmq.ZMQError:
            break  # Context has been shut down

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if agent.request:
            if time.time() >= agent.expires:
                # Request expired, kill it
                agent.pipe.send(b"FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = False
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint.encode()] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Disconnect and delete any expired servers
        # Send heartbeats to idle servers if needed
        for server in agent.servers.values():
            server.ping(agent.router)
flserver3.py
"""Freelance server - Model 3
Uses an ROUTER/ROUTER socket but just one thread
"""
import sys

import zmq

from zhelpers import dump


def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print("I: service is ready at", bind_endpoint)

    while True:
        try:
            request = server.recv_multipart()
        except (KeyboardInterrupt, zmq.ZMQError):
            break  # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print("W: interrupted")


if __name__ == '__main__':
    main()
Execution results
$ python flserver3.py -v
I: service is ready at tcp://*:5555
----------------------------------------
[005] 0x00800041aa
[001] 0
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 1
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 2
[002] OK
$ python flclient3.py
I: connecting to tcp://localhost:5555...
I: connecting to tcp://localhost:5556...
I: connecting to tcp://localhost:5557...
Average round trip cost: 652.7282238006592 usec
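- Multithreaded API: the client API has two parts, a synchronous flcliapi class that runs in the application thread and an asynchronous agent that runs in the background. ZeroMQ works well in multithreaded apps: the flcliapi and agent classes talk to each other over inproc sockets, and everything ZeroMQ-related is encapsulated inside the API. The agent acts like a mini-broker that talks to all the servers in the background, so when a request is sent it makes a best effort to reach a server that is available.
- Tickless poll timer: earlier poll loops used a fixed tick interval, which wakes the process on every tick and costs CPU and power even when nothing is happening. The agent here uses a tickless timer, which computes the poll delay from the next timeout.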
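The tickless calculation can be sketched on its own. This is a minimal illustration, assuming deadlines are absolute times in seconds; the function name `poll_timeout_ms` and the one-hour cap parameter are hypothetical.

```python
def poll_timeout_ms(now, deadlines, max_wait=3600.0):
    """Milliseconds to wait in poll(): the time until the earliest deadline,
    capped at max_wait seconds and never negative."""
    next_deadline = min([now + max_wait] + list(deadlines))
    return max(0, int(1000 * (next_deadline - now)))


# Next ping is due at t = 101.5 s and the current request expires at t = 102 s
print(poll_timeout_ms(100.0, [102.0, 101.5]))  # 1500
# Nothing is scheduled: sleep for the full hour
print(poll_timeout_ms(100.0, []))              # 3600000
# A deadline has already passed: poll returns immediately
print(poll_timeout_ms(100.0, [99.0]))          # 0
```

The loop then wakes exactly when the next ping or request expiry is due, instead of on every tick.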