2019/10/7

ZeroMQ 3 Advanced Request-Reply Patterns

Advanced request-reply patterns built on top of the basic request-reply pattern


  • how the request-reply mechanisms work
  • how to combine REQ, REP, DEALER, and ROUTER sockets
  • how ROUTER sockets work
  • the load balancing pattern
  • how to build a simple load balancing message broker
  • designing a high-level API for ØMQ
  • how to build an asynchronous request-reply server
  • a detailed inter-broker routing example

The Request-Reply Mechanisms


reply message envelopes


An envelope is a way of packaging data with an address. By separating the address into an envelope, we can build general-purpose intermediaries such as APIs and proxies that create, read, and remove addresses without ever touching the data.


In the request-reply pattern, the envelope holds the return address used for the reply. This is how an otherwise stateless ØMQ network can carry out round-trip request-reply dialogs.


When you use REQ and REP sockets you never even see the envelopes; the sockets handle them automatically.


The Simple Reply Envelope

A request-reply exchange consists of a request message and a reply message. In the simplest pattern there is exactly one reply for each request; in the advanced patterns, requests and replies can flow asynchronously, but the reply envelope always looks the same.


The ØMQ reply envelope consists of zero or more reply addresses, followed by an empty frame (the envelope delimiter), followed by the message body (zero or more frames). The envelope is produced by several sockets working together in a chain.


Start with the simplest case: a "Hello" sent by a REQ socket. The REQ socket creates the simplest possible reply envelope, with no addresses, just the empty delimiter frame and the message frame containing "Hello".


The REP socket strips off the envelope, up to and including the delimiter, saves it, and passes "Hello" to the application, so the application never sees the envelope at all.


If you sniff the traffic between hwclient and hwserver, you will see that every request and reply message actually carries two frames.


Extended Reply Envelope

Extended request-reply pattern: extend the REQ-REP pair with a ROUTER-DEALER proxy in the middle.



Pseudo-code for the proxy:


prepare context, frontend and backend sockets
while true:
    poll on both sockets
    if frontend had input:
        read all frames from frontend
        send to backend
    if backend had input:
        read all frames from backend
        send to frontend
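
The pseudo-code maps almost line for line onto pyzmq. The sketch below is a minimal, illustrative version; the tcp ports (5559/5560) are assumptions, not from the text.


# Minimal pyzmq sketch of the ROUTER-DEALER proxy described above.
# The endpoints (ports 5559/5560) are illustrative assumptions.
import zmq

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)   # REQ clients connect here
backend = context.socket(zmq.DEALER)    # REP workers connect here
frontend.bind("tcp://*:5559")
backend.bind("tcp://*:5560")

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

while True:
    events = dict(poller.poll())
    if frontend in events:
        # read all frames from the frontend, send them to the backend
        backend.send_multipart(frontend.recv_multipart())
    if backend in events:
        # read all frames from the backend, send them to the frontend
        frontend.send_multipart(backend.recv_multipart())

The built-in zmq.proxy(frontend, backend) call (used later in asyncsrv.py) does the same forwarding loop in one line.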

The ROUTER socket, unlike other sockets, tracks every connection it has and tells the caller about it, by prepending the connection identity to each message it receives. An identity (also called an address) is just a binary string whose only meaning is "this is a unique handle to this connection". When you send a message via a ROUTER socket, you send the identity frame first.


From the reference manual: when receiving messages, a ZMQ_ROUTER socket prepends a message part containing the identity of the originating peer, and incoming messages are fair-queued among all connected peers. When sending messages, a ZMQ_ROUTER socket removes the first message part and uses it to determine which node the message shall be routed to.


Before ØMQ v2.2, identities were UUIDs; from 3.0 onward, ØMQ generates a 5-byte identity (a zero byte followed by a random 32-bit integer).


If three REQ sockets connect to a ROUTER, it generates a separate random identity for each of them.


If a REQ socket's generated identity were "ABC", the ROUTER socket would receive a 3-frame message: the identity frame "ABC", the empty delimiter frame, and the data frame "Hello".


The core of the proxy is "read from one socket, write to the other", so the DEALER sends these same three frames on to the REP socket. The REP socket can only handle one request-reply exchange at a time.


When the DEALER reads the three reply frames, it sends all three out via the ROUTER socket. The ROUTER takes the first frame ("ABC"), looks up the matching connection and, having found it, sends out the remaining two frames on that connection.


The REQ socket picks up the message, checks that the first frame is the empty delimiter, discards that frame, and passes "World" to the calling application.


Advantages

Although the (extended) request-reply pattern has no way to recover when the server fails because of an application bug, it is still very useful:


  • Each time the ROUTER gives you a message, it tells you the identity of the peer that sent it
  • You can use a hash table (keyed by identity) to track whether a new peer has arrived
  • If you prefix an identity as the first frame, the ROUTER will asynchronously route the message to whichever node you choose

The ROUTER knows nothing about envelopes or the empty delimiter; it only cares that the first frame is the identity frame.


Recap

  • The REQ socket sends messages synchronously, prefixing each with an empty delimiter frame; it always sends, then waits for the reply. A REQ socket talks to only one peer at a time.
  • The REP socket reads and saves all identity frames up to and including the empty delimiter, and passes the rest to the caller. If you connect a REP socket to multiple peers, requests from those peers are read fairly, in turn.
  • The DEALER socket is oblivious to the reply envelope. DEALER sockets are asynchronous and fair-queue incoming messages from all connections.
  • The ROUTER socket is also oblivious to the reply envelope. It creates an identity for each connection and delivers that identity to you as the first frame of every incoming message. ROUTER sockets are asynchronous.

Request-Reply Combinations


There are four request-reply socket types, and different combinations of them serve different purposes.


These are the legal combinations:


  • REQ to REP
  • DEALER to REP
  • REQ to ROUTER
  • DEALER to ROUTER
  • DEALER to DEALER
  • ROUTER to ROUTER

These combinations are invalid:


  • REQ to REQ
  • REQ to DEALER
  • REP to REP
  • REP to ROUTER

Note that since a DEALER works like an asynchronous REQ socket and a ROUTER like an asynchronous REP socket, wherever you use a REQ you can substitute a DEALER (you then have to read and write the envelope yourself), and wherever you use a REP you can substitute a ROUTER (you then have to manage the identities yourself).


Think of REQ and DEALER as "clients": they connect to a REP or ROUTER. Think of REP and ROUTER as "servers": they bind.


REQ to REP

The REQ client must initiate the message flow, and the REP server replies. Getting the order wrong (sending out of turn) produces an EFSM error.


DEALER to REP

Replace the REQ with a DEALER (asynchronous); the DEALER can then talk to multiple REP servers.


When using a DEALER you must emulate the envelope correctly, otherwise the REP socket will discard the message. To send (see the sketch after this list):


  • send an empty message frame with the MORE flag set
  • then send the message body

To receive:


  • receive the first frame; if it is not empty, discard the whole message
  • receive the second frame and pass it to the application
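
A minimal sketch of those send/receive rules, assuming a REP server is listening; the endpoint and payload are illustrative, not from the text.


# Hedged sketch: a DEALER must add and strip the empty delimiter frame
# itself when talking to a REP server. Endpoint and payload are assumptions.
import zmq

context = zmq.Context()
dealer = context.socket(zmq.DEALER)
dealer.connect("tcp://localhost:5559")

# sending: empty delimiter frame first (with MORE), then the message body
dealer.send(b"", zmq.SNDMORE)
dealer.send(b"Hello")

# receiving: first frame must be the empty delimiter, second is the body
frames = dealer.recv_multipart()
if len(frames) >= 2 and frames[0] == b"":
    print("reply:", frames[1])
# otherwise: not a valid REP envelope, discard the whole message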

REQ to ROUTER

Replace the REP with a ROUTER (an asynchronous server); the ROUTER can then talk to multiple REQ clients.


There are two ways to use the ROUTER:


  1. As a proxy sitting between frontend and backend sockets:
    the ROUTER simply reads all frames (including the identity frame) and forwards them blindly
  2. As an application that reads and processes the messages itself:
    the ROUTER must then understand the reply envelope format; it receives an identity frame, an empty frame, and a data frame (see the sketch below)
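
A minimal sketch of option 2, a ROUTER used directly as the application end; the endpoint and reply text are illustrative assumptions.


# Hedged sketch: a ROUTER serving REQ clients directly must handle the
# identity frame and empty delimiter itself. Endpoint/reply are assumptions.
import zmq

context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5559")

while True:
    # frames from a REQ client: [identity, empty delimiter, request body]
    identity, empty, request = router.recv_multipart()
    assert empty == b""
    # the reply must carry the same identity and empty delimiter in front,
    # otherwise the client's REQ socket will reject it
    router.send_multipart([identity, b"", b"World"])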

DEALER to ROUTER

An asynchronous client talking to an asynchronous server.


Both sides have full control over the message format, so you must design your own protocol. You can choose whether or not to mimic the REQ/REP reply envelope, i.e. whether and how replies are sent at all. A minimal sketch follows.
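
As an illustration (not a prescribed protocol), the sketch below has a DEALER client send a bare one-frame task to a ROUTER server, which replies asynchronously with as many frames as it likes, addressed by identity; the inproc endpoint and payloads are assumptions.


# Hedged sketch of DEALER-to-ROUTER with a home-made protocol: single-frame
# bodies, any number of replies. Endpoint and payloads are assumptions.
import zmq

context = zmq.Context()
server = context.socket(zmq.ROUTER)
server.bind("inproc://dealer-router")     # bind before connect for inproc

client = context.socket(zmq.DEALER)
client.connect("inproc://dealer-router")

client.send(b"task-1")                    # DEALER sends a bare body frame

identity, body = server.recv_multipart()  # ROUTER prepends the peer identity
server.send_multipart([identity, b"reply 1"])   # zero or more replies,
server.send_multipart([identity, b"reply 2"])   # each addressed by identity

print(client.recv(), client.recv())       # the DEALER sees only the bodies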


DEALER to DEALER

Fully asynchronous; either side can send any number of replies, and you manage the reply envelopes yourself.


This is an exotic pattern and is rarely used.


ROUTER to ROUTER

An N-to-N connection and the hardest combination to use; there is an example in Chapter 4.


Invalid Combinations

Here is why each combination fails:


  • REQ to REQ
    both sides want to start by sending messages to each other

  • REQ to DEALER
    the DEALER has no way to send a reply back to the original peer

  • REP to REP
    both sides wait for the other to send the first request

  • REP to ROUTER
    in theory the ROUTER could initiate the dialog and send the first request, but only if it already knows a REP socket has connected and knows the identity of that connection


The side that binds is the server, broker, publisher, or collector.
The side that connects is the client or worker.


Exploring ROUTER sockets


This section explains how connections are identified, and what a ROUTER socket does when it cannot deliver a message.


Identities and Addresses

The identity is the mechanism ØMQ uses to let a ROUTER identify a connection; identities are what appear as addresses in the reply envelope. Usually the identity is a random value, held in a hash table inside the ROUTER socket. Separately, a peer has a physical address (a network endpoint such as "tcp://192.168.55.115:5670") and may have a logical address (a UUID, an email address, or some other unique key).


An application that uses a ROUTER socket to talk to specific peers can convert a logical address to an identity, provided it has built that hash table. Because the ROUTER socket only reveals the identity of a connection when that peer sends a message, you can only reply to a message, not spontaneously talk to a peer.


You can force a ROUTER socket to use a logical address instead of a generated identity (via zmq_setsockopt):


  • the peer application sets the ZMQ_IDENTITY option on its peer socket (DEALER or REQ) before binding or connecting
  • usually the peer then connects to the already-bound ROUTER socket, but the ROUTER can also connect to the peer
  • at connection time, the peer socket tells the ROUTER socket "please use this identity for this connection"
  • if the peer socket does not provide an identity, the ROUTER generates its own random identity
  • the ROUTER socket then gives this logical address to the application as the identity frame of any message arriving from that peer
  • the ROUTER also expects the logical address as the identity frame for any message it is asked to send

identity.py creates two REQ sockets, one relying on the default random identity and one setting its own identity:


# encoding: utf-8
#
#   Demonstrate identities as used by the request-reply pattern.  Run this
#   program by itself.

import zmq
import zhelpers

context = zmq.Context()

sink = context.socket(zmq.ROUTER)
sink.bind("inproc://example")

# First allow 0MQ to set the identity
anonymous = context.socket(zmq.REQ)
anonymous.connect("inproc://example")
anonymous.send(b"ROUTER uses a generated 5 byte identity")
zhelpers.dump(sink)

# Then set the identity ourselves
identified = context.socket(zmq.REQ)
identified.setsockopt(zmq.IDENTITY, b"PEER2")
identified.connect("inproc://example")
identified.send(b"ROUTER socket uses REQ's socket identity")
zhelpers.dump(sink)

Output:


$ python identity.py
----------------------------------------
[005] 0x00800041a7
[000]
[039] ROUTER uses a generated 5 byte identity
----------------------------------------
[005] PEER2
[000]
[040] ROUTER socket uses REQ's socket identity

Router Error Handling

A ROUTER socket has a brutal way of dealing with messages it cannot send anywhere: it silently drops them. This is efficient, but it makes debugging hard.


Since ØMQ v3.2 there is a socket option that lets you catch this error: ZMQ_ROUTER_MANDATORY. With it set, sending a message with an unroutable identity raises an EHOSTUNREACH error instead (sketched below).
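
A minimal pyzmq sketch of that option; the endpoint and the unknown identity are illustrative assumptions.


# Hedged sketch: with ROUTER_MANDATORY set, sending to an identity that has
# no connected peer raises EHOSTUNREACH instead of silently dropping.
import zmq

context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.setsockopt(zmq.ROUTER_MANDATORY, 1)
router.bind("tcp://*:5559")

try:
    # b"NO-SUCH-PEER" is an identity nobody has registered
    router.send_multipart([b"NO-SUCH-PEER", b"", b"Hello"])
except zmq.ZMQError as e:
    assert e.errno == zmq.EHOSTUNREACH
    print("unroutable message:", e)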


Load Balancing Pattern


Now we look at how to connect a ROUTER to a REQ, and then to a DEALER. The load balancing pattern uses a ROUTER socket to do the routing explicitly.


The load balancing pattern solves the main problem of the simple round-robin routing offered by PUSH and DEALER: round robin becomes inefficient when tasks take different amounts of time to process.


Imagine a post office with one queue per counter. Some customers buy stamps (a quick transaction), others open accounts (a slow transaction). With round robin, the stamp buyers end up stuck in a queue behind slow customers.


The post office's solution is a single shared queue: a few counters handle the slow transactions, and the rest serve customers on a first-come, first-served basis.


The reason PUSH and DEALER use the simplistic round-robin approach is sheer performance. At airport immigration you see long queues at each counter rather than a single queue: travellers are sent ahead to wait at a counter, and because every passport check takes roughly the same time, the result is more or less fair. This is the PUSH/DEALER strategy: send workloads out ahead of time, so the total travel distance is shorter.


Different situations call for different strategies.


Back to the case of workers (DEALER or REQ) connecting to a broker (ROUTER): the broker needs to know which workers are free, and to keep a list so it can pick the least recently used worker.


The solution is for workers to send a "ready" message when they start and each time they finish a job. The broker reads these messages one by one; each time it reads one, it came from the last used worker. Because the broker uses a ROUTER, it gets an identity it can then use to send further work to that specific worker.


It is a twist on request-reply: each new task is delivered as the reply to the worker's previous message, and the worker's result for that task comes back as its next request.


ROUTER Broker and REQ Workers

The following example implements the load balancing pattern with a ROUTER broker talking to a set of REQ workers.


rtreq.py


# encoding: utf-8
#
#   Custom routing Router to Mama (ROUTER to REQ)

import time
import random
from threading import Thread

import zmq

import zhelpers

NBR_WORKERS = 10


def worker_thread(context=None):
    context = context or zmq.Context.instance()
    worker = context.socket(zmq.REQ)

    # We use a string identity for ease here
    zhelpers.set_id(worker)
    worker.connect("tcp://localhost:5671")

    total = 0
    while True:
        # Tell the router we're ready for work
        worker.send(b"ready")

        # Get workload from router, until finished
        # after sending b"ready", keep receiving work messages from the router
        workload = worker.recv()

        # a b"END" message means we should stop working
        finished = workload == b"END"
        if finished:
            print("Processed: %d tasks" % total)
            break
        total += 1

        # Do some random work
        time.sleep(0.1 * random.random())


context = zmq.Context.instance()
client = context.socket(zmq.ROUTER)
client.bind("tcp://*:5671")

# start 10 workers
for _ in range(NBR_WORKERS):
    Thread(target=worker_thread).start()

# hand out 100 tasks: whenever a worker's b"ready" arrives (carrying the worker address), send a task back
for _ in range(NBR_WORKERS * 10):
    # LRU worker is next waiting in the queue
    address, empty, ready = client.recv_multipart()

    client.send_multipart([
        address,
        b'',
        b'This is the workload',
    ])

# Now ask mama to shut down and report their results
# send b"END" to each worker so it shuts down
for _ in range(NBR_WORKERS):
    address, empty, ready = client.recv_multipart()
    client.send_multipart([
        address,
        b'',
        b'END',
    ])

Output:


$ python rtreq.py
Processed: 8 tasks
Processed: 10 tasks
Processed: 10 tasks
Processed: 9 tasks
Processed: 9 tasks
Processed: 12 tasks
Processed: 8 tasks
Processed: 11 tasks
Processed: 10 tasks
Processed: 13 tasks

The message a REQ worker sends arrives at the ROUTER in the following format: the worker's identity frame, an empty delimiter frame, and the data frame (here b"ready").



ROUTER Broker and DEALER Workers

Anywhere you can use a REQ you can use a DEALER instead, with two differences:


  1. the REQ socket always sends an empty delimiter frame before the data frame; the DEALER does not
  2. the REQ socket sends only one message before it receives a reply, while the DEALER is fully asynchronous

The synchronous/asynchronous difference looks minor here; it matters much more for failure recovery, which is discussed in Chapter 4.


rtdealer.py


# encoding: utf-8
#
#   Custom routing Router to Dealer

import time
import random
from threading import Thread

import zmq


# We have two workers, here we copy the code, normally these would
# run on different boxes...
# the two workers each set a different identity
def worker_a(context=None):
    context = context or zmq.Context.instance()
    worker = context.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, b'A')
    worker.connect("ipc://routing.ipc")

    total = 0
    while True:
        # We receive one part, with the workload
        request = worker.recv()
        # b"END" tells the worker to stop
        finished = request == b"END"
        if finished:
            print("A received: %s" % total)
            break
        total += 1


def worker_b(context=None):
    context = context or zmq.Context.instance()
    worker = context.socket(zmq.DEALER)
    worker.setsockopt(zmq.IDENTITY, b'B')
    worker.connect("ipc://routing.ipc")

    total = 0
    while True:
        # We receive one part, with the workload
        request = worker.recv()
        finished = request == b"END"
        if finished:
            print("B received: %s" % total)
            break
        total += 1


context = zmq.Context.instance()
client = context.socket(zmq.ROUTER)
client.bind("ipc://routing.ipc")

Thread(target=worker_a).start()
Thread(target=worker_b).start()

# Wait for threads to stabilize
time.sleep(1)

# Send 10 tasks scattered to A twice as often as B
for _ in range(10):
    # Send two message parts, first the address...
    # send tasks to workers A and B in a 2:1 ratio
    ident = random.choice([b'A', b'A', b'B'])
    # And then the workload
    work = b"This is the workload"
    client.send_multipart([ident, work])

client.send_multipart([b'A', b'END'])
client.send_multipart([b'B', b'END'])

Output:


$ python rtdealer.py
B received: 3
A received: 7

The program is almost the same as the previous one, just with DEALER workers.


A Load Balancing Message Broker

The previous examples had no way to talk to clients. Turning the middle part into a proxy that forwards messages between a frontend and a backend fixes that.



The broker has to do the following:


  • accept connections from multiple clients
  • accept connections from multiple workers
  • accept requests from clients and hold them in a single queue
  • dispatch those requests to workers using the load balancing pattern
  • receive replies back from workers
  • route the replies back to the original client

lbbroker.py


In my tests some client messages were lost over ipc, but after switching the endpoints from ipc to tcp the program behaved correctly.


"""
Load-balancing broker

Clients and workers are shown here in-process.
"""

from __future__ import print_function

import multiprocessing

import zmq


NBR_CLIENTS = 10
NBR_WORKERS = 3


def client_task(ident):
    """Basic request-reply client using REQ socket."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"Client-{}".format(ident).encode("ascii")
    # socket.connect("ipc://frontend.ipc")
    socket.connect("tcp://localhost:20000")

    # Send request, get reply
    socket.send(b"HELLO")
    reply = socket.recv()
    print("{}: {}".format(socket.identity.decode("ascii"),
                          reply.decode("ascii")))


def worker_task(ident):
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"Worker-{}".format(ident).encode("ascii")
    # socket.connect("ipc://backend.ipc")
    socket.connect("tcp://localhost:20001")

    # Tell broker we're ready for work
    socket.send(b"READY")

    while True:
        address, empty, request = socket.recv_multipart()
        print("{}: {}".format(socket.identity.decode("ascii"),
                              request.decode("ascii")))
        socket.send_multipart([address, b"", b"OK"])


def main():
    """Load balancer main loop."""
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    # frontend.bind("ipc://frontend.ipc")
    frontend.bind("tcp://*:20000")
    backend = context.socket(zmq.ROUTER)
    # backend.bind("ipc://backend.ipc")
    backend.bind("tcp://*:20001")

    # Start background tasks
    def start(task, *args):
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()
    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i)

    # Initialize main loop state
    count = NBR_CLIENTS
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1
                if not count:
                    break

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()


if __name__ == "__main__":
    main()

The two tricky parts of this program are:


  1. the message envelopes each socket reads and writes
  2. the load balancing algorithm

The client sends "Hello" to the broker as a single data frame.


Its REQ socket adds an empty delimiter frame, and the broker's frontend ROUTER adds the connection identity, so when the broker's frontend receives the message it sees three frames: the client address, the empty delimiter frame, and the data frame.


The broker prepends another empty delimiter and then the address of the chosen worker, and sends the result out via the backend.


The worker sends its reply with the client address and an empty delimiter in front of the reply data ("OK").


The worker has to save the envelope, i.e. all the parts up to and including the empty delimiter. A REP socket would handle this automatically, but because we use the REQ-ROUTER combination to get proper load balancing, we must handle the message frames ourselves.


On the way back, the backend hands the broker a five-frame message, the broker sends a three-frame message to the client, and the client's REQ socket delivers only the result data to the application.




The load balancing part works like this:


  1. create a poll set that always polls the backend, but polls the frontend only when at least one worker is available
  2. poll for activity with an infinite timeout
  3. if there is activity on the backend, it is either a "ready" message or a reply that must be routed back to a client; in both cases the worker address is appended to a worker list (the queue)
  4. if there is activity on the frontend, take the client request, pop a worker from the list, and send the request to the backend: the worker address, an empty delimiter, and then the client request (which itself carries the client address, an empty delimiter, and the data)

A High-Level API for ZeroMQ


We are about to write more complex examples, so it is worth looking more closely at what a higher-level API should offer.


This is the worker thread of the load balancing broker (in C, using the zhelpers wrappers):


while (true) {
    //  Get one address frame and empty delimiter
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);

    //  Get request, send reply
    char *request = s_recv (worker);
    printf ("Worker: %s\n", request);
    free (request);

    s_sendmore (worker, address);
    s_sendmore (worker, "");
    s_send     (worker, "OK");
    free (address);
}

That code is not reusable, because it only handles envelopes with a single reply address. Written directly against the libzmq API, it would look like this:


while (true) {
    //  Get one address frame and empty delimiter
    char address [255];
    int address_size = zmq_recv (worker, address, 255, 0);
    if (address_size == -1)
        break;

    char empty [1];
    int empty_size = zmq_recv (worker, empty, 1, 0);
    assert (empty_size <= 0);
    if (empty_size == -1)
        break;

    //  Get request, send reply
    char request [256];
    int request_size = zmq_recv (worker, request, 255, 0);
    if (request_size == -1)
        return NULL;
    request [request_size] = 0;
    printf ("Worker: %s\n", request);
    
    zmq_send (worker, address, address_size, ZMQ_SNDMORE);
    zmq_send (worker, empty, 0, ZMQ_SNDMORE);
    zmq_send (worker, "OK", 2, 0);
}

A higher-level API gives us:

  • string helper (s_send, s_recv)
  • frame: message frame
  • message: list of frames

With these, the worker thread above can be simplified to:


while (true) {
    zmsg_t *msg = zmsg_recv (worker);
    zframe_reset (zmsg_last (msg), "OK", 2);
    zmsg_send (&msg, worker);
}

  • Automatic handling of sockets: closing sockets manually is tedious, and in some cases you also need to set a linger timeout. It would be nice if sockets were closed automatically when the context is closed.
  • Portable thread management: ZeroMQ applications typically use threads, but POSIX threads are not portable, so the API should wrap thread handling.
  • Piping from parent to child threads: this is a recurrent problem, how to signal between parent and child threads; the API should provide a ZeroMQ pipe (built automatically from PAIR sockets over the inproc transport).

  • Portable clocks: getting the time in milliseconds, or sleeping for some milliseconds, is not portable either; the API should provide proper clock functions.

  • A reactor to replace zmq_poll(): the poll loop is simple but clumsy, and the code repeats itself: calculating timers, calling code when sockets are ready. A simple reactor with socket readers and timers is needed.

  • Proper handling of Ctrl-C


CZMQ High-Level API

In C, you can switch to CZMQ, the high-level ZeroMQ binding for C, which provides these features.


lbbroker2.py


The differences from the previous lbbroker.py are:


  • instead of repeatedly calling poller.register(frontend, zmq.POLLIN) and poller.unregister(frontend), it keeps an available_workers counter and uses it to decide whether to handle frontend messages
  • it works over inproc
  • asserts were added to check the empty delimiter frames
  • the workers catch zmq.ContextTerminated as the shutdown signal

"""
   Least-recently used (LRU) queue device
   Clients and workers are shown here in-process
"""

from __future__ import print_function
import threading
import time
import zmq

NBR_CLIENTS = 10
NBR_WORKERS = 3


def worker_thread(worker_url, context, i):
    """ Worker using REQ socket to do LRU routing """

    socket = context.socket(zmq.REQ)

    # set worker identity
    socket.identity = (u"Worker-%d" % (i)).encode('ascii')

    socket.connect(worker_url)

    # Tell the broker we are ready for work
    socket.send(b"READY")

    try:
        while True:

            address, empty, request = socket.recv_multipart()

            print("%s: %s\n" % (socket.identity.decode('ascii'),
                                request.decode('ascii')), end='')

            socket.send_multipart([address, b'', b'OK'])

    except zmq.ContextTerminated:
        # context terminated so quit silently
        return


def client_thread(client_url, context, i):
    """ Basic request-reply client using REQ socket """

    socket = context.socket(zmq.REQ)

    # Set client identity. Makes tracing easier
    socket.identity = (u"Client-%d" % (i)).encode('ascii')

    socket.connect(client_url)

    #  Send request, get reply
    socket.send(b"HELLO")
    reply = socket.recv()

    print("%s: %s\n" % (socket.identity.decode('ascii'),
                        reply.decode('ascii')), end='')


def main():
    """ main method """

    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(url_client)
    backend = context.socket(zmq.ROUTER)
    backend.bind(url_worker)

    # create workers and clients threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i, ))
        thread.start()

    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i, ))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    # to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers
    available_workers = 0
    workers_list = []

    # init poller
    poller = zmq.Poller()

    # Always poll for worker activity on backend
    poller.register(backend, zmq.POLLIN)

    # Poll front-end only if we have available workers
    poller.register(frontend, zmq.POLLIN)

    while True:

        socks = dict(poller.poll())

        # Handle worker activity on backend
        if (backend in socks and socks[backend] == zmq.POLLIN):

            # Queue worker address for LRU routing
            message = backend.recv_multipart()
            assert available_workers < NBR_WORKERS

            worker_addr = message[0]

            # add worker back to the list of workers
            available_workers += 1
            workers_list.append(worker_addr)

            #   Second frame is empty
            empty = message[1]
            assert empty == b""

            # Third frame is READY or else a client reply address
            client_addr = message[2]

            # If client reply, send rest back to frontend
            if client_addr != b'READY':

                # Following frame is empty
                empty = message[3]
                assert empty == b""

                reply = message[4]

                frontend.send_multipart([client_addr, b"", reply])

                client_nbr -= 1

                if client_nbr == 0:
                    break  # Exit after N messages

        # poll on frontend only if workers are available
        if available_workers > 0:

            if (frontend in socks and socks[frontend] == zmq.POLLIN):
                # Now get next client request, route to LRU worker
                # Client request is [address][empty][request]

                [client_addr, empty, request] = frontend.recv_multipart()

                assert empty == b""

                #  Dequeue and drop the next worker address
                available_workers += -1
                worker_id = workers_list.pop()

                backend.send_multipart([worker_id, b"",
                                        client_addr, b"", request])

    #out of infinite loop: do some housekeeping
    time.sleep(1)

    frontend.close()
    backend.close()
    context.term()


if __name__ == "__main__":
    main()



This is the nicest version of lbbroker, using the IOLoop reactor:


lbbroker3.py


"""

   Least-recently used (LRU) queue device
   Demonstrates use of pyzmq IOLoop reactor

   While this example runs in a single process, that is just to make
   it easier to start and stop the example. Each thread has its own
   context and conceptually acts as a separate process.
"""

from __future__ import print_function
import threading
import time
import zmq

from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

NBR_CLIENTS = 10
NBR_WORKERS = 3


def worker_thread(worker_url, i):
    """ Worker using REQ socket to do LRU routing """
    context = zmq.Context.instance()

    socket = context.socket(zmq.REQ)

    # set worker identity
    socket.identity = (u"Worker-%d" % (i)).encode('ascii')

    socket.connect(worker_url)

    # Tell the broker we are ready for work
    socket.send(b"READY")

    try:
        while True:

            address, empty, request = socket.recv_multipart()

            print("%s: %s\n" % (socket.identity.decode('ascii'),
                                request.decode('ascii')), end='')

            socket.send_multipart([address, b'', b'OK'])

    except zmq.ContextTerminated:
        # context terminated so quit silently
        return


def client_thread(client_url, i):
    """ Basic request-reply client using REQ socket """
    context = zmq.Context.instance()

    socket = context.socket(zmq.REQ)

    # Set client identity. Makes tracing easier
    socket.identity = (u"Client-%d" % (i)).encode('ascii')

    socket.connect(client_url)

    #  Send request, get reply
    socket.send(b"HELLO")
    reply = socket.recv()

    print("%s: %s\n" % (socket.identity.decode('ascii'),
                        reply.decode('ascii')), end='')


class LRUQueue(object):
    """LRUQueue class using ZMQStream/IOLoop for event dispatching"""

    def __init__(self, backend_socket, frontend_socket):
        self.available_workers = 0
        self.workers = []
        self.client_nbr = NBR_CLIENTS

        self.backend = ZMQStream(backend_socket)
        self.frontend = ZMQStream(frontend_socket)
        self.backend.on_recv(self.handle_backend)

        self.loop = IOLoop.instance()

    def handle_backend(self, msg):
        # handle a message the backend received from a worker
        # address, empty, request
        # Queue worker address for LRU routing
        worker_addr, empty, client_addr = msg[:3]

        assert self.available_workers < NBR_WORKERS

        # add worker back to the list of workers
        # every message from a worker means that worker is free again
        self.available_workers += 1
        self.workers.append(worker_addr)

        #   Second frame is empty
        assert empty == b""

        # Third frame is READY or else a client reply address
        # If client reply, send rest back to frontend
        # a worker only sends b"READY" at startup; otherwise this frame is a client address
        if client_addr != b"READY":
            empty, reply = msg[3:]

            # Following frame is empty
            assert empty == b""

            self.frontend.send_multipart([client_addr, b'', reply])

            # use the remaining client-request count to decide when to stop the IOLoop;
            # 0 means all 10 replies from workers have been forwarded to the clients
            self.client_nbr -= 1
            if self.client_nbr == 0:
                # Exit after N messages
                self.loop.add_timeout(time.time() + 1, self.loop.stop)

        if self.available_workers == 1:
            # on first recv, start accepting frontend messages
            # register the frontend handler
            self.frontend.on_recv(self.handle_frontend)

    def handle_frontend(self, msg):
        # Now get next client request, route to LRU worker
        # Client request is [address][empty][request]
        client_addr, empty, request = msg

        assert empty == b""

        #  Dequeue and drop the next worker address
        self.available_workers -= 1
        worker_id = self.workers.pop()

        self.backend.send_multipart([worker_id, b'', client_addr, b'', request])
        if self.available_workers == 0:
            # stop receiving until workers become available again
            # no workers left, so stop reading from the frontend for now
            self.frontend.stop_on_recv()


def main():
    """main method"""

    url_worker = "ipc://backend.ipc"
    url_client = "ipc://frontend.ipc"

    # Prepare our context and sockets
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(url_client)
    backend = context.socket(zmq.ROUTER)
    backend.bind(url_worker)

    # create workers and clients threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread, args=(url_worker, i, ))
        thread.daemon = True
        thread.start()

    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, i, ))
        thread_c.daemon = True
        thread_c.start()

    # create queue with the sockets
    queue = LRUQueue(backend, frontend)

    # start reactor
    IOLoop.instance().start()

if __name__ == "__main__":
    main()

When run, it prints an IOLoop deprecation warning:


lbbroker3.py:21: VisibleDeprecationWarning: zmq.eventloop.minitornado is deprecated in pyzmq 14.0 and will be removed.
    Install tornado itself to use zmq with the tornado IOLoop.

  from zmq.eventloop.ioloop import IOLoop

Note that since pyzmq 14.0 you are expected to install tornado yourself and use the tornado IOLoop. Simply installing tornado, without changing the program, makes the warning go away:


sudo pip3 install tornado

Asynchronous Client/Server Pattern


In the ROUTER/DEALER examples we have seen the 1-to-N case, where one server talks asynchronously to multiple workers. Now we flip it around into an N-to-1 architecture, where multiple clients talk asynchronously to a single server.


  • clients connect to the server and send requests
  • for each request, the server sends 0 or more replies
  • clients can send multiple requests without waiting for replies
  • the server can send multiple replies without waiting for new requests

asyncsrv.py



import zmq
import sys
import threading
import time
from random import randint, random

__author__ = "Felipe Cruz <felipecruz@loogica.net>"
__license__ = "MIT/X11"

def tprint(msg):
    """like print, but won't get newlines confused with multiple threads"""
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        identity = u'worker-%d' % self.id
        socket.identity = identity.encode('ascii')
        socket.connect('tcp://localhost:5570')
        print('Client %s started' % (identity))
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            # send a request
            reqs = reqs + 1
            print('Req #%d sent..' % (reqs))
            socket.send_string(u'request #%d' % (reqs))

            # poll for replies 5 times before sending the next request
            for i in range(5):
                sockets = dict(poll.poll(1000))
                if socket in sockets:
                    msg = socket.recv()
                    tprint('Client %s received: %s' % (identity, msg))

        socket.close()
        context.term()

class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self):
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.DEALER)
        backend.bind('inproc://backend')

        # start 5 server workers
        workers = []
        for i in range(5):
            worker = ServerWorker(context)
            worker.start()
            workers.append(worker)

        # connect frontend and backend with a proxy
        zmq.proxy(frontend, backend)

        frontend.close()
        backend.close()
        context.term()

class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context):
        threading.Thread.__init__ (self)
        self.context = context

    def run(self):
        worker = self.context.socket(zmq.DEALER)
        worker.connect('inproc://backend')
        tprint('Worker started')
        while True:
            # receive a task, process it, then send replies
            ident, msg = worker.recv_multipart()
            tprint('Worker received %s from %s' % (msg, ident))

            # send 0 to 4 replies back
            replies = randint(0,4)
            for i in range(replies):
                time.sleep(1. / (randint(1,10)))
                worker.send_multipart([ident, msg])

        worker.close()


def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in range(3):
        client = ClientTask(i)
        client.start()

    server.join()


if __name__ == "__main__":
    main()

The example simulates several processes with multiple threads; when run, it starts 3 clients.


  • Clients send a request once per second and get zero or more replies back. To make this work with zmq_poll(), we cannot simply poll with a 1-second timeout, or only send a new request after a reply arrives; instead the guide polls at high frequency (100 times at 1/100th of a second per poll), which is approximately accurate.
  • The server uses a pool of worker threads, each handling one request at a time synchronously. It connects the workers to its frontend socket via an internal queue, wiring the frontend and backend sockets together with zmq_proxy().


The client-to-server dialog is DEALER to ROUTER, and inside the server the server-to-worker dialog is DEALER to DEALER. If the workers were strictly synchronous we would use REP, but since we want to send multiple replies we need an asynchronous DEALER.


The client sends a single-frame request; the server receives it as 2 frames (the client address is prepended), sends those 2 frames to a worker, gets 2 frames back as the result, and uses the first frame to route the result back to the original client.


     client          server       frontend       worker
   [ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
             1 part         2 parts       2 parts

Between server and worker you could instead use ROUTER to DEALER, which would add load balancing.


If you want a server that holds state for each client, you run into a classic problem: clients come and go, so if the server keeps state for every client it eventually runs out of resources. And with default identities, every reconnection looks like a brand-new client.


We could build a server that keeps state only for a very short time (long enough for a worker to process a request) and then throws it away, but that is not very useful. To build a stateful asynchronous server that really keeps client state, we must (see the sketch after this list):


  1. do heartbeating between client and server, e.g. the client sends a request at least once per second
  2. store state keyed by the client identity
  3. detect a stopped heartbeat: if a client sends no request for, say, 2 seconds, the server notices and deletes that client's state
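
A minimal sketch of the state-expiry idea: per-client state keyed by the client identity, dropped when no request (heartbeat) has been seen for a while. The 2-second timeout, the dict layout, and the helper names are assumptions for illustration only.


# Hedged sketch of client-state expiry keyed by identity; names and the
# 2-second TTL are assumptions, not part of the original example.
import time

STATE_TTL = 2.0          # seconds without a request before state is dropped
client_state = {}        # identity (bytes) -> {"last_seen": float, "data": ...}

def on_request(identity, request):
    # every request doubles as a heartbeat
    entry = client_state.setdefault(identity, {"data": None})
    entry["last_seen"] = time.time()
    # ... handle the request and update entry["data"] here ...

def purge_expired():
    now = time.time()
    for identity in list(client_state):
        if now - client_state[identity]["last_seen"] > STATE_TTL:
            del client_state[identity]   # client went silent; forget it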

Example: Inter-Broker Routing


Let's build a real application.


The requirements:


  1. Workers run on various kinds of hardware; each cluster has up to a few hundred workers, and there are a dozen clusters in total.
  2. Clients create tasks for workers. Each task is an independent unit of work, and every client wants to find an available worker and dispatch its task as quickly as possible. There are lots of clients, and they connect and disconnect arbitrarily.
  3. Clusters can be added and removed at any time; this is the hard part. A cluster must be able to join and leave the cloud instantly, bringing its workers and clients with it.
  4. If a cluster has no workers of its own, clients' tasks go off into the cloud to find workers in other clusters.
  5. Clients send one task at a time; if they don't get a result back within X seconds, they simply send the task again. This is the clients' own concern, not ours.
  6. Workers process one task at a time; if they crash, they get restarted by whatever script started them.


  • This is the super-duper network interconnect between clusters.
  • Each cluster has up to a thousand clients, each doing up to 10 requests per second. Requests and replies are small, no more than 1K bytes each.

A quick back-of-the-envelope calculation:
2500 clients * 10/second * 1000 bytes * 2 directions = 50 MB/sec or 400 Mb/sec, which is no problem for ordinary gigabit Ethernet.


Architecture of a single cluster

Workers and clients are synchronous, and we use the load balancing pattern to dispatch tasks to workers. All workers are identical; there is no need to distinguish services, so workers are anonymous. We provide no delivery guarantees: if something goes wrong, the client simply retries.


Clients and workers do not connect to each other directly, because that would make it hard to add or remove nodes dynamically; instead the architecture reuses the request-reply load balancing pattern we saw earlier.



Scaling to multiple clusters


Now we add more clusters, each handling its own set of clients and workers.



How do we make clients and workers in different clusters able to talk to each other? There are several options, each with pros and cons:


  1. Clients connect directly to all brokers. The advantage is that brokers and workers need no changes, but clients become more complex: they have to know the overall topology, and adding a 3rd or 4th cluster affects every client. In effect we would be pushing routing and failover logic into the clients.
  2. Workers connect directly to all brokers. A REQ worker can't do this; it can only connect to a single broker. We could use REPs, but REP only gives us the built-in load balancing and no custom broker-to-worker routing, so we could not steer tasks toward idle workers. One fix is to use ROUTER sockets on the worker nodes; label this "Idea #1".


    It looks workable, but there are still problems: clients should get local workers whenever possible, and a worker would have to signal "ready" to both brokers and could end up with two tasks at once.

  3. Brokers connect to each other. This is the simplest approach and gives the shortest paths, though it does not let us add clusters dynamically. Clients and workers still don't need to know anything about the topology; the brokers share information about spare capacity among themselves. Label this "Idea #2".


    This design is better: the brokers interconnect and exchange information, at the cost of a somewhat more complex routing algorithm between brokers.

  4. It lets us treat clients and workers in the same cluster as the default case and handle task exchange between clusters as the exception.

  5. It lets us use different message flows for different kinds of tasks, for example different kinds of network connections.

  6. It scales smoothly: going to 3 brokers is not much harder, and if that ever became a problem we could add a super-broker.


Now let's build a working example. First we will fit the whole thing into one process; ZeroMQ makes it easy to scale from this micro level up to the macro level later by turning the threads into processes. Each "cluster" process contains client threads, worker threads, and a broker thread.


  • REQ client threads generate workloads and pass them to the broker (ROUTER)
  • REQ worker threads process workloads and return the results to the broker (ROUTER)
  • the broker queues and distributes workloads using the load balancing pattern

Federation Versus Peering

Brokers need to tell each other "we have capacity" and then receive more tasks, and also to say "stop, we're full". The information doesn't have to be perfectly accurate or timely; it's fine to occasionally accept a few tasks that can't be processed immediately.


The simplest way to do this is "federation", in which brokers pretend to be clients and workers for one another, i.e. we plug one broker's frontend into the other broker's backend.



This would work, but it would mean a broker can accept only one task at a time from another broker.


The federation model is a good fit for service-oriented architectures (SOAs), which route by service name rather than by load balancing or round robin.


The other approach is "peering": brokers talk to each other over dedicated channels, so with N brokers each one has N-1 peers.


  • A broker needs to tell its peers how many workers it has available; a simple count is enough, and the right tool is the pub-sub socket pattern. Each broker publishes its state information on a PUB socket and subscribes to the state information of its peers with a SUB socket.
  • A broker needs a way to asynchronously send tasks to its peers and get replies back; ROUTER sockets do this. The broker uses two of them, one for receiving tasks and one for sending them; using two sockets keeps the frame design simpler, because we never have to work out whether a given message is a request or a reply.

Naming Ceremony

The broker ends up with 3 flows * 2 sockets/flow = 6 sockets, so we need a sensible naming scheme for them.


The three flows are:


  1. a local request-reply flow between the broker and its own clients and workers
  2. a cloud request-reply flow between the broker and its peer brokers
  3. a state flow between the broker and its peer brokers

Meaningful names of equal length make the code easier to work with, so we use:


  1. localfe and localbe for the local flow
  2. cloudfe and cloudbe for the cloud flow
  3. statefe and statebe for the state flow

The example simulates the clusters over ipc rather than inproc, since ipc, like tcp, is a disconnected transport. The ipc endpoints are named something-local, something-cloud, and something-state.



Prototyping the State Flow


peering1.py


#
#   Broker peering simulation (part 1) in Python
#   Prototypes the state flow
#
#   Author : Piero Cornice
#   Contact: root(at)pieroland(dot)net
#
import sys
import time
import random

import zmq


def main(myself, others):
    print("Hello, I am %s" % myself)

    context = zmq.Context()

    # State Back-End
    statebe = context.socket(zmq.PUB)

    # State Front-End
    statefe = context.socket(zmq.SUB)
    statefe.setsockopt(zmq.SUBSCRIBE, b'')

    bind_address = u"ipc://%s-state.ipc" % myself
    statebe.bind(bind_address)

    for other in others:
        statefe.connect(u"ipc://%s-state.ipc" % other)
        time.sleep(1.0)

    poller = zmq.Poller()
    poller.register(statefe, zmq.POLLIN)

    while True:

########## Solution with poll() ##########
        socks = dict(poller.poll(1000))

        # Handle incoming status message
        if socks.get(statefe) == zmq.POLLIN:
            msg = statefe.recv_multipart()
            print('%s Received: %s' % (myself, msg))

        else:
            # Send our address and a random value
            # for worker availability
            msg = [bind_address, (u'%i' % random.randrange(1, 10))]
            msg = [ m.encode('ascii') for m in msg]
            statebe.send_multipart(msg)
##################################

######### Solution with select() #########
#        pollin, pollout, pollerr = zmq.select([statefe], [], [], 1)
#
#        if pollin and pollin[0] == statefe:
#            # Handle incoming status message
#            msg = statefe.recv_multipart()
#            print 'Received:', msg
#
#        else:
#            # Send our address and a random value
#            # for worker availability
#            msg = [bind_address, str(random.randrange(1, 10))]
#            statebe.send_multipart(msg)
##################################


if __name__ == '__main__':
    if len(sys.argv) >= 2:
        main(myself=sys.argv[1], others=sys.argv[2:])
    else:
        print("Usage: peering.py <myself> <peer_1> ... <peer_N>")
        sys.exit(1)

Output:


$ python peering1.py p1 p2
Hello, I am p1
p1 Received: [b'ipc://p2-state.ipc', b'5']
p1 Received: [b'ipc://p2-state.ipc', b'6']
p1 Received: [b'ipc://p2-state.ipc', b'7']

$ python peering1.py p2 p1
Hello, I am p2
p2 Received: [b'ipc://p1-state.ipc', b'7']
p2 Received: [b'ipc://p1-state.ipc', b'7']
p2 Received: [b'ipc://p1-state.ipc', b'4']
p2 Received: [b'ipc://p1-state.ipc', b'5']

  • Each broker derives its ipc endpoint names from its identity; a real production broker would use the TCP transport instead.
  • A zmq_poll() loop is the core of the program. It only sends a state message after waiting 1 second with no incoming messages; if we sent a state message every time we received one, we would get message storms.
  • We use a 2-part pub-sub message consisting of the sender address and the data. We need to know the publisher's address in order to send it tasks, and the only way is to carry it explicitly as part of the message.
  • We don't set identities on the subscribers, because we don't want outdated state information from brokers that are already running.
  • We don't set a HWM on the publisher.

In practice we don't need to send state messages at fixed intervals; sending one only when the state changes (a worker becomes available or unavailable) is enough.


If you do want state messages at regular intervals, you can create a child thread that owns the statebe socket: the main thread forwards state changes to the child thread, and the child thread republishes the latest state periodically. A minimal sketch of this idea follows.
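
This sketch uses a PAIR socket over inproc as the parent-to-child pipe, as the high-level API section suggested. The endpoint names, the broker name, and the 1-second interval are assumptions for illustration, not part of the original example.


# Hedged sketch: main thread pushes capacity changes down an inproc PAIR
# pipe; a child thread republishes the latest value on statebe every second.
# Endpoint names and the broker name are illustrative assumptions.
import threading
import zmq

def state_publisher(context, endpoint):
    pipe = context.socket(zmq.PAIR)
    pipe.connect("inproc://state-pipe")
    statebe = context.socket(zmq.PUB)
    statebe.bind(endpoint)
    latest = b"0"
    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    while True:
        # wait up to 1 second for a state change, then (re)publish
        if dict(poller.poll(1000)).get(pipe) == zmq.POLLIN:
            latest = pipe.recv()                  # new capacity from main thread
        statebe.send_multipart([b"broker-name", latest])

context = zmq.Context()
pipe = context.socket(zmq.PAIR)
pipe.bind("inproc://state-pipe")                  # bind before the child connects
threading.Thread(target=state_publisher,
                 args=(context, "ipc://broker-state.ipc"), daemon=True).start()
pipe.send(b"3")                                   # e.g. local capacity changed to 3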


Prototyping the Local and Cloud Flows

The broker gets requests from clients and routes them either to local workers or to cloud peers.


We need two queues, one for requests from local clients and one for requests from cloud clients. Since ZeroMQ sockets already are queues, we simply use the ZeroMQ socket buffers as our queues.


This is the technique we used in the load balancing broker: we always read from the backend, because it gives us replies to route back, and we only read from the frontends when there are backend workers to hand requests to.


The main loop:


  • poll the backends for activity; a message there is either a worker saying "ready" or a reply, and if it is a reply, route it back to the local or cloud frontend
  • either way, a reply from a worker means that worker can take a new task
  • while there are available workers, take a request from a frontend and forward it either to a local worker or, for now simply at random, to a cloud peer

Between brokers, messages are routed using the broker identity; every broker has its own name.


peering2.py


#
#   Broker peering simulation (part 2) in Python
#   Prototypes the request-reply flow
#
#   While this example runs in a single process, that is just to make
#   it easier to start and stop the example. Each thread has its own
#   context and conceptually acts as a separate process.
#
#   Author : Min RK
#   Contact: benjaminrk(at)gmail(dot)com
#
import random
import sys
import threading
import time

import zmq

try:
    raw_input
except NameError:
    # Python 3
    raw_input = input

NBR_CLIENTS = 10
NBR_WORKERS = 3

def tprint(msg):
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

def client_task(name, i):
    """Request-reply client using REQ socket"""
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.identity = (u"Client-%s-%s" % (name, i)).encode('ascii')
    client.connect("ipc://%s-localfe.ipc" % name)
    while True:
        client.send(b"HELLO")
        try:
            reply = client.recv()
        except zmq.ZMQError:
            # interrupted
            return
        tprint("Client-%s: %s" % (i, reply))
        time.sleep(1)


def worker_task(name, i):
    """Worker using REQ socket to do LRU routing"""
    ctx = zmq.Context()
    worker = ctx.socket(zmq.REQ)
    worker.identity = (u"Worker-%s-%s" % (name, i)).encode('ascii')
    worker.connect("ipc://%s-localbe.ipc" % name)

    # Tell broker we're ready for work
    worker.send(b"READY")

    # Process messages as they arrive
    while True:
        try:
            msg = worker.recv_multipart()
        except zmq.ZMQError:
            # interrupted
            return
        tprint("Worker-%s: %s\n" % (i, msg))
        msg[-1] = b"OK"
        worker.send_multipart(msg)

def main(myself, peers):
    print("I: preparing broker at %s..." % myself)

    # Prepare our context and sockets
    ctx = zmq.Context()

    # Bind cloud frontend to endpoint
    cloudfe = ctx.socket(zmq.ROUTER)
    if not isinstance(myself, bytes):
        ident = myself.encode('ascii')
    else:
        ident = myself
    cloudfe.identity = ident
    cloudfe.bind("ipc://%s-cloud.ipc" % myself)

    # Connect cloud backend to all peers
    cloudbe = ctx.socket(zmq.ROUTER)
    cloudbe.identity = ident
    for peer in peers:
        tprint("I: connecting to cloud frontend at %s" % peer)
        cloudbe.connect("ipc://%s-cloud.ipc" % peer)


    if not isinstance(peers[0], bytes):
        peers = [peer.encode('ascii') for peer in peers]

    # Prepare local frontend and backend
    localfe = ctx.socket(zmq.ROUTER)
    localfe.bind("ipc://%s-localfe.ipc" % myself)
    localbe = ctx.socket(zmq.ROUTER)
    localbe.bind("ipc://%s-localbe.ipc" % myself)

    # Get user to tell us when we can start...
    raw_input("Press Enter when all brokers are started: ")

    # create workers and clients threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_task, args=(myself, i))
        thread.daemon = True
        thread.start()

    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_task, args=(myself, i))
        thread_c.daemon = True
        thread_c.start()

    # Interesting part
    # -------------------------------------------------------------
    # Request-reply flow
    # - Poll backends and process local/cloud replies
    # - While worker available, route localfe to local or cloud

    workers = []

    # setup pollers
    pollerbe = zmq.Poller()
    pollerbe.register(localbe, zmq.POLLIN)
    pollerbe.register(cloudbe, zmq.POLLIN)

    pollerfe = zmq.Poller()
    pollerfe.register(localfe, zmq.POLLIN)
    pollerfe.register(cloudfe, zmq.POLLIN)

    while True:
        # If we have no workers anyhow, wait indefinitely
        try:
            events = dict(pollerbe.poll(1000 if workers else None))
        except zmq.ZMQError:
            break  # interrupted

        # Handle reply from local worker
        msg = None
        if localbe in events:
            msg = localbe.recv_multipart()
            (address, empty), msg = msg[:2], msg[2:]
            workers.append(address)

            # If it's READY, don't route the message any further
            if msg[-1] == b'READY':
                msg = None
        elif cloudbe in events:
            msg = cloudbe.recv_multipart()
            (address, empty), msg = msg[:2], msg[2:]

            # We don't use peer broker address for anything

        if msg is not None:
            address = msg[0]
            if address in peers:
                # Route reply to cloud if it's addressed to a broker
                cloudfe.send_multipart(msg)
            else:
                # Route reply to client if we still need to
                localfe.send_multipart(msg)

        # Now route as many clients requests as we can handle
        while workers:
            events = dict(pollerfe.poll(0))
            reroutable = False
            # We'll do peer brokers first, to prevent starvation
            # handle cloudfe first; a message from cloudfe must not be re-routed back to a peer
            if cloudfe in events:
                msg = cloudfe.recv_multipart()
                reroutable = False
            elif localfe in events:
                msg = localfe.recv_multipart()
                reroutable = True
            else:
                break  # No work, go back to backends

            # If reroutable, send to cloud 20% of the time
            # Here we'd normally use cloud status information
            # about 20% of the time, send it out via cloudbe
            if reroutable and peers and random.randint(0, 4) == 0:
                # Route to random broker peer
                msg = [random.choice(peers), b''] + msg
                cloudbe.send_multipart(msg)
            else:
                msg = [workers.pop(0), b''] + msg
                localbe.send_multipart(msg)

if __name__ == '__main__':
    if len(sys.argv) >= 2:
        main(myself=sys.argv[1], peers=sys.argv[2:])
    else:
        print("Usage: peering2.py <me> [<peer_1> [... <peer_N>]]")
        sys.exit(1)

Output; note that a task arriving from a peer carries two extra frames, b'p2' and b'':


$ python peering2.py p2 p1
I: preparing broker at p2...
I: connecting to cloud frontend at p1
Press Enter when all brokers are started:
Worker-0: [b'Client-p2-0', b'', b'HELLO']


$ python peering2.py p1 p2
I: preparing broker at p1...
I: connecting to cloud frontend at p2
Press Enter when all brokers are started:
Worker-0: [b'p2', b'', b'Client-p2-1', b'', b'HELLO']

Worker-1: [b'p2', b'', b'Client-p2-5', b'', b'HELLO']

  • In C this is easier to write with the CZMQ library.
  • Since we don't yet get state information from peers, we naively assume the remote workers are all running.

Putting it all together

Now we combine peering1 and peering2 into a single program:


peering3.py


#
#   Broker peering simulation (part 3) in Python
#   Prototypes the full flow of status and tasks
#
#   While this example runs in a single process, that is just to make
#   it easier to start and stop the example. Each thread has its own
#   context and conceptually acts as a separate process.

import random
import sys
import threading
import time

import zmq


NBR_CLIENTS = 10
NBR_WORKERS = 5

def asbytes(obj):
    s = str(obj)
    if str is not bytes:
        # Python 3
        s = s.encode('ascii')
    return s

def client_task(name, i):
    """Request-reply client using REQ socket"""
    ctx = zmq.Context()
    client = ctx.socket(zmq.REQ)
    client.identity = (u"Client-%s-%s" % (name, i)).encode('ascii')

    client.connect("ipc://%s-localfe.ipc" % name)
    monitor = ctx.socket(zmq.PUSH)
    monitor.connect("ipc://%s-monitor.ipc" % name)

    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)
    while True:
        time.sleep(random.randint(0, 5))
        for _ in range(random.randint(0, 15)):
            # send request with random hex ID
            task_id = u"%04X" % random.randint(0, 10000)
            client.send_string(task_id)

            # wait max 10 seconds for a reply, then complain
            # wait up to 10s for a reply; no message means something went wrong
            try:
                events = dict(poller.poll(10000))
            except zmq.ZMQError:
                return # interrupted

            if events:
                reply = client.recv_string()
                assert reply == task_id, "expected %s, got %s" % (task_id, reply)
                monitor.send_string(reply)
            else:
                monitor.send_string(u"E: CLIENT EXIT - lost task %s" % task_id)
                return

def worker_task(name, i):
    """Worker using REQ socket to do LRU routing"""
    ctx = zmq.Context()
    worker = ctx.socket(zmq.REQ)
    worker.identity = ("Worker-%s-%s" % (name, i)).encode('ascii')
    worker.connect("ipc://%s-localbe.ipc" % name)

    # Tell broker we're ready for work
    worker.send(b"READY")

    # Process messages as they arrive
    while True:
        try:
            msg = worker.recv_multipart()
        except zmq.ZMQError:
            # interrupted
            return
        # Workers are busy for 0/1 seconds
        time.sleep(random.randint(0, 1))
        worker.send_multipart(msg)

def main(myself, peers):
    print("I: preparing broker at %s..." % myself)

    # Prepare our context and sockets
    ctx = zmq.Context()

    # Bind cloud frontend to endpoint
    cloudfe = ctx.socket(zmq.ROUTER)
    cloudfe.setsockopt(zmq.IDENTITY, myself)
    cloudfe.bind("ipc://%s-cloud.ipc" % myself)

    # Bind state backend / publisher to endpoint
    statebe = ctx.socket(zmq.PUB)
    statebe.bind("ipc://%s-state.ipc" % myself)

    # Connect cloud and state backends to all peers
    cloudbe = ctx.socket(zmq.ROUTER)
    statefe = ctx.socket(zmq.SUB)

    # statefe subscribes to everything
    statefe.setsockopt(zmq.SUBSCRIBE, b"")
    cloudbe.setsockopt(zmq.IDENTITY, myself)

    for peer in peers:
        print("I: connecting to cloud frontend at %s" % peer)
        cloudbe.connect("ipc://%s-cloud.ipc" % peer)
        print("I: connecting to state backend at %s" % peer)
        statefe.connect("ipc://%s-state.ipc" % peer)

    # Prepare local frontend and backend
    localfe = ctx.socket(zmq.ROUTER)
    localfe.bind("ipc://%s-localfe.ipc" % myself)
    localbe = ctx.socket(zmq.ROUTER)
    localbe.bind("ipc://%s-localbe.ipc" % myself)

    # Prepare monitor socket
    monitor = ctx.socket(zmq.PULL)
    monitor.bind("ipc://%s-monitor.ipc" % myself)

    # Get user to tell us when we can start...
    # raw_input("Press Enter when all brokers are started: ")

    # create workers and clients threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_task, args=(myself, i))
        thread.daemon = True
        thread.start()

    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_task, args=(myself, i))
        thread_c.daemon = True
        thread_c.start()

    # Interesting part
    # -------------------------------------------------------------
    # Publish-subscribe flow
    # - Poll statefe and process capacity updates
    # - Each time capacity changes, broadcast new value
    # Request-reply flow
    # - Poll primary and process local/cloud replies
    # - While worker available, route localfe to local or cloud

    local_capacity = 0
    cloud_capacity = 0
    workers = []

    # setup backend poller
    pollerbe = zmq.Poller()
    pollerbe.register(localbe, zmq.POLLIN)
    pollerbe.register(cloudbe, zmq.POLLIN)
    pollerbe.register(statefe, zmq.POLLIN)
    pollerbe.register(monitor, zmq.POLLIN)

    while True:
        # If we have no workers anyhow, wait indefinitely
        try:
            events = dict(pollerbe.poll(1000 if local_capacity else None))
        except zmq.ZMQError:
            break  # interrupted

        previous = local_capacity
        # Handle reply from local worker
        msg = None
        if localbe in events:
            msg = localbe.recv_multipart()
            (address, empty), msg = msg[:2], msg[2:]
            workers.append(address)
            local_capacity += 1

            # If it's READY, don't route the message any further
            if msg[-1] == b'READY':
                msg = None
        elif cloudbe in events:
            msg = cloudbe.recv_multipart()
            (address, empty), msg = msg[:2], msg[2:]

            # We don't use peer broker address for anything

        if msg is not None:
            address = msg[0]
            if address in peers:
                # Route reply to cloud if it's addressed to a broker
                cloudfe.send_multipart(msg)
            else:
                # Route reply to client if we still need to
                localfe.send_multipart(msg)

        # Handle capacity updates
        if statefe in events:
            peer, s = statefe.recv_multipart()
            cloud_capacity = int(s)

        # handle monitor message
        if monitor in events:
            print(monitor.recv_string())


        # Now route as many clients requests as we can handle
        # - If we have local capacity we poll both localfe and cloudfe
        # - If we have cloud capacity only, we poll just localfe
        # - Route any request locally if we can, else to cloud
        while local_capacity + cloud_capacity:
            secondary = zmq.Poller()
            secondary.register(localfe, zmq.POLLIN)
            if local_capacity:
                secondary.register(cloudfe, zmq.POLLIN)
            events = dict(secondary.poll(0))

            # We'll do peer brokers first, to prevent starvation
            # handle cloudfe (tasks from peers) first
            if cloudfe in events:
                msg = cloudfe.recv_multipart()
            elif localfe in events:
                msg = localfe.recv_multipart()
            else:
                break  # No work, go back to backends

            if local_capacity:
                msg = [workers.pop(0), b''] + msg
                localbe.send_multipart(msg)
                local_capacity -= 1
            else:
                # Route to random broker peer
                msg = [random.choice(peers), b''] + msg
                cloudbe.send_multipart(msg)

        # whenever local_capacity changes, publish the new state on statebe
        if local_capacity != previous:
            statebe.send_multipart([myself, asbytes(local_capacity)])

if __name__ == '__main__':
    if len(sys.argv) >= 2:
        myself = asbytes(sys.argv[1])
        main(myself, peers=[ asbytes(a) for a in sys.argv[2:] ])
    else:
        print("Usage: peering3.py <me> [<peer_1> [... <peer_N>]]")
        sys.exit(1)

  • The client threads detect and report failed requests, by noticing when no response arrives within 10 seconds.
  • The client threads don't print directly; instead they send messages to a monitor socket (PUSH), and the other end (a PULL socket) polls and prints them. This is one way to use ZeroMQ for logging.
  • The clients simulate varying loads, occasionally driving the cluster to 100%, at which point tasks start getting shipped off to the cloud.
  • The broker's main loop uses two poll sets, in effect three: information, backends, and frontends.

Problems encountered while developing this:


  1. Clients would freeze, caused by requests or replies getting lost somewhere. The ROUTER socket drops messages it can't route. The fix was to modify the client threads to detect and report such losses, and to call zmsg_dump() before every send and after every receive in the main loop until the problem went away.
  2. The main loop was mistakenly reading from more than one ready socket, which caused the first message to get lost; fixed by reading only from the first ready socket.
  3. The zmsg class wasn't correctly encoding UUIDs as C strings; UUIDs containing 0 bytes got corrupted. Fixed by encoding UUIDs as printable hex strings.

  4. The program doesn't detect a cloud peer that has disappeared. If you start several peers and stop one, the others keep sending it tasks because it already broadcast its capacity, and clients then report lost messages. Possible fixes: only keep capacity information for a short time, or make the request-reply chain more reliable.

References


ØMQ - The Guide
