ストリーム

ソースコード:Lib/asyncio/streams.py


ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。

以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:

importasyncioasyncdeftcp_echo_client(message):reader,writer=awaitasyncio.open_connection('127.0.0.1',8888)print(f'Send:{message!r}')writer.write(message.encode())awaitwriter.drain()data=awaitreader.read(100)print(f'Received:{data.decode()!r}')print('Close the connection')writer.close()awaitwriter.wait_closed()asyncio.run(tcp_echo_client('Hello World!'))

下記の使用例 節も参照してください。

ストリーム関数

以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:

asyncasyncio.open_connection(host=None,port=None,*,limit=None,ssl=None,family=0,proto=0,flags=0,sock=None,local_addr=None,server_hostname=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,happy_eyeballs_delay=None,interleave=None)

ネットワークコネクションを確立し、(reader,writer) のオブジェクトのペアを返します。

戻り値のreaderwriter はそれぞれStreamReaderStreamWriter クラスのインスタンスです。

limit は戻り値のStreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトではlimit は 64 KiB に設定されます。

残りの引数は直接loop.create_connection() に渡されます。

注釈

Thesock argument transfers ownership of the socket to theStreamWriter created. To close the socket, call itsclose() method.

バージョン 3.7 で変更:Added thessl_handshake_timeout parameter.

バージョン 3.8 で変更:happy_eyeballs_delayinterleave が追加されました。

バージョン 3.10 で変更:loop パラメータが削除されました。

バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。

asyncasyncio.start_server(client_connected_cb,host=None,port=None,*,limit=None,family=socket.AF_UNSPEC,flags=socket.AI_PASSIVE,sock=None,backlog=100,ssl=None,reuse_address=None,reuse_port=None,keep_alive=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,start_serving=True)

ソケットサーバーを起動します。

client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックはStreamReaderStreamWriter クラスのインスタンスのペア(reader,writer) を2つの引数として受け取ります。

client_connected_cb には単純な呼び出し可能オブジェクトか、またはコルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的にTask としてスケジュールされます。

limit は戻り値のStreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトではlimit は 64 KiB に設定されます。

残りの引数は直接loop.create_server() に渡されます。

注釈

Thesock argument transfers ownership of the socket to theserver created. To close the socket, call the server'sclose() method.

バージョン 3.7 で変更:ssl_handshake_timeoutstart_serving パラメータが追加されました。

バージョン 3.10 で変更:loop パラメータが削除されました。

バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。

バージョン 3.13 で変更:Added thekeep_alive parameter.

Unix ソケット

asyncasyncio.open_unix_connection(path=None,*,limit=None,ssl=None,sock=None,server_hostname=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None)

Unix ソケットコネクションを確立し、(reader,writer) のオブジェクトのペアを返します。

この関数はopen_connection() と似ていますが Unix ソケットに対して動作します。

loop.create_unix_connection() のドキュメントも参照してください。

注釈

Thesock argument transfers ownership of the socket to theStreamWriter created. To close the socket, call itsclose() method.

バージョン 3.7 で変更:Added thessl_handshake_timeout parameter.Thepath parameter can now be apath-like object

バージョン 3.10 で変更:loop パラメータが削除されました。

バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。

asyncasyncio.start_unix_server(client_connected_cb,path=None,*,limit=None,sock=None,backlog=100,ssl=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,start_serving=True,cleanup_socket=True)

Unix のソケットサーバーを起動します。

start_server() と似ていますが Unix ソケットに対して動作します。

Ifcleanup_socket is true then the Unix socket will automaticallybe removed from the filesystem when the server is closed, unless thesocket has been replaced after the server has been created.

loop.create_unix_server() のドキュメントも参照してください。

注釈

Thesock argument transfers ownership of the socket to theserver created. To close the socket, call the server'sclose() method.

バージョン 3.7 で変更:Added thessl_handshake_timeout andstart_serving parameters.Thepath parameter can now be apath-like object.

バージョン 3.10 で変更:loop パラメータが削除されました。

バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。

バージョン 3.13 で変更:Added thecleanup_socket parameter.

StreamReader

classasyncio.StreamReader

Represents a reader object that provides APIs to read datafrom the IO stream. As anasynchronous iterable, theobject supports theasyncfor statement.

StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりにopen_connection()start_server() を使ってください。

feed_eof()

EOF の肯定応答を行います。

asyncread(n=-1)

ストリームから最大n バイト読み込みます。

n が指定されないか-1 が指定されていた場合 EOF になるまで読み込み、読み込まれた全てのbytes を返します。EOF を受信し、かつ内部バッファーが空の場合、空のbytes オブジェクトを返します。

n0 なら、ただちに空のbytes オブジェクトを返します。

n が正なら、内部バッファで最低でも 1 バイトが利用可能になり次第、利用可能な最大nbytes を返します。1 バイトも読み込まないうちに EOF を受信したなら、空のbytes オブジェクトを返します。

asyncreadline()

1 行読み込みます。 "行" とは、\n で終了するバイト列のシーケンスです。

EOF を受信し、かつ\n が見つからない場合、このメソッドは部分的に読み込んだデータを返します。

EOF を受信し、かつ内部バッファーが空の場合、空のbytes オブジェクトを返します。

asyncreadexactly(n)

厳密にn バイトのデータを読み出します。

n バイトを読み出す前に EOF に達した場合IncompleteReadError を送出します。部分的に読み出したデータを取得するにはIncompleteReadError.partial 属性を使ってください。

asyncreaduntil(separator=b'\n')

separator が見つかるまでストリームからデータを読み出します。

成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。

読み出したデータの量が設定したストリームの上限を超えるとLimitOverrunError 例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。

完全な区切り文字が見つかる前に EOF に達するとIncompleteReadError 例外が送出され、内部バッファーがリセットされます。このときIncompleteReadError.partial 属性は区切り文字の一部を含むかもしれません。

Theseparator may also be a tuple of separators. In thiscase the return value will be the shortest possible that has anyseparator as the suffix. For the purposes ofLimitOverrunError,the shortest possible separator is considered to be the one thatmatched.

Added in version 3.5.2.

バージョン 3.13 で変更:Theseparator parameter may now be atuple ofseparators.

at_eof()

バッファーが空でfeed_eof() が呼ばれていた場合True を返します。

StreamWriter

classasyncio.StreamWriter

IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。

StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりにopen_connection()start_server() を使ってください。

write(data)

このメソッドは、背後にあるソケットにデータdata を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドはdrain() メソッドと組み合わせて使うべきです:

stream.write(data)awaitstream.drain()
writelines(data)

このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドはdrain() メソッドと組み合わせて使うべきです:

stream.writelines(lines)awaitstream.drain()
close()

このメソッドはストリームと背後にあるソケットをクローズします。

The method should be used, though not mandatory,along with thewait_closed() method:

stream.close()awaitstream.wait_closed()
can_write_eof()

背後にあるトランスポートがwrite_eof() メソッドをサポートしている場合True を返し、そうでない場合はFalse を返します。

write_eof()

バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。

transport

背後にある asyncio トランスポートを返します。

get_extra_info(name,default=None)

オプションのトランスポート情報にアクセスします。詳細はBaseTransport.get_extra_info() を参照してください。

asyncdrain()

ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:

writer.write(data)awaitwriter.drain()

このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、drain() は即座にリターンします。

asyncstart_tls(sslcontext,*,server_hostname=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None)

Upgrade an existing stream-based connection to TLS.

引数:

  • sslcontext: 構成済みのSSLContext インスタンスです。

  • server_hostname: 対象のサーバーの証明書との照合に使われるホスト名を設定または上書きします。

  • ssl_handshake_timeout is the time in seconds to wait for the TLShandshake to complete before aborting the connection.60.0 secondsifNone (default).

  • ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdownto complete before aborting the connection.30.0 seconds ifNone(default).

Added in version 3.11.

バージョン 3.12 で変更:ssl_shutdown_timeout パラメータが追加されました。

is_closing()

ストリームがクローズされたか、またはクローズ処理中の場合にTrue を返します。

Added in version 3.7.

asyncwait_closed()

ストリームがクローズされるまで待機します。

Should be called afterclose() to wait until the underlyingconnection is closed, ensuring that all data has been flushedbefore e.g. exiting the program.

Added in version 3.7.

使用例

ストリームを使った TCP Echo クライアント

asyncio.open_connection() 関数を使った TCP Echo クライアントです:

importasyncioasyncdeftcp_echo_client(message):reader,writer=awaitasyncio.open_connection('127.0.0.1',8888)print(f'Send:{message!r}')writer.write(message.encode())awaitwriter.drain()data=awaitreader.read(100)print(f'Received:{data.decode()!r}')print('Close the connection')writer.close()awaitwriter.wait_closed()asyncio.run(tcp_echo_client('Hello World!'))

参考

TCP エコークライアントプロトコル の例は低水準のloop.create_connection() メソッドを使っています。

ストリームを使った TCP Echo サーバー

asyncio.start_server() 関数を使った TCP Echo サーバーです:

importasyncioasyncdefhandle_echo(reader,writer):data=awaitreader.read(100)message=data.decode()addr=writer.get_extra_info('peername')print(f"Received{message!r} from{addr!r}")print(f"Send:{message!r}")writer.write(data)awaitwriter.drain()print("Close the connection")writer.close()awaitwriter.wait_closed()asyncdefmain():server=awaitasyncio.start_server(handle_echo,'127.0.0.1',8888)addrs=', '.join(str(sock.getsockname())forsockinserver.sockets)print(f'Serving on{addrs}')asyncwithserver:awaitserver.serve_forever()asyncio.run(main())

参考

TCP エコーサーバープロトコル の例はloop.create_server() メソッドを使っています。

HTTP ヘッダーの取得

コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:

importasyncioimporturllib.parseimportsysasyncdefprint_http_headers(url):url=urllib.parse.urlsplit(url)ifurl.scheme=='https':reader,writer=awaitasyncio.open_connection(url.hostname,443,ssl=True)else:reader,writer=awaitasyncio.open_connection(url.hostname,80)query=(f"HEAD{url.pathor'/'} HTTP/1.0\r\n"f"Host:{url.hostname}\r\n"f"\r\n")writer.write(query.encode('latin-1'))whileTrue:line=awaitreader.readline()ifnotline:breakline=line.decode('latin1').rstrip()ifline:print(f'HTTP header>{line}')# Ignore the body, close the socketwriter.close()awaitwriter.wait_closed()url=sys.argv[1]asyncio.run(print_http_headers(url))

使い方:

pythonexample.pyhttp://example.com/path/page.html

または HTTPS を使用:

pythonexample.pyhttps://example.com/path/page.html

ストリームを使ってデータを待つオープンソケットの登録

open_connection() 関数を使ってソケットがデータを受信するまで待つコルーチンです:

importasyncioimportsocketasyncdefwait_for_data():# Get a reference to the current event loop because# we want to access low-level APIs.loop=asyncio.get_running_loop()# Create a pair of connected sockets.rsock,wsock=socket.socketpair()# Register the open socket to wait for data.reader,writer=awaitasyncio.open_connection(sock=rsock)# Simulate the reception of data from the networkloop.call_soon(wsock.send,'abc'.encode())# Wait for datadata=awaitreader.read(100)# Got data, we are done: close the socketprint("Received:",data.decode())writer.close()awaitwriter.wait_closed()# Close the second socketwsock.close()asyncio.run(wait_for_data())

参考

プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルとloop.create_connection() メソッドを使っています。

ファイル記述子の読み出しイベントを監視する 例では、低水準のloop.add_reader() メソッドを使ってファイル記述子を監視しています。