- Notifications
You must be signed in to change notification settings - Fork1k
#748: don't share httpx.AsyncHTTPTransport between event loops#1695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
Uh oh!
There was an error while loading.Please reload this page.
Conversation
c27343f
toe5ccea4
Comparee5ccea4
to45b685d
CompareOk, I've amended my commit with a new proposal, though I haven't tested it as much as the previous one. Now, this idea is to use a proxy object for the Transport, and decide on the real object only when we're sure we're in a valid async context. What do you think ? Is that accesptable and should I work on that or do you see a flaw in that plan ? |
Hm, I thought of a much cleaner solution: keeping transports in a BUT the whole point is that transports keep a strong ref to the loop, so if we keep a strong ref to the transport, the loop's refcount is never going to go to zero 😭 I guess though we could remove any item from the dict where the loop is closed 🤔 |
45b685d
tobbc34be
Compareewjoachim commentedMay 12, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
New tentative (likely going to get complain for missing coverage). I've kept the weakref just in case, but I've added code that will remove from the (weak) keys when the loops that are closed. I think this should solve the issue. Happy to get some feedback before spending time adding missing tests. Also, if you want to take it from here, feel free :) |
bojan2501 commentedMay 13, 2025
I tested changes with simple Streamlit app and Vertex AI. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Just some comments to make review maybe easier ?
if loop.is_closed(): | ||
del self.transports[loop] | ||
return self.transports.setdefault(asyncio.get_running_loop(), httpx.AsyncHTTPTransport()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Because I'm using setdefault here, it meanshttpx.AsyncHTTPTransport()
is always assigned even when it's not going to be used. Is that something we want to optimize ?
async def __aenter__(self): | ||
await self.get_transport().__aenter__() | ||
return self | ||
async def __aexit__( | ||
self, | ||
exc_type: type[BaseException] | None = None, | ||
exc_value: BaseException | None = None, | ||
traceback: TracebackType | None = None, | ||
) -> None: | ||
await self.get_transport().__aexit__(exc_type, exc_value, traceback) |
ewjoachimMay 13, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Those seem to be the not-covered lines. Meaning we (or rather httpx) never use theaenter
/aexit
API of the transport. We could just raiseNotImplementError
rather than do something not obvious and not needed wait for someone to actually need it ?
I'm saying "not obvious" because it's very slightly unclear whether we would want__aenter__
to returnself
or the underlying transport
@cache | ||
def _cached_async_http_transport() -> httpx.AsyncHTTPTransport: | ||
return httpx.AsyncHTTPTransport() | ||
def _get_transport_for_loop() -> _PerLoopTransport: | ||
return _PerLoopTransport() |
ewjoachimMay 13, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This is a convoluted way of just defining the instance as a module variable, but it works. Also, it was kinda-already done that way before so I kept it
self.transports: MutableMapping[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = ( | ||
weakref.WeakKeyDictionary() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
It might be debatable if we want this to be a WeakKeyDictionary or a normal dict.
It would probably work with a normal dict, but it feels that if a loop is referenced by nobody but this mapping, we really don't need to have it here, so I'd say it makes sense to keep it.
Note that if we found a way to remove the strong reference to the loop, e.g. by closing the pool, then we would probably be happy with the WeakKeyDictionary without the for loop inget_transport
, but I'm not sure I know when we should close the pool for this to stay useful (in the end, the goal is to reuse the tcp connections in the end)
Like they say, 2 hardest problems in compsci: naming things andcache invalidation and off-by-one errors.
Uh oh!
There was an error while loading.Please reload this page.
I'll be leaving this PR for Pydiantic folks to whatever they want with them, but specifically for solving the Gemini issue, using the new GoogleModel should be a better alternative. Now the transport issue still needs to be solved at some point. |
@ewjoachim sorry the lack of reply, I was not ignoring, I was talking it internally, because I actually expected unexpected issues with that implementation. 😔 I think the way we intend to move forward is to not do the cache we are doing, and have an async context manager to do the right thing in the Agent level. I'll write more about it soon. But yeah, you can leave it to us. I'll get back to this soon. |
Great ! Yeah, I too think the proper fix is to avoid creating any httpx session/transport/... until we're 100% sure we're in an async context, and then make sure we stop it when we leave the context. But yeah, either there's going to be change in the API and that's probably annoying, or there's going to be more profound change in the integration, but it doesn't make a lot of sense that this kind of change would be lead by a first time contributor, so yeah, it's probably better this way :) |
Uh oh!
There was an error while loading.Please reload this page.
Fixes#748
In
cached_async_http_client
we cache the http client (...duh) and its transport. The transport keeps a pointer to the current event loop through:and will try to reuse this event loop on subsequent calls. This causes
RuntimeError('Event loop is closed')
in case that loop was closed and we're using another one.My fix makes it so that the real instance of the transport is only evaluated when we're in a loop context, and returns a different instance for each distinct loop.
I've been usinghttps://github.com/oscar-broman/pydantic-ai-gemini-issue-mre as a reproducer, and it fails on
main
and passes on my branch.The reason we don't see the same test failure in this repo's tests surely isthis: because there are fixtures with
scope='session'
, anyIO only creates a single event loop that each test share, so we can't see the issue (also, we mock HTTP calls withpytest-recording
and I'm not 100% sure the mock occurs at a level that would make the test fail visible)All that is to say: I'm not sure how best to implement a reproducer without impeding too much on the codebase. Opinions from maintainers welcome :)
Warning
I'm afraid this makes the following fixture very brittle. It will work if the anyio single event loop issue continues to be, but if we manage to have an event loop for each test, then we won't be able to properly close the clients. That said: isn't that an issue with real code in the real life too ? Who's responsible for closing those cached clients ? (maybe they close on
__del__
but that's not ideal, is it ?pydantic-ai/tests/conftest.py
Lines 202 to 217 ine7ce675