18.5.5.Streams (coroutine based API)¶
Source code:Lib/asyncio/streams.py
18.5.5.1.Stream functions¶
Note
The top-level functions in this module are meant as convenience wrappersonly; there’s really nothing special there, and if they don’t doexactly what you want, feel free to copy their code.
- coroutine
asyncio.open_connection(host=None,port=None,*,loop=None,limit=None,**kwds)¶ A wrapper for
create_connection()returning a (reader,writer) pair.The reader returned is a
StreamReaderinstance; the writer isaStreamWriterinstance.The arguments are all the usual arguments to
AbstractEventLoop.create_connection()exceptprotocol_factory; mostcommon are positional host and port, with various optional keyword argumentsfollowing.Additional optional keyword arguments areloop (to set the event loopinstance to use) andlimit (to set the buffer limit passed to the
StreamReader).This function is acoroutine.
- coroutine
asyncio.start_server(client_connected_cb,host=None,port=None,*,loop=None,limit=None,**kwds)¶ Start a socket server, with a callback for each client connected. The returnvalue is the same as
create_server().Theclient_connected_cb parameter is called with two parameters:client_reader,client_writer.client_reader is a
StreamReaderobject, whileclient_writer is aStreamWriterobject. Theclient_connected_cb parameter caneither be a plain callback function or acoroutine function; if it is a coroutine function, it will be automaticallyconverted into aTask.The rest of the arguments are all the usual arguments to
create_server()exceptprotocol_factory; mostcommon are positionalhost andport, with various optional keywordarguments following.Additional optional keyword arguments areloop (to set the event loopinstance to use) andlimit (to set the buffer limit passed to the
StreamReader).This function is acoroutine.
- coroutine
asyncio.open_unix_connection(path=None,*,loop=None,limit=None,**kwds)¶ A wrapper for
create_unix_connection()returninga (reader, writer) pair.See
open_connection()for information about return value and otherdetails.This function is acoroutine.
Availability: UNIX.
- coroutine
asyncio.start_unix_server(client_connected_cb,path=None,*,loop=None,limit=None,**kwds)¶ Start a UNIX Domain Socket server, with a callback for each client connected.
See
start_server()for information about return value and otherdetails.This function is acoroutine.
Availability: UNIX.
18.5.5.2.StreamReader¶
- class
asyncio.StreamReader(limit=_DEFAULT_LIMIT,loop=None)¶ This class isnot thread safe.
Thelimit argument’s default value is set to _DEFAULT_LIMIT which is 2**16 (64 KiB)
exception()¶Get the exception.
feed_eof()¶Acknowledge the EOF.
feed_data(data)¶Feeddata bytes in the internal buffer. Any operations waitingfor the data will be resumed.
set_exception(exc)¶Set the exception.
set_transport(transport)¶Set the transport.
- coroutine
read(n=-1)¶ Read up ton bytes. Ifn is not provided, or set to
-1,read until EOF and return all read bytes.If the EOF was received and the internal buffer is empty,return an empty
bytesobject.This method is acoroutine.
- coroutine
readline()¶ Read one line, where “line” is a sequence of bytes ending with
\n.If EOF is received, and
\nwas not found, the method willreturn the partial read bytes.If the EOF was received and the internal buffer is empty,return an empty
bytesobject.This method is acoroutine.
- coroutine
readexactly(n)¶ Read exactlyn bytes. Raise an
IncompleteReadErrorif the end ofthe stream is reached beforen can be read, theIncompleteReadError.partialattribute of the exception containsthe partial read bytes.This method is acoroutine.
- coroutine
readuntil(separator=b'\n')¶ Read data from the stream until
separatoris found.On success, the data and separator will be removed from theinternal buffer (consumed). Returned data will include theseparator at the end.
Configured stream limit is used to check result. Limit sets themaximal length of data that can be returned, not counting theseparator.
If an EOF occurs and the complete separator is still not found,an
IncompleteReadErrorexception will beraised, and the internal buffer will be reset. TheIncompleteReadError.partialattribute may contain theseparator partially.If the data cannot be read because of over limit, a
LimitOverrunErrorexception will be raised, and the datawill be left in the internal buffer, so it can be read again.New in version 3.5.2.
at_eof()¶Return
Trueif the buffer is empty andfeed_eof()was called.
18.5.5.3.StreamWriter¶
- class
asyncio.StreamWriter(transport,protocol,reader,loop)¶ Wraps a Transport.
This exposes
write(),writelines(),can_write_eof(),write_eof(),get_extra_info()andclose(). It addsdrain()which returns an optionalFutureon which you canwait for flow control. It also adds a transport attribute which referencestheTransportdirectly.This class isnot thread safe.
transport¶Transport.
can_write_eof()¶Return
Trueif the transport supportswrite_eof(),Falseif not. SeeWriteTransport.can_write_eof().
close()¶Close the transport: see
BaseTransport.close().
- coroutine
drain()¶ Let the write buffer of the underlying transport a chance to be flushed.
The intended use is to write:
w.write(data)yield fromw.drain()
When the size of the transport buffer reaches the high-water limit (theprotocol is paused), block until the size of the buffer is drained downto the low-water limit and the protocol is resumed. When there is nothingto wait for, the yield-from continues immediately.
Yielding from
drain()gives the opportunity for the loop toschedule the write operation and flush the buffer. It should especiallybe used when a possibly large amount of data is written to the transport,and the coroutine does not yield-from between calls towrite().This method is acoroutine.
get_extra_info(name,default=None)¶Return optional transport information: see
BaseTransport.get_extra_info().
write(data)¶Write somedata bytes to the transport: see
WriteTransport.write().
writelines(data)¶Write a list (or any iterable) of data bytes to the transport:see
WriteTransport.writelines().
write_eof()¶Close the write end of the transport after flushing buffered data:see
WriteTransport.write_eof().
18.5.5.4.StreamReaderProtocol¶
- class
asyncio.StreamReaderProtocol(stream_reader,client_connected_cb=None,loop=None)¶ Trivial helper class to adapt between
ProtocolandStreamReader. Subclass ofProtocol.stream_reader is a
StreamReaderinstance,client_connected_cbis an optional function called with (stream_reader, stream_writer) when aconnection is made,loop is the event loop instance to use.(This is a helper class instead of making
StreamReaderitself aProtocolsubclass, because theStreamReaderhas otherpotential uses, and to prevent the user of theStreamReaderfromaccidentally calling inappropriate methods of the protocol.)
18.5.5.5.IncompleteReadError¶
18.5.5.6.LimitOverrunError¶
18.5.5.7.Stream examples¶
18.5.5.7.1.TCP echo client using streams¶
TCP echo client using theasyncio.open_connection() function:
importasyncio@asyncio.coroutinedeftcp_echo_client(message,loop):reader,writer=yield fromasyncio.open_connection('127.0.0.1',8888,loop=loop)print('Send:%r'%message)writer.write(message.encode())data=yield fromreader.read(100)print('Received:%r'%data.decode())print('Close the socket')writer.close()message='Hello World!'loop=asyncio.get_event_loop()loop.run_until_complete(tcp_echo_client(message,loop))loop.close()
See also
TheTCP echo client protocolexample uses theAbstractEventLoop.create_connection() method.
18.5.5.7.2.TCP echo server using streams¶
TCP echo server using theasyncio.start_server() function:
importasyncio@asyncio.coroutinedefhandle_echo(reader,writer):data=yield fromreader.read(100)message=data.decode()addr=writer.get_extra_info('peername')print("Received%r from%r"%(message,addr))print("Send:%r"%message)writer.write(data)yield fromwriter.drain()print("Close the client socket")writer.close()loop=asyncio.get_event_loop()coro=asyncio.start_server(handle_echo,'127.0.0.1',8888,loop=loop)server=loop.run_until_complete(coro)# Serve requests until Ctrl+C is pressedprint('Serving on{}'.format(server.sockets[0].getsockname()))try:loop.run_forever()exceptKeyboardInterrupt:pass# Close the serverserver.close()loop.run_until_complete(server.wait_closed())loop.close()
See also
TheTCP echo server protocolexample uses theAbstractEventLoop.create_server() method.
18.5.5.7.3.Get HTTP headers¶
Simple example querying HTTP headers of the URL passed on the command line:
importasyncioimporturllib.parseimportsys@asyncio.coroutinedefprint_http_headers(url):url=urllib.parse.urlsplit(url)ifurl.scheme=='https':connect=asyncio.open_connection(url.hostname,443,ssl=True)else:connect=asyncio.open_connection(url.hostname,80)reader,writer=yield fromconnectquery=('HEAD{path} HTTP/1.0\r\n''Host:{hostname}\r\n''\r\n').format(path=url.pathor'/',hostname=url.hostname)writer.write(query.encode('latin-1'))whileTrue:line=yield fromreader.readline()ifnotline:breakline=line.decode('latin1').rstrip()ifline:print('HTTP header>%s'%line)# Ignore the body, close the socketwriter.close()url=sys.argv[1]loop=asyncio.get_event_loop()task=asyncio.ensure_future(print_http_headers(url))loop.run_until_complete(task)loop.close()
Usage:
pythonexample.pyhttp://example.com/path/page.html
or with HTTPS:
pythonexample.pyhttps://example.com/path/page.html
18.5.5.7.4.Register an open socket to wait for data using streams¶
Coroutine waiting until a socket receives data using theopen_connection() function:
importasynciotry:fromsocketimportsocketpairexceptImportError:fromasyncio.windows_utilsimportsocketpair@asyncio.coroutinedefwait_for_data(loop):# Create a pair of connected socketsrsock,wsock=socketpair()# Register the open socket to wait for datareader,writer=yield fromasyncio.open_connection(sock=rsock,loop=loop)# Simulate the reception of data from the networkloop.call_soon(wsock.send,'abc'.encode())# Wait for datadata=yield fromreader.read(100)# Got data, we are done: close the socketprint("Received:",data.decode())writer.close()# Close the second socketwsock.close()loop=asyncio.get_event_loop()loop.run_until_complete(wait_for_data(loop))loop.close()
See also
Theregister an open socket to wait for data using a protocol example uses a low-level protocol created by theAbstractEventLoop.create_connection() method.
Thewatch a file descriptor for read events example uses the low-levelAbstractEventLoop.add_reader() method to register the file descriptor of asocket.
