transport 負責處理 data bytes 的傳輸方法。是 socket (類似 I/O endpoint) 的 abstraction
protocol 決定什麼時候,要傳送哪些 data bytes。是 application 的 abstraction
transport 跟 protocol 物件永遠都是 1:1 的關係,protocol 呼叫 transport methods 傳送資料,transport 呼叫 protocol methods,將收到的資料傳給它
最常用的 event loop method 為 loop.create_connection(),通常以 protocol_factory 為參數,會產生一個 Protocol object 用來處理以 Transport 物件代表的 connection,這個 method 回傳 tuple: (transport, protocol)。
tcp echo server
import asyncio
class EchoServerProtocol(asyncio.Protocol):
    """TCP echo protocol: log each received chunk and write it straight back.

    One instance is created per accepted connection by the protocol factory
    handed to ``loop.create_server()``.
    """

    def connection_made(self, transport):
        """Store the transport and the peer address of the new connection."""
        self.peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(self.peername))
        self.transport = transport

    def data_received(self, data):
        """Echo ``data`` (bytes) back to the peer unchanged."""
        # The decoded text is only used for logging.  A TCP chunk may end in
        # the middle of a multi-byte character or carry non-UTF-8 bytes, so
        # decode leniently: a strict decode would raise before the echo below.
        message = data.decode(errors='replace')
        print('{} received: {!r}'.format(self.peername, message))
        print('{} Send: {!r}'.format(self.peername, message))
        # Echo the original bytes, not the (possibly lossy) decoded text.
        self.transport.write(data)

    def connection_lost(self, exc):
        """Log that the connection is gone; ``exc`` is not inspected."""
        print('{} closed the connection'.format(self.peername))
async def main():
    """Serve the TCP echo protocol on 127.0.0.1:8888 until cancelled."""
    # create_server() is a low-level loop API, so fetch the running loop.
    event_loop = asyncio.get_running_loop()

    # The class itself is the protocol factory: it is called once per
    # accepted connection to build a fresh EchoServerProtocol.
    echo_server = await event_loop.create_server(
        EchoServerProtocol, '127.0.0.1', 8888)

    async with echo_server:
        await echo_server.serve_forever()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is how this server is stopped; exit without a traceback.
    pass
tcp echo client
import asyncio
# aioconsole 套件,用來取得 console input,一次一行
from aioconsole import ainput
class EchoClientProtocol(asyncio.Protocol):
    """TCP echo client protocol.

    Sends text to the server via :meth:`write`, prints whatever comes back,
    and resolves ``on_con_lost`` once the connection is gone.
    """

    def __init__(self, on_con_lost):
        """Keep the future that is resolved when the connection is lost."""
        self.on_con_lost = on_con_lost
        # Set in connection_made(); None until the connection exists.
        self.transport = None

    def connection_made(self, transport):
        """Store the transport so :meth:`write` can send data."""
        self.transport = transport

    def write(self, message):
        """Encode ``message`` (str) and send it over the transport."""
        self.transport.write(message.encode())

    def data_received(self, data):
        """Print the received bytes as text."""
        # Display only: decode leniently so a chunk that splits a multi-byte
        # character (or is not UTF-8) cannot raise inside the callback.
        print('Data received: {!r}'.format(data.decode(errors='replace')))

    def connection_lost(self, exc):
        """Report the closed connection and resolve ``on_con_lost``."""
        print('The server closed the connection')
        # The waiter may already be done (e.g. cancelled while being awaited);
        # set_result() on a done future raises InvalidStateError.
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)
async def main():
    """Connect to the echo server and forward console lines until 'q'.

    Each line read from stdin is sent to 127.0.0.1:8888; entering ``q``
    closes the connection.  The transport is closed on every exit path.
    """
    # create_connection() is a low-level loop API, so fetch the running loop.
    loop = asyncio.get_running_loop()

    # Resolved by EchoClientProtocol.connection_lost().
    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(on_con_lost),
        '127.0.0.1', 8888)

    # The input loop lives inside the try so the transport is also closed
    # when reading stdin fails or the task is cancelled.
    try:
        while True:
            # ainput() (aioconsole) reads one line from stdin without
            # blocking the event loop.
            cmd = await ainput('')
            # Stop on 'q', or when the connection went away while we were
            # waiting for input: writing to a closing transport is pointless.
            if cmd == 'q' or transport.is_closing():
                break
            protocol.write(cmd)

        transport.close()
        # Wait until the protocol signals that the connection is lost.
        await on_con_lost
    finally:
        # close() is safe to call again on an already closed transport.
        transport.close()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is an expected way to quit; exit without a traceback.
    pass
udp echo server
import asyncio
class EchoServerProtocol(asyncio.DatagramProtocol):
    """UDP echo protocol: send every received datagram back to its sender.

    Inherits from ``asyncio.DatagramProtocol`` so that every callback the
    transport may invoke exists, even those not overridden here.
    """

    def __init__(self, on_con_lost):
        """Keep the future that is resolved when the endpoint is closed."""
        self.on_con_lost = on_con_lost
        # Set in connection_made(); None until the endpoint exists.
        self.transport = None

    def connection_made(self, transport):
        """Store the datagram transport used for replies."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log the datagram and echo the original bytes back to ``addr``."""
        # Display only: a strict decode would raise on non-UTF-8 payloads
        # and the datagram would never be echoed.
        message = data.decode(errors='replace')
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

    def error_received(self, exc):
        """Report an OSError raised by a previous send/receive operation."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Report shutdown and resolve ``on_con_lost``."""
        print("server closed")
        # set_result() on an already done/cancelled future raises
        # InvalidStateError, so only resolve a pending one.
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)
async def main():
    """Run the UDP echo endpoint on 127.0.0.1:9999 for one hour."""
    print("Starting UDP server")

    # create_datagram_endpoint() is a low-level loop API.
    event_loop = asyncio.get_running_loop()
    closed = event_loop.create_future()

    def build_protocol():
        # A single protocol instance serves every client datagram.
        return EchoServerProtocol(closed)

    endpoint, _ = await event_loop.create_datagram_endpoint(
        build_protocol, local_addr=('127.0.0.1', 9999))

    try:
        # Keep the endpoint alive for 3600 seconds.
        await asyncio.sleep(3600)
    finally:
        endpoint.close()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is an expected way to stop the server; exit quietly.
    pass
udp echo client
import asyncio
# aioconsole package: reads console input asynchronously, one line at a time
from aioconsole import ainput
class EchoClientProtocol(asyncio.DatagramProtocol):
    """UDP echo client protocol.

    Sends text datagrams via :meth:`datagram_send`, prints replies, and
    resolves ``on_con_lost`` when the endpoint is closed.  Inherits from
    ``asyncio.DatagramProtocol`` so every transport callback exists.
    """

    def __init__(self, on_con_lost):
        """Keep the future that is resolved when the endpoint is closed."""
        self.on_con_lost = on_con_lost
        # Set in connection_made(); None until the endpoint exists.
        self.transport = None

    def connection_made(self, transport):
        """Store the datagram transport used for sending."""
        self.transport = transport

    def datagram_send(self, message):
        """Encode ``message`` (str) and send it as one datagram."""
        self.transport.sendto(message.encode())

    def datagram_received(self, data, addr):
        """Print the payload of a received datagram; ``addr`` is unused."""
        # Display only: decode leniently so a non-UTF-8 payload cannot raise.
        print("Received:", data.decode(errors='replace'))

    def error_received(self, exc):
        """Report an OSError from a previous send/receive operation (rare)."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Callback for a lost or closed connection; resolves the waiter."""
        print("Connection closed")
        # The waiter may already be done (e.g. cancelled while awaited);
        # set_result() would then raise InvalidStateError.
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)

    def close(self):
        """Close the underlying transport."""
        self.transport.close()
async def main():
    """Send console lines to the UDP echo server until 'q' is entered.

    Each line read from stdin is sent as one datagram to 127.0.0.1:9999.
    The transport is closed on every exit path.
    """
    # create_datagram_endpoint() is a low-level loop API.
    loop = asyncio.get_running_loop()

    # Resolved by EchoClientProtocol.connection_lost().
    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    # The input loop lives inside the try so the transport is also closed
    # when reading stdin fails or the task is cancelled.
    try:
        while True:
            # ainput() (aioconsole) reads one line from stdin without
            # blocking the event loop.
            cmd = await ainput('')
            if cmd == 'q':
                break
            protocol.datagram_send(cmd)

        protocol.close()
        # Wait for connection_lost() to resolve on_con_lost, which marks
        # the end of the connection and hands control back here.
        await on_con_lost
    finally:
        # close() is safe to call again on an already closed transport.
        transport.close()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is an expected way to quit; exit without a traceback.
    pass
tcp echo server with Streams
Streams 是以 async/await 處理網路連線的高階 API,可不透過 callback/low-level protocols, transports 收發資料。
import asyncio
async def handle_echo(reader, writer):
    """Handle one client: read up to 100 bytes, echo them, then close.

    Called by ``asyncio.start_server()`` with the connection's reader and
    writer.  The writer is always closed, even if reading or writing fails.
    """
    addr = writer.get_extra_info('peername')
    try:
        data = await reader.read(100)
        # Display only: decode leniently so non-UTF-8 input (or a multi-byte
        # character cut at the 100-byte limit) cannot abort the echo.
        message = data.decode(errors='replace')
        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        # Echo the original bytes, not the decoded text.
        writer.write(data)
        await writer.drain()
    finally:
        print("Close the connection")
        writer.close()
        # close() only schedules the shutdown; wait until it has completed.
        await writer.wait_closed()
async def main():
    """Serve ``handle_echo`` on 127.0.0.1:8888 until cancelled."""
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    # Report the address of the first listening socket.
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl-C is how this server is stopped; exit without a traceback,
    # matching the other server examples.
    pass
tcp echo client with Streams
import asyncio
from aioconsole import ainput
async def tcp_echo_client():
    """Send one console line to the echo server and print the reply.

    Connects to 127.0.0.1:8888, sends a single line read from stdin, reads
    up to 100 bytes back, and always closes the connection afterwards.
    """
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)
    try:
        # ainput() (aioconsole) reads one line from stdin without blocking
        # the event loop.
        message = await ainput('')
        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        print('wait for response')
        data = await reader.read(100)
        # Display only: decode leniently so a reply cut in the middle of a
        # multi-byte character cannot raise.
        print(f'Received: {data.decode(errors="replace")!r}')
    finally:
        print('Close the connection')
        writer.close()
        # close() only schedules the shutdown; wait until it has completed.
        await writer.wait_closed()


try:
    asyncio.run(tcp_echo_client())
except KeyboardInterrupt:
    # Ctrl-C while waiting for input is an expected way to quit.
    pass
沒有留言:
張貼留言