Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1d0d49a

Browse files
authored
gh-113538: Add asycio.Server.{close,abort}_clients (#114432)
These give applications the option of more forcefully terminating clientconnections for asyncio servers. Useful when terminating a service andthere is limited time to wait for clients to finish up their work.
1 parent872c071 commit1d0d49a

File tree

8 files changed

+152
-20
lines changed

8 files changed

+152
-20
lines changed

‎Doc/library/asyncio-eventloop.rst‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,6 +1641,31 @@ Do not instantiate the :class:`Server` class directly.
16411641
coroutine to wait until the server is closed (and no more
16421642
connections are active).
16431643

1644+
..method::close_clients()
1645+
1646+
Close all existing incoming client connections.
1647+
1648+
Calls:meth:`~asyncio.BaseTransport.close` on all associated
1649+
transports.
1650+
1651+
:meth:`close` should be called before:meth:`close_clients` when
1652+
closing the server to avoid races with new clients connecting.
1653+
1654+
..versionadded::3.13
1655+
1656+
..method::abort_clients()
1657+
1658+
Close all existing incoming client connections immediately,
1659+
without waiting for pending operations to complete.
1660+
1661+
Calls:meth:`~asyncio.WriteTransport.abort` on all associated
1662+
transports.
1663+
1664+
:meth:`close` should be called before:meth:`abort_clients` when
1665+
closing the server to avoid races with new clients connecting.
1666+
1667+
..versionadded::3.13
1668+
16441669
..method::get_loop()
16451670

16461671
Return the event loop associated with the server object.

‎Doc/whatsnew/3.13.rst‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ asyncio
270270
the buffer size.
271271
(Contributed by Jamie Phan in:gh:`115199`.)
272272

273+
* Add:meth:`asyncio.Server.close_clients` and
274+
:meth:`asyncio.Server.abort_clients` methods which allow to more
275+
forcefully close an asyncio server.
276+
(Contributed by Pierre Ossman in:gh:`113538`.)
277+
273278
base64
274279
---
275280

‎Lib/asyncio/base_events.py‎

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,9 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
279279
ssl_handshake_timeout,ssl_shutdown_timeout=None):
280280
self._loop=loop
281281
self._sockets=sockets
282-
self._active_count=0
282+
# Weak references so we don't break Transport's ability to
283+
# detect abandoned transports
284+
self._clients=weakref.WeakSet()
283285
self._waiters= []
284286
self._protocol_factory=protocol_factory
285287
self._backlog=backlog
@@ -292,14 +294,13 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
292294
def__repr__(self):
293295
returnf'<{self.__class__.__name__} sockets={self.sockets!r}>'
294296

295-
def_attach(self):
297+
def_attach(self,transport):
296298
assertself._socketsisnotNone
297-
self._active_count+=1
299+
self._clients.add(transport)
298300

299-
def_detach(self):
300-
assertself._active_count>0
301-
self._active_count-=1
302-
ifself._active_count==0andself._socketsisNone:
301+
def_detach(self,transport):
302+
self._clients.discard(transport)
303+
iflen(self._clients)==0andself._socketsisNone:
303304
self._wakeup()
304305

305306
def_wakeup(self):
@@ -348,9 +349,17 @@ def close(self):
348349
self._serving_forever_fut.cancel()
349350
self._serving_forever_fut=None
350351

351-
ifself._active_count==0:
352+
iflen(self._clients)==0:
352353
self._wakeup()
353354

355+
defclose_clients(self):
356+
fortransportinself._clients.copy():
357+
transport.close()
358+
359+
defabort_clients(self):
360+
fortransportinself._clients.copy():
361+
transport.abort()
362+
354363
asyncdefstart_serving(self):
355364
self._start_serving()
356365
# Skip one loop iteration so that all 'loop.add_reader'

‎Lib/asyncio/events.py‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ def close(self):
175175
"""Stop serving. This leaves existing connections open."""
176176
raiseNotImplementedError
177177

178+
defclose_clients(self):
179+
"""Close all active connections."""
180+
raiseNotImplementedError
181+
182+
defabort_clients(self):
183+
"""Close all active connections immediately."""
184+
raiseNotImplementedError
185+
178186
defget_loop(self):
179187
"""Get the event loop the Server object is attached to."""
180188
raiseNotImplementedError

‎Lib/asyncio/proactor_events.py‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
6363
self._called_connection_lost=False
6464
self._eof_written=False
6565
ifself._serverisnotNone:
66-
self._server._attach()
66+
self._server._attach(self)
6767
self._loop.call_soon(self._protocol.connection_made,self)
6868
ifwaiterisnotNone:
6969
# only wake up the waiter when connection_made() has been called
@@ -167,7 +167,7 @@ def _call_connection_lost(self, exc):
167167
self._sock=None
168168
server=self._server
169169
ifserverisnotNone:
170-
server._detach()
170+
server._detach(self)
171171
self._server=None
172172
self._called_connection_lost=True
173173

‎Lib/asyncio/selector_events.py‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
791791
self._paused=False# Set when pause_reading() called
792792

793793
ifself._serverisnotNone:
794-
self._server._attach()
794+
self._server._attach(self)
795795
loop._transports[self._sock_fd]=self
796796

797797
def__repr__(self):
@@ -868,6 +868,8 @@ def __del__(self, _warn=warnings.warn):
868868
ifself._sockisnotNone:
869869
_warn(f"unclosed transport{self!r}",ResourceWarning,source=self)
870870
self._sock.close()
871+
ifself._serverisnotNone:
872+
self._server._detach(self)
871873

872874
def_fatal_error(self,exc,message='Fatal error on transport'):
873875
# Should be called from exception handler only.
@@ -906,7 +908,7 @@ def _call_connection_lost(self, exc):
906908
self._loop=None
907909
server=self._server
908910
ifserverisnotNone:
909-
server._detach()
911+
server._detach(self)
910912
self._server=None
911913

912914
defget_write_buffer_size(self):

‎Lib/test/test_asyncio/test_server.py‎

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,12 @@ async def main(srv):
125125
classTestServer2(unittest.IsolatedAsyncioTestCase):
126126

127127
asyncdeftest_wait_closed_basic(self):
128-
asyncdefserve(*args):
129-
pass
128+
asyncdefserve(rd,wr):
129+
try:
130+
awaitrd.read()
131+
finally:
132+
wr.close()
133+
awaitwr.wait_closed()
130134

131135
srv=awaitasyncio.start_server(serve,socket_helper.HOSTv4,0)
132136
self.addCleanup(srv.close)
@@ -137,7 +141,8 @@ async def serve(*args):
137141
self.assertFalse(task1.done())
138142

139143
# active count != 0, not closed: should block
140-
srv._attach()
144+
addr=srv.sockets[0].getsockname()
145+
(rd,wr)=awaitasyncio.open_connection(addr[0],addr[1])
141146
task2=asyncio.create_task(srv.wait_closed())
142147
awaitasyncio.sleep(0)
143148
self.assertFalse(task1.done())
@@ -152,7 +157,8 @@ async def serve(*args):
152157
self.assertFalse(task2.done())
153158
self.assertFalse(task3.done())
154159

155-
srv._detach()
160+
wr.close()
161+
awaitwr.wait_closed()
156162
# active count == 0, closed: should unblock
157163
awaittask1
158164
awaittask2
@@ -161,22 +167,96 @@ async def serve(*args):
161167

162168
asyncdeftest_wait_closed_race(self):
163169
# Test a regression in 3.12.0, should be fixed in 3.12.1
164-
asyncdefserve(*args):
165-
pass
170+
asyncdefserve(rd,wr):
171+
try:
172+
awaitrd.read()
173+
finally:
174+
wr.close()
175+
awaitwr.wait_closed()
166176

167177
srv=awaitasyncio.start_server(serve,socket_helper.HOSTv4,0)
168178
self.addCleanup(srv.close)
169179

170180
task=asyncio.create_task(srv.wait_closed())
171181
awaitasyncio.sleep(0)
172182
self.assertFalse(task.done())
173-
srv._attach()
183+
addr=srv.sockets[0].getsockname()
184+
(rd,wr)=awaitasyncio.open_connection(addr[0],addr[1])
174185
loop=asyncio.get_running_loop()
175186
loop.call_soon(srv.close)
176-
loop.call_soon(srv._detach)
187+
loop.call_soon(wr.close)
177188
awaitsrv.wait_closed()
178189

190+
asyncdeftest_close_clients(self):
191+
asyncdefserve(rd,wr):
192+
try:
193+
awaitrd.read()
194+
finally:
195+
wr.close()
196+
awaitwr.wait_closed()
197+
198+
srv=awaitasyncio.start_server(serve,socket_helper.HOSTv4,0)
199+
self.addCleanup(srv.close)
200+
201+
addr=srv.sockets[0].getsockname()
202+
(rd,wr)=awaitasyncio.open_connection(addr[0],addr[1])
203+
self.addCleanup(wr.close)
204+
205+
task=asyncio.create_task(srv.wait_closed())
206+
awaitasyncio.sleep(0)
207+
self.assertFalse(task.done())
208+
209+
srv.close()
210+
srv.close_clients()
211+
awaitasyncio.sleep(0)
212+
awaitasyncio.sleep(0)
213+
self.assertTrue(task.done())
214+
215+
asyncdeftest_abort_clients(self):
216+
asyncdefserve(rd,wr):
217+
nonlocals_rd,s_wr
218+
s_rd=rd
219+
s_wr=wr
220+
awaitwr.wait_closed()
221+
222+
s_rd=s_wr=None
223+
srv=awaitasyncio.start_server(serve,socket_helper.HOSTv4,0)
224+
self.addCleanup(srv.close)
225+
226+
addr=srv.sockets[0].getsockname()
227+
(c_rd,c_wr)=awaitasyncio.open_connection(addr[0],addr[1],limit=4096)
228+
self.addCleanup(c_wr.close)
229+
230+
# Limit the socket buffers so we can reliably overfill them
231+
s_sock=s_wr.get_extra_info('socket')
232+
s_sock.setsockopt(socket.SOL_SOCKET,socket.SO_SNDBUF,65536)
233+
c_sock=c_wr.get_extra_info('socket')
234+
c_sock.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,65536)
235+
236+
# Get the reader in to a paused state by sending more than twice
237+
# the configured limit
238+
s_wr.write(b'a'*4096)
239+
s_wr.write(b'a'*4096)
240+
s_wr.write(b'a'*4096)
241+
whilec_wr.transport.is_reading():
242+
awaitasyncio.sleep(0)
243+
244+
# Get the writer in a waiting state by sending data until the
245+
# socket buffers are full on both server and client sockets and
246+
# the kernel stops accepting more data
247+
s_wr.write(b'a'*c_sock.getsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF))
248+
s_wr.write(b'a'*s_sock.getsockopt(socket.SOL_SOCKET,socket.SO_SNDBUF))
249+
self.assertNotEqual(s_wr.transport.get_write_buffer_size(),0)
250+
251+
task=asyncio.create_task(srv.wait_closed())
252+
awaitasyncio.sleep(0)
253+
self.assertFalse(task.done())
179254

255+
srv.close()
256+
srv.abort_clients()
257+
awaitasyncio.sleep(0)
258+
awaitasyncio.sleep(0)
259+
self.assertTrue(task.done())
180260

181261

182262
# Test the various corner cases of Unix server socket removal
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add:meth:`asyncio.Server.close_clients` and
2+
:meth:`asyncio.Server.abort_clients` methods which allow to more forcefully
3+
close an asyncio server.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp