2021/11/8

asyncio - transports and protocols

transport 負責處理 data bytes 的傳輸方法。是 socket (類似 I/O endpoint) 的 abstraction

protocol 決定什麼時候,要傳送哪些 data bytes。是 application 的 abstraction

transport 跟 protocol 物件永遠都是 1:1 的關係,protocol 呼叫 transport methods 傳送資料,transport 呼叫 protocol methods,將收到的資料傳給它

最常用的 event loop method 為 loop.create_connection(),通常以 protocol_factory 為參數,會產生一個 Protocol object 用來處理以 Transport 物件代表的 connection,這個 method 通常回傳 tuple: (transport, protocol)

tcp echo server

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(self.peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('{} received: {!r}'.format(self.peername, message))

        print('{} Send: {!r}'.format(self.peername, message))
        self.transport.write(data)

    def connection_lost(self, exc):
        print('{} closed the connection'.format(self.peername))


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

tcp echo client

import asyncio
# aioconsole 套件,用來取得 console input,一次一行
from aioconsole import ainput


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, on_con_lost):
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def write(self, message):
        self.transport.write(message.encode())
        # print('Data sent: {!r}'.format(message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(on_con_lost),
        '127.0.0.1', 8888)


    # protocol.write('Hello World! 22')
    while True:
        # aioconsole 是從 stdin 讀取一行字串的套件
        cmd = await ainput('')
        if cmd == 'q':
            transport.close()
            break
        else:
            protocol.write(cmd)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

udp echo server

import asyncio


class EchoServerProtocol:
    def __init__(self, on_con_lost):
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

    def connection_lost(self, exc):
        print("server closed")
        self.on_con_lost.set_result(True)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(on_con_lost),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

udp echo client

import asyncio
# # aioconsole 套件,用來取得 console input,一次一行
from aioconsole import ainput


class EchoClientProtocol:
    def __init__(self, on_con_lost):
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def datagram_send(self, message):
        self.transport.sendto(message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

    # 前一個 send/receive operation 發生 OSError,很少發生
    def error_received(self, exc):
        print('Error received:', exc)

    # 當 connection is lost or closed 的 callback function
    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)

    def close(self):
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    while True:
        # aioconsole 是從 stdin 讀取一行字串的套件
        cmd = await ainput('')
        if cmd == 'q':
            protocol.close()
            break
        else:
            protocol.datagram_send(cmd)


    try:
        # 等待 EchoClientProtocol 在 connection_lost 時,
        # 填入 on_con_lost,將該 connection 結束並交回控制權
        await on_con_lost
    finally:
        transport.close()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

tcp echo server with Streams

Streams 是以 async/await 處理網路連線的高階 API,可不透過 callback/low-level protocols, transports 收發資料。

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

tcp echo client with Streams

import asyncio
from aioconsole import ainput

async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    message = await ainput('')
    print(f'Send: {message!r}')
    writer.write( message.encode() )
    await writer.drain()

    print(f'wait for repsone')
    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client())

References

Transports and Protocols

udp2tcp

沒有留言:

張貼留言