ストリーム¶
ソースコード:Lib/asyncio/streams.py
ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。
以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:
importasyncioasyncdeftcp_echo_client(message):reader,writer=awaitasyncio.open_connection('127.0.0.1',8888)print(f'Send:{message!r}')writer.write(message.encode())awaitwriter.drain()data=awaitreader.read(100)print(f'Received:{data.decode()!r}')print('Close the connection')writer.close()awaitwriter.wait_closed()asyncio.run(tcp_echo_client('Hello World!'))
下記の使用例 節も参照してください。
ストリーム関数
以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:
- coroutineasyncio.open_connection(host=None,port=None,*,limit=None,ssl=None,family=0,proto=0,flags=0,sock=None,local_addr=None,server_hostname=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,happy_eyeballs_delay=None,interleave=None)¶
ネットワークコネクションを確立し、
(reader,writer)
のオブジェクトのペアを返します。戻り値のreader とwriter はそれぞれ
StreamReader
とStreamWriter
クラスのインスタンスです。limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトではlimit は 64 KiB に設定されます。残りの引数は直接
loop.create_connection()
に渡されます。注釈
Thesock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.バージョン 3.7 で変更:Added thessl_handshake_timeout parameter.
バージョン 3.8 で変更:happy_eyeballs_delay とinterleave が追加されました。
バージョン 3.10 で変更:loop パラメータが削除されました。
バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。
- coroutineasyncio.start_server(client_connected_cb,host=None,port=None,*,limit=None,family=socket.AF_UNSPEC,flags=socket.AI_PASSIVE,sock=None,backlog=100,ssl=None,reuse_address=None,reuse_port=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,start_serving=True)¶
ソケットサーバーを起動します。
client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックは
StreamReader
とStreamWriter
クラスのインスタンスのペア(reader,writer)
を2つの引数として受け取ります。client_connected_cb には単純な呼び出し可能オブジェクトか、またはコルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的に
Task
としてスケジュールされます。limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトではlimit は 64 KiB に設定されます。残りの引数は直接
loop.create_server()
に渡されます。注釈
Thesock argument transfers ownership of the socket to theserver created. To close the socket, call the server's
close()
method.バージョン 3.7 で変更:ssl_handshake_timeout とstart_serving パラメータが追加されました。
バージョン 3.10 で変更:loop パラメータが削除されました。
バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。
Unix ソケット
- coroutineasyncio.open_unix_connection(path=None,*,limit=None,ssl=None,sock=None,server_hostname=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None)¶
Unix ソケットコネクションを確立し、
(reader,writer)
のオブジェクトのペアを返します。この関数は
open_connection()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_connection()
のドキュメントも参照してください。注釈
Thesock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Availability: Unix.
バージョン 3.7 で変更:Added thessl_handshake_timeout parameter.Thepath parameter can now be apath-like object
バージョン 3.10 で変更:loop パラメータが削除されました。
バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。
- coroutineasyncio.start_unix_server(client_connected_cb,path=None,*,limit=None,sock=None,backlog=100,ssl=None,ssl_handshake_timeout=None,ssl_shutdown_timeout=None,start_serving=True)¶
Unix のソケットサーバーを起動します。
start_server()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_server()
のドキュメントも参照してください。注釈
Thesock argument transfers ownership of the socket to theserver created. To close the socket, call the server's
close()
method.Availability: Unix.
バージョン 3.7 で変更:Added thessl_handshake_timeout andstart_serving parameters.Thepath parameter can now be apath-like object.
バージョン 3.10 で変更:loop パラメータが削除されました。
バージョン 3.11 で変更:ssl_shutdown_timeout パラメータが追加されました。
StreamReader¶
- classasyncio.StreamReader¶
Represents a reader object that provides APIs to read datafrom the IO stream. As anasynchronous iterable, theobject supports the
asyncfor
statement.StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。- feed_eof()¶
EOF の肯定応答を行います。
- coroutineread(n=-1)¶
ストリームから最大n バイト読み込みます。
n が指定されないか
-1
が指定されていた場合 EOF になるまで読み込み、読み込まれた全てのbytes
を返します。EOF を受信し、かつ内部バッファーが空の場合、空のbytes
オブジェクトを返します。n が
0
なら、ただちに空のbytes
オブジェクトを返します。n が正なら、内部バッファで最低でも 1 バイトが利用可能になり次第、利用可能な最大n
bytes
を返します。1 バイトも読み込まないうちに EOF を受信したなら、空のbytes
オブジェクトを返します。
- coroutinereadline()¶
1 行読み込みます。 "行" とは、
\n
で終了するバイト列のシーケンスです。EOF を受信し、かつ
\n
が見つからない場合、このメソッドは部分的に読み込んだデータを返します。EOF を受信し、かつ内部バッファーが空の場合、空の
bytes
オブジェクトを返します。
- coroutinereadexactly(n)¶
厳密にn バイトのデータを読み出します。
n バイトを読み出す前に EOF に達した場合
IncompleteReadError
を送出します。部分的に読み出したデータを取得するにはIncompleteReadError.partial
属性を使ってください。
- coroutinereaduntil(separator=b'\n')¶
separator が見つかるまでストリームからデータを読み出します。
成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。
読み出したデータの量が設定したストリームの上限を超えると
LimitOverrunError
例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。完全な区切り文字が見つかる前に EOF に達すると
IncompleteReadError
例外が送出され、内部バッファーがリセットされます。このときIncompleteReadError.partial
属性は区切り文字の一部を含むかもしれません。バージョン 3.5.2 で追加.
- at_eof()¶
バッファーが空で
feed_eof()
が呼ばれていた場合True
を返します。
StreamWriter¶
- classasyncio.StreamWriter¶
IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。
StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。- write(data)¶
このメソッドは、背後にあるソケットにデータdata を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.write(data)awaitstream.drain()
- writelines(data)¶
このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.writelines(lines)awaitstream.drain()
- close()¶
このメソッドはストリームと背後にあるソケットをクローズします。
The method should be used, though not mandatory,along with the
wait_closed()
method:stream.close()awaitstream.wait_closed()
- can_write_eof()¶
背後にあるトランスポートが
write_eof()
メソッドをサポートしている場合True
を返し、そうでない場合はFalse
を返します。
- write_eof()¶
バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。
- transport¶
背後にある asyncio トランスポートを返します。
- get_extra_info(name,default=None)¶
オプションのトランスポート情報にアクセスします。詳細は
BaseTransport.get_extra_info()
を参照してください。
- coroutinedrain()¶
ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:
writer.write(data)awaitwriter.drain()
このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、
drain()
は即座にリターンします。
- coroutinestart_tls(sslcontext,*,server_hostname=None,ssl_handshake_timeout=None)¶
Upgrade an existing stream-based connection to TLS.
引数:
sslcontext: 構成済みの
SSLContext
インスタンスです。server_hostname: 対象のサーバーの証明書との照合に使われるホスト名を設定または上書きします。
ssl_handshake_timeout is the time in seconds to wait for the TLShandshake to complete before aborting the connection.
60.0
secondsifNone
(default).
バージョン 3.11 で追加.
- is_closing()¶
ストリームがクローズされたか、またはクローズ処理中の場合に
True
を返します。バージョン 3.7 で追加.
使用例¶
ストリームを使った TCP Echo クライアント¶
asyncio.open_connection()
関数を使った TCP Echo クライアントです:
importasyncioasyncdeftcp_echo_client(message):reader,writer=awaitasyncio.open_connection('127.0.0.1',8888)print(f'Send:{message!r}')writer.write(message.encode())awaitwriter.drain()data=awaitreader.read(100)print(f'Received:{data.decode()!r}')print('Close the connection')writer.close()awaitwriter.wait_closed()asyncio.run(tcp_echo_client('Hello World!'))
参考
TCP エコークライアントプロトコル の例は低水準のloop.create_connection()
メソッドを使っています。
ストリームを使った TCP Echo サーバー¶
asyncio.start_server()
関数を使った TCP Echo サーバーです:
importasyncioasyncdefhandle_echo(reader,writer):data=awaitreader.read(100)message=data.decode()addr=writer.get_extra_info('peername')print(f"Received{message!r} from{addr!r}")print(f"Send:{message!r}")writer.write(data)awaitwriter.drain()print("Close the connection")writer.close()awaitwriter.wait_closed()asyncdefmain():server=awaitasyncio.start_server(handle_echo,'127.0.0.1',8888)addrs=', '.join(str(sock.getsockname())forsockinserver.sockets)print(f'Serving on{addrs}')asyncwithserver:awaitserver.serve_forever()asyncio.run(main())
参考
TCP エコーサーバープロトコル の例はloop.create_server()
メソッドを使っています。
HTTP ヘッダーの取得¶
コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:
importasyncioimporturllib.parseimportsysasyncdefprint_http_headers(url):url=urllib.parse.urlsplit(url)ifurl.scheme=='https':reader,writer=awaitasyncio.open_connection(url.hostname,443,ssl=True)else:reader,writer=awaitasyncio.open_connection(url.hostname,80)query=(f"HEAD{url.pathor'/'} HTTP/1.0\r\n"f"Host:{url.hostname}\r\n"f"\r\n")writer.write(query.encode('latin-1'))whileTrue:line=awaitreader.readline()ifnotline:breakline=line.decode('latin1').rstrip()ifline:print(f'HTTP header>{line}')# Ignore the body, close the socketwriter.close()awaitwriter.wait_closed()url=sys.argv[1]asyncio.run(print_http_headers(url))
使い方:
pythonexample.pyhttp://example.com/path/page.html
または HTTPS を使用:
pythonexample.pyhttps://example.com/path/page.html
ストリームを使ってデータを待つオープンソケットの登録¶
open_connection()
関数を使ってソケットがデータを受信するまで待つコルーチンです:
importasyncioimportsocketasyncdefwait_for_data():# Get a reference to the current event loop because# we want to access low-level APIs.loop=asyncio.get_running_loop()# Create a pair of connected sockets.rsock,wsock=socket.socketpair()# Register the open socket to wait for data.reader,writer=awaitasyncio.open_connection(sock=rsock)# Simulate the reception of data from the networkloop.call_soon(wsock.send,'abc'.encode())# Wait for datadata=awaitreader.read(100)# Got data, we are done: close the socketprint("Received:",data.decode())writer.close()awaitwriter.wait_closed()# Close the second socketwsock.close()asyncio.run(wait_for_data())
参考
プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルとloop.create_connection()
メソッドを使っています。
ファイル記述子の読み出しイベントを監視する 例では、低水準のloop.add_reader()
メソッドを使ってファイル記述子を監視しています。