18.5.5. Streams (coroutine based API)

Source code: Lib/asyncio/streams.py

18.5.5.1. Stream functions

Note

The top-level functions in this module are meant as convenience wrappers only; there’s really nothing special there, and if they don’t do exactly what you want, feel free to copy their code.

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)

A wrapper for create_connection() returning a (reader, writer) pair.

The reader returned is a StreamReader instance; the writer is a StreamWriter instance.

The arguments are all the usual arguments to AbstractEventLoop.create_connection() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the StreamReader).

This function is a coroutine.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)

Start a socket server, with a callback for each client connected. The return value is the same as create_server().

The client_connected_cb parameter is called with two parameters: client_reader, client_writer. client_reader is a StreamReader object, while client_writer is a StreamWriter object. The client_connected_cb parameter can either be a plain callback function or a coroutine function; if it is a coroutine function, it will be automatically converted into a Task.

The rest of the arguments are all the usual arguments to create_server() except protocol_factory; most common are positional host and port, with various optional keyword arguments following.

Additional optional keyword arguments are loop (to set the event loop instance to use) and limit (to set the buffer limit passed to the StreamReader).

This function is a coroutine.
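As a rough illustration of how start_server() and open_connection() fit together, the sketch below starts a server on an ephemeral loopback port and connects to it from the same program. It is written with async/await and asyncio.run() (Python 3.7 or later is assumed) rather than the yield from style used elsewhere in this section, and it leaves out the loop argument. The names upper_handler and round_trip are hypothetical, chosen only for this example.

```python
import asyncio


async def upper_handler(reader, writer):
    # Called once per client with a (StreamReader, StreamWriter) pair.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()


async def round_trip(message):
    # Port 0 asks the operating system for any free port; the bound
    # address is then read back from the server's listening socket.
    server = await asyncio.start_server(upper_handler, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(message)
    await writer.drain()
    reply = await reader.readline()
    writer.close()

    server.close()
    await server.wait_closed()
    return reply


reply = asyncio.run(round_trip(b'hello\n'))
print(reply)
```

Because the handler is a coroutine function, it is scheduled as a task for each connection, so several clients can be served concurrently.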

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)

A wrapper for create_unix_connection() returning a (reader, writer) pair.

See open_connection() for information about return value and other details.

This function is a coroutine.

Availability: UNIX.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)

Start a UNIX Domain Socket server, with a callback for each client connected.

See start_server() for information about return value and other details.

This function is a coroutine.

Availability: UNIX.

18.5.5.2. StreamReader

class asyncio.StreamReader(limit=_DEFAULT_LIMIT, loop=None)

This class is not thread safe.

The limit argument’s default value is set to _DEFAULT_LIMIT, which is 2**16 (64 KiB).

exception()

Get the exception.

feed_eof()

Acknowledge the EOF.

feed_data(data)

Feed data bytes in the internal buffer. Any operations waiting for the data will be resumed.

set_exception(exc)

Set the exception.
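A small sketch of how exception() and set_exception() relate, assuming the behaviour of the current CPython implementation, where an exception stored on the reader is re-raised by later read calls. It uses async/await and asyncio.run() (Python 3.7 or later assumed); demo_exception is a hypothetical name.

```python
import asyncio


async def demo_exception():
    reader = asyncio.StreamReader()
    before = reader.exception()          # nothing stored yet

    error = ConnectionResetError('peer went away')
    reader.set_exception(error)
    stored = reader.exception()          # the same object comes back

    raised = None
    try:
        await reader.read(10)
    except ConnectionResetError as exc:
        raised = exc                     # read() re-raises the stored exception
    return before, stored is error, raised is error


exception_result = asyncio.run(demo_exception())
print(exception_result)
```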

set_transport(transport)

Set the transport.

coroutine read(n=-1)

Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.

If the EOF was received and the internal buffer is empty, return an empty bytes object.

This method is a coroutine.
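The three cases described for read() can be tried without a network connection by feeding a StreamReader by hand. This is only a sketch: it uses async/await and asyncio.run() (Python 3.7 or later assumed), and demo_read is a hypothetical name.

```python
import asyncio


async def demo_read():
    reader = asyncio.StreamReader()
    reader.feed_data(b'hello world')
    reader.feed_eof()

    first = await reader.read(5)       # at most 5 bytes
    rest = await reader.read()         # n=-1: everything up to EOF
    after_eof = await reader.read()    # EOF seen and buffer empty
    return first, rest, after_eof


read_result = asyncio.run(demo_read())
print(read_result)
```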

coroutine readline()

Read one line, where “line” is a sequence of bytes ending with \n.

If EOF is received, and \n was not found, the method will return the partial read bytes.

If the EOF was received and the internal buffer is empty, return an empty bytes object.

This method is a coroutine.
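A minimal sketch of the readline() rules above, using a hand-fed StreamReader whose last line has no trailing newline. It uses async/await and asyncio.run() (Python 3.7 or later assumed); demo_readline is a hypothetical name.

```python
import asyncio


async def demo_readline():
    reader = asyncio.StreamReader()
    reader.feed_data(b'first\nsecond')   # second line lacks a newline
    reader.feed_eof()

    lines = []
    while True:
        line = await reader.readline()
        if not line:                     # empty bytes: EOF and empty buffer
            break
        lines.append(line)
    return lines


lines = asyncio.run(demo_readline())
print(lines)
```

The partial final line is returned as is, so callers that need complete lines should check for the trailing newline themselves.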

coroutine readexactly(n)

Read exactly n bytes. Raise an IncompleteReadError if the end of the stream is reached before n bytes can be read; the IncompleteReadError.partial attribute of the exception contains the partial read bytes.

This method is a coroutine.
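The following sketch shows readexactly() both succeeding and failing on a hand-fed StreamReader, and how the partial and expected attributes can be inspected. It uses async/await and asyncio.run() (Python 3.7 or later assumed); demo_readexactly is a hypothetical name.

```python
import asyncio


async def demo_readexactly():
    reader = asyncio.StreamReader()
    reader.feed_data(b'abcdef')
    reader.feed_eof()

    header = await reader.readexactly(4)     # enough data: succeeds
    partial = expected = None
    try:
        await reader.readexactly(4)          # only 2 bytes remain before EOF
    except asyncio.IncompleteReadError as exc:
        partial, expected = exc.partial, exc.expected
    return header, partial, expected


exact_result = asyncio.run(demo_readexactly())
print(exact_result)
```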

coroutine readuntil(separator=b'\n')

Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

Configured stream limit is used to check result. Limit sets the maximal length of data that can be returned, not counting the separator.

If an EOF occurs and the complete separator is still not found, an IncompleteReadError exception will be raised, and the internal buffer will be reset. The IncompleteReadError.partial attribute may contain the separator partially.

If the data cannot be read because of over limit, a LimitOverrunError exception will be raised, and the data will be left in the internal buffer, so it can be read again.

New in version 3.5.2.
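A sketch of readuntil() with a deliberately small limit, showing one successful read and one LimitOverrunError after which the data is still available in the buffer. The limit of 8 bytes and the ';' separator are arbitrary choices for the example, and demo_readuntil is a hypothetical name. It uses async/await and asyncio.run() (Python 3.7 or later assumed).

```python
import asyncio


async def demo_readuntil():
    reader = asyncio.StreamReader(limit=8)
    reader.feed_data(b'ab;0123456789abcdef')

    record = await reader.readuntil(b';')    # separator is included

    leftover = None
    try:
        # 16 bytes remain with no separator, which exceeds the limit.
        await reader.readuntil(b';')
    except asyncio.LimitOverrunError as exc:
        # The data was left in the buffer; pull it out with read().
        leftover = await reader.read(exc.consumed)
    return record, leftover


until_result = asyncio.run(demo_readuntil())
print(until_result)
```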

at_eof()

Return True if the buffer is empty and feed_eof() was called.
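Both conditions must hold for at_eof() to return True, which the sketch below checks on a hand-fed StreamReader before using at_eof() as a loop condition. It uses async/await and asyncio.run() (Python 3.7 or later assumed); demo_at_eof is a hypothetical name.

```python
import asyncio


async def demo_at_eof():
    reader = asyncio.StreamReader()
    reader.feed_data(b'one\ntwo\n')
    states = [reader.at_eof()]        # data buffered, no EOF yet
    reader.feed_eof()
    states.append(reader.at_eof())    # EOF fed, but buffer not empty

    lines = []
    while not reader.at_eof():
        lines.append(await reader.readline())
    states.append(reader.at_eof())    # EOF fed and buffer drained
    return states, lines


eof_result = asyncio.run(demo_at_eof())
print(eof_result)
```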

18.5.5.3. StreamWriter

class asyncio.StreamWriter(transport, protocol, reader, loop)

Wraps a Transport.

This exposes write(), writelines(), can_write_eof(), write_eof(), get_extra_info() and close(). It adds drain() which returns an optional Future on which you can wait for flow control. It also adds a transport attribute which references the Transport directly.

This class is not thread safe.

transport

Transport.

can_write_eof()

Return True if the transport supports write_eof(), False if not. See WriteTransport.can_write_eof().

close()

Close the transport: see BaseTransport.close().

coroutine drain()

Give the write buffer of the underlying transport a chance to be flushed.

The intended use is to write:

    w.write(data)
    yield from w.drain()

When the size of the transport buffer reaches the high-water limit (the protocol is paused), block until the size of the buffer is drained down to the low-water limit and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.

Yielding from drain() gives the opportunity for the loop to schedule the write operation and flush the buffer. It should especially be used when a possibly large amount of data is written to the transport, and the coroutine does not yield-from between calls to write().

This method is a coroutine.
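The write-then-drain pattern can be sketched with a producer that sends many chunks while a consumer reads them concurrently. A connected socket pair stands in for a real network connection here; the chunk count and size are arbitrary, and send_with_drain is a hypothetical name. The sketch uses async/await and asyncio.run() (Python 3.7 or later assumed) in place of yield from.

```python
import asyncio
import socket


async def send_with_drain(chunk_count=64, chunk_size=16 * 1024):
    # Two connected sockets, each wrapped in its own stream pair.
    rsock, wsock = socket.socketpair()
    reader, peer_writer = await asyncio.open_connection(sock=rsock)
    _, writer = await asyncio.open_connection(sock=wsock)

    async def produce():
        for _ in range(chunk_count):
            writer.write(b'x' * chunk_size)
            # Suspends only while the transport buffer is above the
            # high-water limit; otherwise it continues immediately.
            await writer.drain()
        writer.close()               # buffered data is flushed, then EOF

    async def consume():
        total = 0
        while True:
            block = await reader.read(4096)
            if not block:            # EOF from the producer side
                return total
            total += len(block)

    _, received = await asyncio.gather(produce(), consume())
    peer_writer.close()
    return received


received = asyncio.run(send_with_drain())
print(received)
```

Without the drain() call the producer would queue every chunk in memory before the consumer had a chance to run.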

get_extra_info(name, default=None)

Return optional transport information: see BaseTransport.get_extra_info().

write(data)

Write some data bytes to the transport: see WriteTransport.write().

writelines(data)

Write a list (or any iterable) of data bytes to the transport: see WriteTransport.writelines().

write_eof()

Close the write end of the transport after flushing buffered data: see WriteTransport.write_eof().
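A sketch of a half-close exchange built on can_write_eof() and write_eof(): one side sends its request, closes only its write end, and then waits for the reply. A connected socket pair stands in for a client and a server; half_close_exchange and the reversed-bytes "protocol" are hypothetical. It uses async/await and asyncio.run() (Python 3.7 or later assumed).

```python
import asyncio
import socket


async def half_close_exchange(request):
    client_sock, server_sock = socket.socketpair()
    c_reader, c_writer = await asyncio.open_connection(sock=client_sock)
    s_reader, s_writer = await asyncio.open_connection(sock=server_sock)

    async def serve():
        # read() with no argument returns once the peer has sent EOF.
        data = await s_reader.read()
        s_writer.write(data[::-1])
        s_writer.close()

    async def request_reply():
        c_writer.write(request)
        if c_writer.can_write_eof():
            # Signal "no more data" while keeping the read side open.
            c_writer.write_eof()
        reply = await c_reader.read()
        c_writer.close()
        return reply

    _, reply = await asyncio.gather(serve(), request_reply())
    return reply


half_close_reply = asyncio.run(half_close_exchange(b'abc'))
print(half_close_reply)
```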

18.5.5.4. StreamReaderProtocol

class asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

Trivial helper class to adapt between Protocol and StreamReader. Subclass of Protocol.

stream_reader is a StreamReader instance, client_connected_cb is an optional function called with (stream_reader, stream_writer) when a connection is made, loop is the event loop instance to use.

(This is a helper class instead of making StreamReader itself a Protocol subclass, because the StreamReader has other potential uses, and to prevent the user of the StreamReader from accidentally calling inappropriate methods of the protocol.)
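To show how the pieces connect, the sketch below wires a StreamReader, a StreamReaderProtocol and a StreamWriter together by hand, which is roughly what open_connection() does for an already connected socket. The helper name open_stream_pair is hypothetical, and the code uses async/await, asyncio.get_running_loop() and asyncio.run() (Python 3.7 or later assumed).

```python
import asyncio
import socket


async def open_stream_pair(sock):
    """Build a (reader, writer) pair around an already connected socket."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    # The protocol receives transport callbacks and feeds the reader.
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(lambda: protocol, sock=sock)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer


async def demo_protocol():
    left, right = socket.socketpair()
    reader, writer = await open_stream_pair(left)
    right.sendall(b'ping\n')         # plain socket acting as the peer
    line = await reader.readline()
    writer.close()
    right.close()
    return line


protocol_line = asyncio.run(demo_protocol())
print(protocol_line)
```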

18.5.5.5. IncompleteReadError

exception asyncio.IncompleteReadError

Incomplete read error, subclass of EOFError.

expected

Total number of expected bytes (int).

partial

Bytes read before the end of the stream was reached (bytes).

18.5.5.6. LimitOverrunError

exception asyncio.LimitOverrunError

Reached the buffer limit while looking for a separator.

consumed

Total number of bytes to be consumed.

18.5.5.7. Stream examples

18.5.5.7.1. TCP echo client using streams

TCP echo client using the asyncio.open_connection() function:

    import asyncio

    @asyncio.coroutine
    def tcp_echo_client(message, loop):
        reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                            loop=loop)

        print('Send: %r' % message)
        writer.write(message.encode())

        data = yield from reader.read(100)
        print('Received: %r' % data.decode())

        print('Close the socket')
        writer.close()

    message = 'Hello World!'
    loop = asyncio.get_event_loop()
    loop.run_until_complete(tcp_echo_client(message, loop))
    loop.close()

18.5.5.7.2. TCP echo server using streams

TCP echo server using the asyncio.start_server() function:

    import asyncio

    @asyncio.coroutine
    def handle_echo(reader, writer):
        data = yield from reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message, addr))

        print("Send: %r" % message)
        writer.write(data)
        yield from writer.drain()

        print("Close the client socket")
        writer.close()

    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

18.5.5.7.3. Get HTTP headers

Simple example querying HTTP headers of the URL passed on the command line:

    import asyncio
    import urllib.parse
    import sys

    @asyncio.coroutine
    def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            connect = asyncio.open_connection(url.hostname, 443, ssl=True)
        else:
            connect = asyncio.open_connection(url.hostname, 80)
        reader, writer = yield from connect
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=url.hostname)
        writer.write(query.encode('latin-1'))
        while True:
            line = yield from reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print('HTTP header> %s' % line)

        # Ignore the body, close the socket
        writer.close()

    url = sys.argv[1]
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(print_http_headers(url))
    loop.run_until_complete(task)
    loop.close()

Usage:

    python example.py http://example.com/path/page.html

or with HTTPS:

    python example.py https://example.com/path/page.html

18.5.5.7.4. Register an open socket to wait for data using streams

Coroutine waiting until a socket receives data using the open_connection() function:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair

    @asyncio.coroutine
    def wait_for_data(loop):
        # Create a pair of connected sockets
        rsock, wsock = socketpair()

        # Register the open socket to wait for data
        reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = yield from reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()

        # Close the second socket
        wsock.close()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_data(loop))
    loop.close()

See also

The register an open socket to wait for data using a protocol example uses a low-level protocol created by the AbstractEventLoop.create_connection() method.

The watch a file descriptor for read events example uses the low-level AbstractEventLoop.add_reader() method to register the file descriptor of a socket.