gh-113538: Allow client connections to be closed #114432


Merged
gvanrossum merged 14 commits into python:main from CendioOssman:server_close on Mar 11, 2024

Conversation

CendioOssman
Contributor

@CendioOssman commented Jan 22, 2024 (edited by github-actions bot)

Give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.


📚 Documentation preview 📚: https://cpython-previews--114432.org.readthedocs.build/
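
For reference, the shutdown sequence these new methods are meant to support could look roughly like this (a minimal sketch, not code from the PR; the timeout values are arbitrary):

import asyncio

async def shutdown(server: asyncio.Server) -> None:
    # Stop accepting new connections.
    server.close()
    try:
        # Give well-behaved clients a short grace period to finish.
        await asyncio.wait_for(server.wait_closed(), timeout=5)
    except TimeoutError:
        # Ask the remaining client transports to close.
        server.close_clients()
        try:
            await asyncio.wait_for(server.wait_closed(), timeout=1)
        except TimeoutError:
            # Last resort: drop the connections immediately.
            server.abort_clients()
            await server.wait_closed()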

Give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
@CendioOssman
Contributor (Author)

PR includes unit tests, but I also used this more complete example to test and showcase the functionality:

#!/usr/bin/python3
import asyncio
import socket

# Well behaved or spammy protocol?
FLOOD = True


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport
        self._paused = False
        self._write()

    def pause_writing(self):
        self._paused = True

    def resume_writing(self):
        self._paused = False
        self._write()

    def _write(self):
        if not FLOOD:
            return
        if self._paused:
            return
        self.transport.write(b'a' * 65536)
        asyncio.get_running_loop().call_soon(self._write)

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))


def stop():
    loop = asyncio.get_running_loop()
    loop.stop()


client = None


def server_up(fut):
    global server
    server = fut.result()
    print('Server running')
    global client
    client = socket.create_connection(('127.0.0.1', 8888))


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

fut = asyncio.ensure_future(
    loop.create_server(lambda: EchoServerProtocol(), '127.0.0.1', 8888))
fut.add_done_callback(server_up)

# Simulate termination
loop.call_later(2.5, stop)

try:
    loop.run_forever()
finally:
    print('Closing...')
    server.close()
    try:
        loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
    except TimeoutError:
        print('Timeout waiting for clients. Attempting close...')
        server.close_clients()
        try:
            loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
        except TimeoutError:
            print('Timeout waiting for client close. Attempting abort...')
            server.abort_clients()
            loop.run_until_complete(asyncio.wait_for(server.wait_closed(), 0.1))
    loop.close()

    if client is not None:
        client.close()

print('Closed')

Member

@gvanrossum left a comment


This looks great, but I have one reservation. The server now keeps the transports alive, through the set of clients. Maybe it would be better if that was a weak set, so that if somehow a transport was freed without going through the usual close routine, the server doesn't hold onto it forever? Thoughts?

@CendioOssman
Contributor (Author)

I figured I'd start things simple, so no weak references unless there is a practical issue that requires them.

So, are there any scenarios where this is relevant?

The transport detaches from the server in the same place where it closes the server. That should mean that this is something that must be done, as otherwise the destructor will complain. So I don't think well-written code should have an issue.

Might there be an issue with badly written code, then? The primary case would be if the destructor check fails to trigger. Which I guess is the case with my suggestion. It is difficult to trigger, though. Streams are a bit buggy, so abandoning them doesn't always result in a cleanup (#114914), and transports are referenced by the event loop until you tell them to pause reading.

But difficult isn't impossible, so this case breaks when applying this PR:

#!/usr/bin/python3
import asyncio
import socket


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport
        self.transport.pause_reading()

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))


client = None


def server_up(fut):
    global server
    server = fut.result()
    print('Server running')
    global client
    client = socket.create_connection(('127.0.0.1', 8888))


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

fut = asyncio.ensure_future(
    loop.create_server(lambda: EchoServerProtocol(), '127.0.0.1', 8888))
fut.add_done_callback(server_up)

# Simulate termination
loop.call_later(2.5, loop.stop)

try:
    loop.run_forever()
    import gc
    gc.collect()
finally:
    server.close()
    loop.close()

    if client is not None:
        client.close()

This case creates a transport, pauses it, and then fails to maintain any other reference to it. Without this PR it gets collected when gc.collect() is called (or earlier). With this PR, it only gets collected at the end.

So a weak reference does indeed seem appropriate.
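
For illustration, the general shape of the idea, shown on a simplified stand-in class rather than the actual asyncio.Server code: track clients in a weakref.WeakSet so the server never becomes the last reference keeping a transport alive.

import weakref

class MiniServer:
    """Simplified sketch; the real asyncio.Server tracks much more state."""

    def __init__(self):
        # A WeakSet entry does not keep the transport alive, so an abandoned
        # transport can still be garbage collected (and its __del__ can
        # still warn that it was never closed).
        self._clients = weakref.WeakSet()

    def _attach(self, transport):
        self._clients.add(transport)

    def _detach(self, transport):
        self._clients.discard(transport)

    def close_clients(self):
        # Iterate over a snapshot; closing eventually detaches each transport.
        for transport in list(self._clients):
            transport.close()

    def abort_clients(self):
        for transport in list(self._clients):
            transport.abort()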

We want to be able to detect if the application fails to keep track of the transports, so we cannot keep them alive by using a hard reference.
The application might be waiting for all transports to close, so we need to properly inform the server that this transport is done.
@gvanrossum
Member

Sorry for the delay -- this just made it to the top of my review queue. I hope to get to this soon!

Member

@gvanrossum left a comment


Not bad, some thoughts.

@@ -277,7 +277,8 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop
self._sockets = sockets
self._active_count = 0
# Weak references so abandoned transports can be detected
Member


Suggested change
# Weak references so abandoned transports can be detected
# Weak references so abandoned transports can be ignored

Contributor (Author)


The wording here was intentional. Weak references are, to my knowledge, the only way to detect abandoned objects. But it's not this code that does the detection, so I can understand the confusion. How about:

Weak references so we don't break Transport's ability to detect abandoned transports

?

Member


Ah, you're thinking from the POV of the transport, whose __del__ must be called to "detect" (i.e., warn about) that it was abandoned. I was thinking from the POV of the loop in close_clients(), where we want to ignore (not encounter) transports that have been closed already.

I'll make it your choice which wording to use.


srv.close()
srv.close_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Member


Yeah, short sleeps are a nuisance in asyncio tests. Usually they can be replaced by a small number of sleep(0) calls though -- usually 1, rarely 2 or 3. sleep(0) is special and guarantees we go through the event loop exactly once.

Contributor (Author)


A bunch of sleep(0) felt even more arbitrary. :/

What's your suggestion here? Keep it as is? Or something else?

Member


A bunch of sleep(0) wastes less time (the asyncio tests are already too slow), and doesn't risk the test becoming flaky due to timing out on slow platforms (a problem we've struggled with). The right number of sleep(0) calls takes up no more time than needed, and lets the machinery go through its motions in a deterministic manner. So I recommend sleep(0).
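
As a hypothetical illustration of that suggestion, the fragment quoted above could drop the wall-clock sleep in favour of a few single-iteration yields:

srv.close()
srv.close_clients()

# Each sleep(0) yields to the event loop for exactly one iteration, which
# is enough to run the callbacks that close_clients() schedules via
# call_soon(), without depending on a timeout.
for _ in range(3):
    await asyncio.sleep(0)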

while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 65536)
await asyncio.sleep(0)
await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears?
Member


Eh, what's going on with this one? Is it the same issue that can be fixed with a small number of sleep(0) calls, or different?

Contributor (Author)


I don't know, to be honest. Might be a kernel or libc issue where it shuffles buffers around and/or allocates more space.

Without it, I cannot reliably get both the server and client to a state where buffers are full. Which is needed for the test to check the right thing.

Member


IMO this PR is not finished until you get to the bottom of that. If you really need them to reach a specific state and there's no deterministic way to get there, consider manipulating internal APIs.

Contributor (Author)


This was a pain, but I think I got it fixed. The core problem is that the kernel dynamically increases the socket send buffer size. And it takes about 20-30 ms to do so.

I think I've worked around that by specifying an explicit buffer size. That should turn off the dynamic resizing, if I remember things correctly.
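
For concreteness, this is what pinning the buffer size usually looks like at the socket level (a sketch of the general technique, not necessarily the exact change made in the test):

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# An explicit SO_SNDBUF/SO_RCVBUF disables the kernel's dynamic buffer
# autotuning for this socket on Linux (the kernel may still round or
# double the requested value).
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)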


srv.close()
srv.abort_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Member


This one looks similar to the first sleep.


Comment on lines 299 to 300
# Note that 'transport' may already be missing from
# self._clients if it has been garbage collected
Member


IMO you don't need this comment either (it was explaining the condition).

One could be made clearer, and the other is probably superfluous.
Try to get the streams and the kernel into a more deterministic state by specifying fixed buffering limits.
Member

@gvanrossum left a comment


Thanks for fixing all that. I have one remaining concern only.

Comment on lines 239 to 246
while c_wr.transport.is_reading():
await asyncio.sleep(0)

# Get the writer in a waiting state by sending data until the
# kernel stops accepting more in to the send buffer
while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 4096)
await asyncio.sleep(0)
Member


Can we put an upper bound on these while loops and fail the test when they loop too many times? (Possibly a for loop with a break is the best idiom to do this.)

I worry that something in the network stack might cause one or the other to loop forever, and I'd rather not waste the CPU time in CI over this.

How many iterations do you expect? Is it deterministic regardless of platform?
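
Illustratively, the for/break idiom being suggested could look like this (the iteration bound and the failure message are placeholders):

# Bound the loop and fail the test instead of spinning forever.
for _ in range(1000):
    if s_wr.transport.get_write_buffer_size() != 0:
        break
    s_wr.write(b'a' * 4096)
    await asyncio.sleep(0)
else:
    self.fail('write buffer never filled up')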

Contributor (Author)


It's a bit sensitive to platform behaviour, but I believe I can get something that dynamically adapts.

No possibly infinite loop. Instead, ask the system how much buffer space it has and fill that.
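
A rough sketch of that approach (not the actual test code; s_wr is the server-side writer from the quoted test):

import socket

# Ask the kernel how big the send buffer is and write that much in one
# go, instead of looping until the write buffer reports as non-empty.
sock = s_wr.get_extra_info('socket')
bufsize = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
s_wr.write(b'a' * bufsize)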
Member

@gvanrossum left a comment


LG, but can you fix the merge conflict? The tests won't run until that's fixed.

@CendioOssman
Contributor (Author)

Sure. How do you prefer to handle that? A rebase against current main?

@gvanrossum
Member

No rebase please! Please merge the updated main branch into your branch, fix the conflicts as part of the merge, and then commit and push (not push -f). The extra commits will be squashed when I merge back into main.

Member

@gvanrossum left a comment


Thanks! LGTM. I'll merge.

@mhsmith
Member

It looks like one of the new tests isn't always passing: #116423 (comment)

gvanrossum added a commit to gvanrossum/cpython that referenced this pull request Mar 11, 2024
@gvanrossum
Member

It looks like one of the new tests isn't always passing: #116423 (comment)

Thanks, I've created a new PR on the same issue that will revert it. Sorry for the inconvenience.

gvanrossum added a commit that referenced this pull request Mar 12, 2024
Revert "gh-113538: Add asycio.Server.{close,abort}_clients (#114432)" (#116632). Reason: The new test doesn't always pass: #116423 (comment). This reverts commit 1d0d49a.
@gvanrossum
Member

The PR has been reverted.

@CendioOssman could you look into the test failures? I probably should have reviewed the test more carefully, sorry.

@CendioOssman
Contributor (Author)

Of course. And once they are fixed, how do you want to proceed? Submit a new PR using the existing branch?

@gvanrossum
Member

gvanrossum commented Mar 12, 2024 (edited)

Of course. And once they are fixed, how do you want to proceed? Submit a new PR using the existing branch?

That sounds fine to me. If you can't get it to work, just rename the branch locally and push that.

gvanrossum pushed a commit that referenced this pull request Mar 18, 2024
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for gh-114432, which was reverted.
vstinner pushed a commit to vstinner/cpython that referenced this pull request Mar 20, 2024
(python#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.
adorilson pushed a commit to adorilson/cpython that referenced this pull request Mar 25, 2024
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
adorilson pushed a commit to adorilson/cpython that referenced this pull request Mar 25, 2024
Revert "pythongh-113538: Add asycio.Server.{close,abort}_clients (python#114432)" (python#116632). Reason: The new test doesn't always pass: python#116423 (comment). This reverts commit 1d0d49a.
adorilson pushed a commit to adorilson/cpython that referenced this pull request Mar 25, 2024
(python#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.
diegorusso pushed a commit to diegorusso/cpython that referenced this pull request Apr 17, 2024
These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work.
diegorusso pushed a commit to diegorusso/cpython that referenced this pull request Apr 17, 2024
Revert "pythongh-113538: Add asycio.Server.{close,abort}_clients (python#114432)" (python#116632). Reason: The new test doesn't always pass: python#116423 (comment). This reverts commit 1d0d49a.
diegorusso pushed a commit to diegorusso/cpython that referenced this pull request Apr 17, 2024
(python#116784) These give applications the option of more forcefully terminating client connections for asyncio servers. Useful when terminating a service and there is limited time to wait for clients to finish up their work. This is a do-over with a test fix for pythongh-114432, which was reverted.