Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork32k
GH-91166: zero copy SelectorSocketTransport transport implementation#31871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
39b538f
abd2dc3
669b661
0692952
2725334
bed096d
f090e8d
d6c77cd
f2ee404
effab03
e1e4362
bdb1bda
d1fae6c
cd45016
5b962f5
85d6909
152b748
2c62bcb
9b92cff
7e05c2c
97de955
57b1ba0
2ca3571
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -9,6 +9,8 @@ | ||
import collections | ||
import errno | ||
import functools | ||
import itertools | ||
import os | ||
import selectors | ||
import socket | ||
import warnings | ||
@@ -28,6 +30,14 @@ | ||
from . import trsock | ||
from .log import logger | ||
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') | ||
if _HAS_SENDMSG: | ||
try: | ||
SC_IOV_MAX = os.sysconf('SC_IOV_MAX') | ||
except OSError: | ||
# Fallback to send | ||
_HAS_SENDMSG = False | ||
def _test_selector_event(selector, fd, event): | ||
# Test if the selector is monitoring 'event' events | ||
@@ -757,8 +767,6 @@ class _SelectorTransport(transports._FlowControlMixin, | ||
max_size = 256 * 1024 # Buffer size passed to recv(). | ||
# Attribute used in the destructor: it must be set even if the constructor | ||
# is not called (see _SelectorSslTransport which may start by raising an | ||
# exception) | ||
@@ -783,7 +791,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): | ||
self.set_protocol(protocol) | ||
self._server = server | ||
self._buffer =collections.deque() | ||
self._conn_lost = 0 # Set when call to connection_lost scheduled. | ||
self._closing = False # Set when close() called. | ||
if self._server is not None: | ||
@@ -887,7 +895,7 @@ def _call_connection_lost(self, exc): | ||
self._server = None | ||
def get_write_buffer_size(self): | ||
returnsum(map(len,self._buffer)) | ||
def _add_reader(self, fd, callback, *args): | ||
if self._closing: | ||
@@ -909,7 +917,10 @@ def __init__(self, loop, sock, protocol, waiter=None, | ||
self._eof = False | ||
self._paused = False | ||
self._empty_waiter = None | ||
if _HAS_SENDMSG: | ||
self._write_ready = self._write_sendmsg | ||
else: | ||
self._write_ready = self._write_send | ||
# Disable the Nagle algorithm -- small writes will be | ||
# sent without waiting for the TCP ACK. This generally | ||
# decreases the latency (in some cases significantly.) | ||
@@ -1066,23 +1077,68 @@ def write(self, data): | ||
self._fatal_error(exc, 'Fatal write error on socket transport') | ||
return | ||
else: | ||
data =memoryview(data)[n:] | ||
if not data: | ||
return | ||
# Not all was written; register write handler. | ||
self._loop._add_writer(self._sock_fd, self._write_ready) | ||
# Add it to the buffer. | ||
self._buffer.append(data) | ||
self._maybe_pause_protocol() | ||
def _get_sendmsg_buffer(self): | ||
return itertools.islice(self._buffer, SC_IOV_MAX) | ||
def _write_sendmsg(self): | ||
assert self._buffer, 'Data should not be empty' | ||
if self._conn_lost: | ||
return | ||
try: | ||
nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) | ||
self._adjust_leftover_buffer(nbytes) | ||
except (BlockingIOError, InterruptedError): | ||
pass | ||
except (SystemExit, KeyboardInterrupt): | ||
raise | ||
except BaseException as exc: | ||
self._loop._remove_writer(self._sock_fd) | ||
self._buffer.clear() | ||
self._fatal_error(exc, 'Fatal write error on socket transport') | ||
if self._empty_waiter is not None: | ||
self._empty_waiter.set_exception(exc) | ||
else: | ||
self._maybe_resume_protocol() # May append to buffer. | ||
if not self._buffer: | ||
self._loop._remove_writer(self._sock_fd) | ||
if self._empty_waiter is not None: | ||
self._empty_waiter.set_result(None) | ||
if self._closing: | ||
self._call_connection_lost(None) | ||
elif self._eof: | ||
self._sock.shutdown(socket.SHUT_WR) | ||
def _adjust_leftover_buffer(self, nbytes: int) -> None: | ||
buffer = self._buffer | ||
while nbytes: | ||
b = buffer.popleft() | ||
b_len = len(b) | ||
if b_len <= nbytes: | ||
nbytes -= b_len | ||
else: | ||
buffer.appendleft(b[nbytes:]) | ||
break | ||
def _write_send(self): | ||
assert self._buffer, 'Data should not be empty' | ||
if self._conn_lost: | ||
return | ||
try: | ||
buffer = self._buffer.popleft() | ||
n = self._sock.send(buffer) | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if n != len(buffer): | ||
# Not all data was written | ||
self._buffer.appendleft(buffer[n:]) | ||
except (BlockingIOError, InterruptedError): | ||
pass | ||
except (SystemExit, KeyboardInterrupt): | ||
@@ -1094,8 +1150,6 @@ def _write_ready(self): | ||
if self._empty_waiter is not None: | ||
self._empty_waiter.set_exception(exc) | ||
else: | ||
self._maybe_resume_protocol() # May append to buffer. | ||
if not self._buffer: | ||
self._loop._remove_writer(self._sock_fd) | ||
@@ -1113,6 +1167,16 @@ def write_eof(self): | ||
if not self._buffer: | ||
self._sock.shutdown(socket.SHUT_WR) | ||
def writelines(self, list_of_data): | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
if self._eof: | ||
raise RuntimeError('Cannot call writelines() after write_eof()') | ||
if self._empty_waiter is not None: | ||
raise RuntimeError('unable to writelines; sendfile is in progress') | ||
if not list_of_data: | ||
return | ||
self._buffer.extend([memoryview(data) for data in list_of_data]) | ||
self._write_ready() | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
def can_write_eof(self): | ||
return True | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,25 @@ | ||
"""Tests for selector_events.py""" | ||
importcollections | ||
import selectors | ||
import socket | ||
import sys | ||
import unittest | ||
from asyncio import selector_events | ||
from unittest import mock | ||
try: | ||
import ssl | ||
except ImportError: | ||
ssl = None | ||
import asyncio | ||
from asyncio.selector_events import(BaseSelectorEventLoop, | ||
_SelectorDatagramTransport, | ||
_SelectorSocketTransport, | ||
_SelectorTransport) | ||
from test.test_asyncio import utils as test_utils | ||
MOCK_ANY = mock.ANY | ||
@@ -37,7 +39,10 @@ def _close_self_pipe(self): | ||
def list_to_buffer(l=()): | ||
buffer = collections.deque() | ||
buffer.extend((memoryview(i) for i in l)) | ||
return buffer | ||
def close_transport(transport): | ||
@@ -493,9 +498,13 @@ def setUp(self): | ||
self.sock = mock.Mock(socket.socket) | ||
self.sock_fd = self.sock.fileno.return_value = 7 | ||
def socket_transport(self, waiter=None, sendmsg=False): | ||
transport = _SelectorSocketTransport(self.loop, self.sock, | ||
self.protocol, waiter=waiter) | ||
if sendmsg: | ||
transport._write_ready = transport._write_sendmsg | ||
else: | ||
transport._write_ready = transport._write_send | ||
self.addCleanup(close_transport, transport) | ||
return transport | ||
@@ -664,14 +673,14 @@ def test_write_memoryview(self): | ||
def test_write_no_data(self): | ||
transport = self.socket_transport() | ||
transport._buffer.append(memoryview(b'data')) | ||
transport.write(b'') | ||
self.assertFalse(self.sock.send.called) | ||
self.assertEqual(list_to_buffer([b'data']), transport._buffer) | ||
def test_write_buffer(self): | ||
transport = self.socket_transport() | ||
transport._buffer.append(b'data1') | ||
transport.write(b'data2') | ||
self.assertFalse(self.sock.send.called) | ||
self.assertEqual(list_to_buffer([b'data1', b'data2']), | ||
@@ -729,6 +738,77 @@ def test_write_tryagain(self): | ||
self.loop.assert_writer(7, transport._write_ready) | ||
self.assertEqual(list_to_buffer([b'data']), transport._buffer) | ||
def test_write_sendmsg_no_data(self): | ||
self.sock.sendmsg = mock.Mock() | ||
self.sock.sendmsg.return_value = 0 | ||
transport = self.socket_transport(sendmsg=True) | ||
transport._buffer.append(memoryview(b'data')) | ||
transport.write(b'') | ||
self.assertFalse(self.sock.sendmsg.called) | ||
self.assertEqual(list_to_buffer([b'data']), transport._buffer) | ||
@unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') | ||
def test_write_sendmsg_full(self): | ||
data = memoryview(b'data') | ||
self.sock.sendmsg = mock.Mock() | ||
self.sock.sendmsg.return_value = len(data) | ||
transport = self.socket_transport(sendmsg=True) | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.assertTrue(self.sock.sendmsg.called) | ||
self.assertFalse(self.loop.writers) | ||
@unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') | ||
def test_write_sendmsg_partial(self): | ||
data = memoryview(b'data') | ||
self.sock.sendmsg = mock.Mock() | ||
# Sent partial data | ||
self.sock.sendmsg.return_value = 2 | ||
transport = self.socket_transport(sendmsg=True) | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.assertTrue(self.sock.sendmsg.called) | ||
self.assertTrue(self.loop.writers) | ||
self.assertEqual(list_to_buffer([b'ta']), transport._buffer) | ||
@unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') | ||
def test_write_sendmsg_half_buffer(self): | ||
data = [memoryview(b'data1'), memoryview(b'data2')] | ||
self.sock.sendmsg = mock.Mock() | ||
# Sent partial data | ||
self.sock.sendmsg.return_value = 2 | ||
transport = self.socket_transport(sendmsg=True) | ||
transport._buffer.extend(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
self.assertTrue(self.sock.sendmsg.called) | ||
self.assertTrue(self.loop.writers) | ||
self.assertEqual(list_to_buffer([b'ta1', b'data2']), transport._buffer) | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
@unittest.skipUnless(selector_events._HAS_SENDMSG, 'no sendmsg') | ||
def test_write_sendmsg_OSError(self): | ||
data = memoryview(b'data') | ||
self.sock.sendmsg = mock.Mock() | ||
err = self.sock.sendmsg.side_effect = OSError() | ||
transport = self.socket_transport(sendmsg=True) | ||
transport._fatal_error = mock.Mock() | ||
transport._buffer.extend(data) | ||
# Calls _fatal_error and clears the buffer | ||
transport._write_ready() | ||
kumaraditya303 marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
self.assertTrue(self.sock.sendmsg.called) | ||
self.assertFalse(self.loop.writers) | ||
self.assertEqual(list_to_buffer([]), transport._buffer) | ||
transport._fatal_error.assert_called_with( | ||
err, | ||
'Fatal write error on socket transport') | ||
@mock.patch('asyncio.selector_events.logger') | ||
def test_write_exception(self, m_log): | ||
err = self.sock.send.side_effect = OSError() | ||
@@ -768,19 +848,19 @@ def test_write_ready(self): | ||
self.sock.send.return_value = len(data) | ||
transport = self.socket_transport() | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.assertTrue(self.sock.send.called) | ||
self.assertFalse(self.loop.writers) | ||
def test_write_ready_closing(self): | ||
data =memoryview(b'data') | ||
self.sock.send.return_value = len(data) | ||
transport = self.socket_transport() | ||
transport._closing = True | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.assertTrue(self.sock.send.called) | ||
@@ -795,11 +875,11 @@ def test_write_ready_no_data(self): | ||
self.assertRaises(AssertionError, transport._write_ready) | ||
def test_write_ready_partial(self): | ||
data =memoryview(b'data') | ||
self.sock.send.return_value = 2 | ||
transport = self.socket_transport() | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.loop.assert_writer(7, transport._write_ready) | ||
@@ -810,7 +890,7 @@ def test_write_ready_partial_none(self): | ||
self.sock.send.return_value = 0 | ||
transport = self.socket_transport() | ||
transport._buffer.append(data) | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.loop.assert_writer(7, transport._write_ready) | ||
@@ -820,12 +900,13 @@ def test_write_ready_tryagain(self): | ||
self.sock.send.side_effect = BlockingIOError | ||
transport = self.socket_transport() | ||
buffer = list_to_buffer([b'data1', b'data2']) | ||
transport._buffer = buffer | ||
self.loop._add_writer(7, transport._write_ready) | ||
transport._write_ready() | ||
self.loop.assert_writer(7, transport._write_ready) | ||
self.assertEqual(buffer, transport._buffer) | ||
def test_write_ready_exception(self): | ||
err = self.sock.send.side_effect = OSError() | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
:mod:`asyncio` is optimized to avoid excessive copying when writing to socket and use :meth:`~socket.socket.sendmsg` if the platform supports it. Patch by Kumar Aditya. |