トランスポートとプロトコル

まえがき

トランスポートとプロトコルはloop.create_connection() のような低水準の イベントループ API から使われます。これらはコールバックに基づくプログラミングスタイルを使うことでネットワークや IPC プロトコル (HTTP など) の高性能な実装を可能にします。

基本的にトランスポートとプロトコルはライブラリやフレームワークからのみ使われるべきであり、高水準の asyncio アプリケーションから使われるものではありません。

このドキュメントはTransportsProtocols を扱います。

はじめに

最上位の観点からは、トランスポートはどのように バイトデータを送信するかに影響を与え、いっぽうプロトコルはどの バイトデータを送信するかを決定します (また、ある程度はいつ も決定します) 。

同じことを違う言い方で表現します: トランスポートはソケット (または同様の I/O 端点) の抽象化であり、プロトコルはトランスポートから見たときのアプリケーションの抽象化です。

さらにもう一つの見方として、トランスポートとプロトコルの2つのインターフェースは、協調してネットワーク I/O やプロセス間 I/O の抽象インターフェースを定義しています。

トランスポートオブジェクトとプロトコルオブジェクトの間には常に1対1の関係があります: プロトコルはデータを送信するためにトランスポートのメソッドを呼び出し、トランスポートは受信したデータを渡すためにプロトコルのメソッドを呼び出します。

ほとんどの接続に基づくイベントループメソッド (loop.create_connection() など) は通常protocol_factory 引数を受け取り、Transport オブジェクトで表現される確立した接続に対するProtocol オブジェクトを生成するために使います。そのようなメソッドは通常(transport,protocol) タプルを返します。

内容

このページは以下の節から構成されます:

トランスポート

ソースコード:Lib/asyncio/transports.py


トランスポートはさまざまな通信方法を抽象化するためにasyncio が提供するクラス群です。

トランスポートオブジェクトは常にasyncio イベントループ によってインスタンス化されます。

asyncio は TCP, UDP, SSL, およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。

トランスポートクラスはスレッド安全ではありません

トランスポートのクラス階層構造

classasyncio.BaseTransport

全てのトランスポートの基底クラスです。すべての asyncio トランスポートが共有するメソッドを含んでいます。

classasyncio.WriteTransport(BaseTransport)

書き込み専用の接続に対する基底トランスポートクラスです。

WriteTransport クラスのインスタンスはloop.connect_write_pipe() イベントループメソッドから返され、loop.subprocess_exec() のようなサブプロセスに関連するメソッドから利用されます。

classasyncio.ReadTransport(BaseTransport)

読み込み専用の接続に対する基底トランスポートクラスです。

Instances of theReadTransport class are returned fromtheloop.connect_read_pipe() event loop method andare also used by subprocess-related methods likeloop.subprocess_exec().

classasyncio.Transport(WriteTransport,ReadTransport)

TCP 接続のような、読み出しと書き込みの双方向のトランスポートを表現するインターフェースです。

ユーザーはトランスポートを直接インスタンス化することはありません; ユーザーは、ユーティリティ関数にプロトコルファクトリとその他トランスポートとプロトコルを作成するために必要な情報を渡して呼び出します。

Transport クラスのインスタンスは、loop.create_connection(),loop.create_unix_connection(),loop.create_server(),loop.sendfile() などのイベントループメソッドから返されたり、これらのメソッドから利用されたりします。

classasyncio.DatagramTransport(BaseTransport)

データグラム (UDP) 接続のためのトランスポートです。

DatagramTransport クラスのインスタンスはloop.create_datagram_endpoint() イベントループメソッドから返されます。

classasyncio.SubprocessTransport(BaseTransport)

親プロセスとその子プロセスの間の接続を表現する抽象クラスです。

SubprocessTransport クラスのインスタンスはloop.subprocess_shell()loop.subprocess_exec() の2つのイベントループメソッドから返されます。

基底トランスポート

BaseTransport.close()

トランスポートをクローズします。

トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルのprotocol.connection_lost() メソッドが引数None で呼び出されます。一度閉じたトランスポートは、使用されるべきではありません。

BaseTransport.is_closing()

トランスポートを閉じている最中か閉じていた場合True を返します。

BaseTransport.get_extra_info(name,default=None)

トランスポートまたはそれが背後で利用しているリソースの情報を返します。

name は取得するトランスポート特有の情報を表す文字列です。

default は、取得したい情報が取得可能でなかったり、サードパーティのイベントループ実装や現在のプラットフォームがその情報の問い合わせをサポートしていない場合に返される値です。

例えば、以下のコードはトランスポート内のソケットオブジェクトを取得しようとします:

sock=transport.get_extra_info('socket')ifsockisnotNone:print(sock.getsockopt(...))

いくつかのトランスポートで問い合わせ可能な情報のカテゴリを示します:

  • ソケット:

  • SSL ソケット:

    • 'compression': 圧縮アルゴリズムで、ssl.SSLSocket.compression() の結果になります。圧縮されていないときはNone になります

    • 'cipher': 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher() の結果になります

    • 'peercert': ピアの証明書で、ssl.SSLSocket.getpeercert() の結果になります

    • 'sslcontext':ssl.SSLContext のインスタンスになります

    • 'ssl_object':ssl.SSLObject またはssl.SSLSocket インスタンス

  • パイプ:

    • 'pipe': パイプオブジェクトです

  • サブプロセス:

BaseTransport.set_protocol(protocol)

トランスポートに新しいプロトコルを設定します。

プロトコルの切り替えは、両方のプロトコルのドキュメントで切り替えがサポートされている場合にのみ行うべきです。

BaseTransport.get_protocol()

現在のプロトコルを返します。

読み出し専用のトランスポート

ReadTransport.is_reading()

トランスポートが新しいデータを受信中の場合True を返します。

Added in version 3.7.

ReadTransport.pause_reading()

トランスポートの受信側を一時停止します。resume_reading() メソッドが呼び出されるまでプロトコルのprotocol.data_received() メソッドにデータは渡されません。

バージョン 3.7 で変更:このメソッドはべき等です。すなわちトランスポートがすでに停止していたりクローズしていても呼び出すことができます。

ReadTransport.resume_reading()

受信を再開します。データが読み込み可能になるとプロトコルのprotocol.data_received() メソッドが再び呼び出されるようになります。

バージョン 3.7 で変更:このメソッドはべき等です。すなわちトランスポートがすでにデータを読み込み中であっても呼び出すことができます。

書き込み専用のトランスポート

WriteTransport.abort()

未完了の処理が完了するのを待たず、即座にトランスポートをクローズします。バッファーされているデータは失われます。このメソッドの呼び出し以降データは受信されません。最終的にプロトコルのprotocol.connection_lost() メソッドが引数None で呼び出されます。

WriteTransport.can_write_eof()

トランスポートがwrite_eof() メソッドをサポートしている場合True を返し、そうでない場合はFalse を返します。

WriteTransport.get_write_buffer_size()

トランスポートで使用されている出力バッファーの現在のサイズを返します。

WriteTransport.get_write_buffer_limits()

書き込みフロー制御の最高 および最低 水位点を取得します。(low,high) タプルを返します。ここでlowhigh はバイト数をあらわす正の整数です。

水位点の設定はset_write_buffer_limits() で行います。

Added in version 3.4.2.

WriteTransport.set_write_buffer_limits(high=None,low=None)

書き込みフロー制御の最高 および最低 水位点を設定します。

(バイト数をあらわす) これら2つの値はプロトコルのprotocol.pause_writing()protocol.resume_writing() の2つのメソッドがいつ呼ばれるかを制御します。指定する場合、lowhigh と等しいかまたはhigh より小さくなければなりません。また、highlow も負の値を指定することはできません。

pause_writing() はバッファーサイズがhigh の値以上になった場合に呼び出されます。書き込みが一時停止している場合、バッファーサイズがlow の値以下になるとresume_writing() メソッドが呼び出されます。

デフォルト値は実装固有になります。high のみ与えられた場合、lowhigh 以下の実装固有のデフォルト値になります。high をゼロに設定するとlow も強制的にゼロになり、バッファーが空でなくなるとすぐにpause_writing() メソッドが呼び出されるようになります。low をゼロに設定すると、バッファーが空になresume_writing() が呼び出されるようになります。どちらかにゼロを設定することは I/O と計算を並行に実行する機会を減少させるため、一般に最適ではありません。

上限値と下限値を取得するにはget_write_buffer_limits() メソッドを使ってください。

WriteTransport.write(data)

トランスポートにバイト列data を書き込みます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

WriteTransport.writelines(list_of_data)

バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素でwrite() を呼び出すことと等価ですが、より効率的な実装となる場合があります。

WriteTransport.write_eof()

バッファーされた全てのデータをフラッシュした後トランスポートの送信側をクローズします。送信側をクローズした後もデータを受信することは可能です。

このメソッドはトランスポート (例えば SSL) がハーフクローズドな接続をサポートしていない場合NotImplementedError を送出します。

データグラムトランスポート

DatagramTransport.sendto(data,addr=None)

リモートピアaddr (トランスポート依存の対象アドレス) にバイト列data を送信します。addrNone の場合、データはトランスポートの作成時に指定された送信先に送られます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

バージョン 3.13 で変更:このメソッドは、長さがゼロのデータグラムを送信するために、空のバイトオブジェクトで呼び出すこともできます。フロー制御に使用されるバッファサイズ計算も、データグラムヘッダーを考慮するように更新されました。

DatagramTransport.abort()

未完了の処理が完了するのを待たず、即座にトランスポートをクローズします。バッファーされているデータは失われます。このメソッドの呼び出し以降データは受信されません。最終的にプロトコルのprotocol.connection_lost() メソッドが引数None で呼び出されます。

サブプロセス化されたトランスポート

SubprocessTransport.get_pid()

サブプロセスのプロセス ID (整数) を返します。

SubprocessTransport.get_pipe_transport(fd)

整数のファイル記述子fd に該当する通信パイプのトランスポートを返します:

  • 0: 標準入力 (stdin) の読み込み可能ストリーミングトランスポート。サブプロセスがstdin=PIPE で作成されていない場合はNone

  • 1: 標準出力 (stdout) の書き込み可能ストリーミングトランスポート。サブプロセスがstdout=PIPE で作成されていない場合はNone

  • 2: 標準エラー出力 (stderr) の書き込み可能ストリーミングトランスポート。サブプロセスがstderr=PIPE で作成されていない場合はNone

  • その他のfd:None

SubprocessTransport.get_returncode()

サブプロセスのリターンコードを整数で返します。サブプロセスがリターンしなかった場合はNone を返します。subprocess.Popen.returncode 属性と同じです。

SubprocessTransport.kill()

サブプロセスを強制終了 (kill) します。

POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドはterminate() の別名です。

subprocess.Popen.kill() も参照してください。

SubprocessTransport.send_signal(signal)

サブプロセスにシグナルsignal を送信します。subprocess.Popen.send_signal() と同じです。

SubprocessTransport.terminate()

サブプロセスを停止します。

POSIX システムでは、このメソッドはサブプロセスにSIGTERM を送信します。Windows では、Windows API 関数TerminateProcess() がサブプロセスを停止するために呼び出されます。

subprocess.Popen.terminate() も参照してください。

SubprocessTransport.close()

kill() メソッドを呼び出すことでサブプロセスを強制終了します。

サブプロセスがまだリターンしていない場合、stdin,stdout, およびstderr の各パイプのトランスポートをクローズします。

プロトコル

ソースコード:Lib/asyncio/protocols.py


asyncio はネットワークプロトコルを実装するために使う抽象基底クラス群を提供します。これらのクラスはトランスポート と組み合わせて使うことが想定されています。

抽象基底プロトコルクラスの派生クラスはメソッドの一部または全てを実装することができます。これらのメソッドは全てコールバックです: それらは、データを受信した、などの決まったイベントに対してトランスポートから呼び出されます。基底プロトコルメソッドは対応するトランスポートから呼び出されるべきです。

基底プロトコル

classasyncio.BaseProtocol

全てのプロトコルクラスが共有する全てのメソッドを持った基底プロトコルクラスです。

classasyncio.Protocol(BaseProtocol)

ストリーミングプロトコル (TCP, Unix ソケットなど) を実装するための基底クラスです。

classasyncio.BufferedProtocol(BaseProtocol)

受信バッファーを手動で制御するストリーミングプロトコルを実装するための基底クラスです。

classasyncio.DatagramProtocol(BaseProtocol)

データグラム (UDP) プロトコルを実装するための基底クラスです。

classasyncio.SubprocessProtocol(BaseProtocol)

子プロセスと (一方向パイプを通じて) 通信するプロトコルを実装するための基底クラスです。

基底プロトコル

全ての asyncio プロトコルは基底プロトコルのコールバックを実装することができます。

通信のコールバック

コネクションコールバックは全てのプロトコルから、成功したコネクションそれぞれにつきただ一度だけ呼び出されます。その他の全てのプロトコルコールバックはこれら2つのメソッドの間に呼び出すことができます。

BaseProtocol.connection_made(transport)

コネクションが作成されたときに呼び出されます。

引数transport はコネクションをあらわすトランスポートです。プロトコルはトランスポートへの参照を保存する責任を負います。

BaseProtocol.connection_lost(exc)

コネクションが失われた、あるいはクローズされたときに呼び出されます。

引数は例外オブジェクトまたはNone になります。None のとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。

フロー制御コールバック

フロー制御コールバックは、プロトコルによって実行される書き込み処理を停止または再開するためにトランスポートから呼び出されます。

詳しくはset_write_buffer_limits() メソッドのドキュメントを参照してください。

BaseProtocol.pause_writing()

トランスポートのバッファーサイズが最高水位点 (high watermark) を超えたときに呼び出されます。

BaseProtocol.resume_writing()

トランスポートのバッファーサイズが最低水位点 (low watermark) に達したきに呼び出されます。

バッファーサイズが最高水位点と等しい場合、pause_writing() は呼び出されません: バッファーサイズは必ず制限値を超えなければなりません。

それに対して、resume_writing() はバッファーサイズが最低水位点と等しいかそれよりも小さい場合に呼び出されます。これらの境界条件は、どちらの基準値もゼロである場合の処理が期待通りとなることを保証するために重要です。

ストリーミングプロトコル

loop.create_server(),loop.create_unix_server(),loop.create_connection(),loop.create_unix_connection(),loop.connect_accepted_socket(),loop.connect_read_pipe(), そしてloop.connect_write_pipe() などのイベントメソッドはストリーミングプロトコルを返すファクトリを受け付けます。

Protocol.data_received(data)

データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。

データがバッファーされるか、チャンキングされるか、または再構築されるかはトランスポートに依存します。一般には、特定のセマンティクスを信頼するべきではなく、代わりにデータのパースを全般的かつ柔軟に行うべきです。ただし、データは常に正しい順序で受信されます。

このメソッドは、コネクションがオープンである間は何度でも呼び出すことができます。

いっぽうで、protocol.eof_received() メソッドは最大でも一度だけ呼び出されます。いったんeof_received() が呼び出されると、それ以降data_received() は呼び出されません。

Protocol.eof_received()

コネクションの相手方がこれ以上データを送信しないことを伝えてきたとき (例えば相手方が asyncio を使用しており、transport.write_eof() を呼び出した場合) に呼び出されます。

このメソッドは (None を含む) 偽値 を返すことがあり、その場合トランスポートは自身をクローズします。一方メソッドが真値を返す場合は、利用しているプロトコルがトランスポートをクローズするかどうかを決めます。デフォルトの実装はNone を返すため、コネクションは暗黙のうちにクローズされます。

SSL を含む一部のトランスポートはハーフクローズ接続をサポートしません。そのような場合このメソッドが真値を返すとコネクションはクローズされます。

ステートマシン:

start -> connection_made    [-> data_received]*    [-> eof_received]?-> connection_lost -> end

バッファリングされたストリーミングプロトコル

Added in version 3.7.

バッファー付きプロトコルはストリーミングプロトコル をサポートするイベントループメソッドで利用することができます。

BufferedProtocol 実装は受信バッファーの手動での明示的な割り当てや制御を可能にします。イベントループはプロトコルにより提供されるバッファを利用することにより不要なデータのコピーを避けることができます。これにより大量のデータを受信するプロトコルにおいて顕著なパフォーマンスの向上をもたらします。精巧なプロトコル実装によりバッファー割り当ての数を劇的に減少させることができます。

以下に示すコールバックはBufferedProtocol インスタンスに対して呼び出されます:

BufferedProtocol.get_buffer(sizehint)

新しい受信バッファを割り当てるために呼び出します。

sizehint は返されるバッファーの推奨される最小サイズです。sizehint によって推奨された値より小さい、または大きいサイズのバッファーを返すことは容認されています。 -1 がセットされた場合、バッファーサイズは任意となります。サイズがゼロのバッファーを返すとエラーになります。

get_buffer()バッファープロトコル を実装したオブジェクトを返さなければなりません。

BufferedProtocol.buffer_updated(nbytes)

受信データによりバッファが更新された場合に呼び出されます。

nbytes はバッファに書き込まれた総バイト数です。

BufferedProtocol.eof_received()

protocol.eof_received() メソッドのドキュメントを参照してください。

コネクションの間、get_buffer() は何度でも呼び出すことができます。しかしprotocol.eof_received() が呼び出されるのは最大でも1回で、もし呼び出されると、それ以降get_buffer()buffer_updated() が呼び出されることはありません。

ステートマシン:

start -> connection_made    [-> get_buffer        [-> buffer_updated]?    ]*    [-> eof_received]?-> connection_lost -> end

データグラムプロトコル

データグラムプロトコルのインスタンスはloop.create_datagram_endpoint() メソッドに渡されたプロトコルファクトリによって生成されるべきです。

DatagramProtocol.datagram_received(data,addr)

データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。

DatagramProtocol.error_received(exc)

直前の送信あるいは受信がOSError を送出したときに呼び出されます。excOSError のインスタンスになります。

このメソッドが呼ばれるのは、トランスポート (UDP など) がデータグラムを受信側に配信できなかったことが検出されたなどの、まれな場合においてのみです。ほとんどの場合、データグラムが配信できなければそのまま通知されることなく破棄されます。

注釈

BSD システム (macOS, FreeBSD など) ではフロー制御はサポートされていません。これは非常に多くのパケットを書き込もうとしたことによる送信の失敗を検出する信頼できる方法が存在しないためです。

ソケットは常に '準備ができた状態' のように振る舞いますが、超過したパケットは破棄されます。この場合errnoerrno.ENOBUFS に設定したOSError 例外が送出されることがあります。もし例外が送出された場合はDatagramProtocol.error_received() に通知されますが、送出されない場合は単に無視されます。

サブプロセスプロトコル

サブプロセスプロトコルのインスタンスはloop.subprocess_exec()loop.subprocess_shell() メソッドに渡されたプロトコルファクトリにより生成されるべきです。

SubprocessProtocol.pipe_data_received(fd,data)

子プロセスが標準出力または標準エラー出力のパイプにデータを書き込んだ時に呼び出されます。

fd はパイプのファイル記述子を表す整数です。

data は受信データを含む空でないバイトオブジェクトです。

SubprocessProtocol.pipe_connection_lost(fd,exc)

子プロセスと通信するパイプのいずれかがクローズされたときに呼び出されます。

fd はクローズされたファイル記述子を表す整数です。

SubprocessProtocol.process_exited()

子プロセスが終了したときに呼び出されます。

これは、pipe_data_received()pipe_connection_lost() メソッドの前に呼び出すことができます。

使用例

TCP エコーサーバー

loop.create_server() メソッドを使って TCP エコーサーバーを生成し、受信したデータをそのまま送り返して、最後にコネクションをクローズします:

importasyncioclassEchoServerProtocol(asyncio.Protocol):defconnection_made(self,transport):peername=transport.get_extra_info('peername')print('Connection from{}'.format(peername))self.transport=transportdefdata_received(self,data):message=data.decode()print('Data received:{!r}'.format(message))print('Send:{!r}'.format(message))self.transport.write(data)print('Close the client socket')self.transport.close()asyncdefmain():# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()server=awaitloop.create_server(EchoServerProtocol,'127.0.0.1',8888)asyncwithserver:awaitserver.serve_forever()asyncio.run(main())

参考

ストリームを使った TCP エコーサーバー の例では高水準のasyncio.start_server() 関数を使っています。

TCP エコークライアント

loop.create_connection() メソッドを使った TCP エコークライアントは、データを送信したあとコネクションがクローズされるまで待機します:

importasyncioclassEchoClientProtocol(asyncio.Protocol):def__init__(self,message,on_con_lost):self.message=messageself.on_con_lost=on_con_lostdefconnection_made(self,transport):transport.write(self.message.encode())print('Data sent:{!r}'.format(self.message))defdata_received(self,data):print('Data received:{!r}'.format(data.decode()))defconnection_lost(self,exc):print('The server closed the connection')self.on_con_lost.set_result(True)asyncdefmain():# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()on_con_lost=loop.create_future()message='Hello World!'transport,protocol=awaitloop.create_connection(lambda:EchoClientProtocol(message,on_con_lost),'127.0.0.1',8888)# Wait until the protocol signals that the connection# is lost and close the transport.try:awaiton_con_lostfinally:transport.close()asyncio.run(main())

参考

ストリームを使った TCP エコークライアント の例では高水準のasyncio.open_connection() 関数を使っています。

UDP エコーサーバー

loop.create_datagram_endpoint() メソッドを使った UDP エコーサーバーは受信したデータをそのまま送り返します:

importasyncioclassEchoServerProtocol:defconnection_made(self,transport):self.transport=transportdefdatagram_received(self,data,addr):message=data.decode()print('Received%r from%s'%(message,addr))print('Send%r to%s'%(message,addr))self.transport.sendto(data,addr)asyncdefmain():print("Starting UDP server")# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()# One protocol instance will be created to serve all# client requests.transport,protocol=awaitloop.create_datagram_endpoint(EchoServerProtocol,local_addr=('127.0.0.1',9999))try:awaitasyncio.sleep(3600)# Serve for 1 hour.finally:transport.close()asyncio.run(main())

UDP エコークライアント

loop.create_datagram_endpoint() メソッドを使った UDP エコークライアントはデータを送信し、応答を受信するとトランスポートをクローズします:

importasyncioclassEchoClientProtocol:def__init__(self,message,on_con_lost):self.message=messageself.on_con_lost=on_con_lostself.transport=Nonedefconnection_made(self,transport):self.transport=transportprint('Send:',self.message)self.transport.sendto(self.message.encode())defdatagram_received(self,data,addr):print("Received:",data.decode())print("Close the socket")self.transport.close()deferror_received(self,exc):print('Error received:',exc)defconnection_lost(self,exc):print("Connection closed")self.on_con_lost.set_result(True)asyncdefmain():# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()on_con_lost=loop.create_future()message="Hello World!"transport,protocol=awaitloop.create_datagram_endpoint(lambda:EchoClientProtocol(message,on_con_lost),remote_addr=('127.0.0.1',9999))try:awaiton_con_lostfinally:transport.close()asyncio.run(main())

既存のソケットへの接続

プロトコルを設定したloop.create_connection() メソッドを使ってソケットがデータを受信するまで待機します:

importasyncioimportsocketclassMyProtocol(asyncio.Protocol):def__init__(self,on_con_lost):self.transport=Noneself.on_con_lost=on_con_lostdefconnection_made(self,transport):self.transport=transportdefdata_received(self,data):print("Received:",data.decode())# We are done: close the transport;# connection_lost() will be called automatically.self.transport.close()defconnection_lost(self,exc):# The socket has been closedself.on_con_lost.set_result(True)asyncdefmain():# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()on_con_lost=loop.create_future()# Create a pair of connected socketsrsock,wsock=socket.socketpair()# Register the socket to wait for data.transport,protocol=awaitloop.create_connection(lambda:MyProtocol(on_con_lost),sock=rsock)# Simulate the reception of data from the network.loop.call_soon(wsock.send,'abc'.encode())try:awaitprotocol.on_con_lostfinally:transport.close()wsock.close()asyncio.run(main())

参考

ファイル記述子の読み込みイベントを監視する 例では低レベルのloop.add_reader() メソッドを使ってファイル記述子 (FD) を登録しています。

ストリームを使ってデータを待ち受けるオープンなソケットを登録する 例ではコルーチン内でopen_connection() 関数によって生成されたストリームを使っています。

loop.subprocess_exec() と SubprocessProtocol

サブプロセスからの出力を受け取り、サブプロセスが終了するまで待機するために使われるサブプロセスプロトコルの例です。

サブプロセスはloop.subprocess_exec() メソッドにより生成されます:

importasyncioimportsysclassDateProtocol(asyncio.SubprocessProtocol):def__init__(self,exit_future):self.exit_future=exit_futureself.output=bytearray()self.pipe_closed=Falseself.exited=Falsedefpipe_connection_lost(self,fd,exc):self.pipe_closed=Trueself.check_for_exit()defpipe_data_received(self,fd,data):self.output.extend(data)defprocess_exited(self):self.exited=True# process_exited() method can be called before# pipe_connection_lost() method: wait until both methods are# called.self.check_for_exit()defcheck_for_exit(self):ifself.pipe_closedandself.exited:self.exit_future.set_result(True)asyncdefget_date():# Get a reference to the event loop as we plan to use# low-level APIs.loop=asyncio.get_running_loop()code='import datetime; print(datetime.datetime.now())'exit_future=asyncio.Future(loop=loop)# Create the subprocess controlled by DateProtocol;# redirect the standard output into a pipe.transport,protocol=awaitloop.subprocess_exec(lambda:DateProtocol(exit_future),sys.executable,'-c',code,stdin=None,stderr=None)# Wait for the subprocess exit using the process_exited()# method of the protocol.awaitexit_future# Close the stdout pipe.transport.close()# Read the output which was collected by the# pipe_data_received() method of the protocol.data=bytes(protocol.output)returndata.decode('ascii').rstrip()date=asyncio.run(get_date())print(f"Current date:{date}")

高水準の API を使って書かれた同様の例 も参照してください。