#748: don't share httpx.AsyncHTTPTransport between event loops #1695


Open
ewjoachim wants to merge 2 commits into pydantic:main from ewjoachim:fix-event-loop-gemini

@ewjoachim commented on May 12, 2025 (edited):

Fixes #748

In cached_async_http_client we cache the http client (...duh) and its transport. The transport keeps a pointer to the current event loop through:

transport._pool._connections[0]._connection._network_stream._stream.transport_stream._transport._loop

and will try to reuse this event loop on subsequent calls. This causes RuntimeError('Event loop is closed') when that loop has been closed and we are running in another one.
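For readers who want to see the failure mode in isolation, here is a minimal sketch that does not use httpx at all. `LoopBoundResource` and `cached_resource` are hypothetical stand-ins (they are not names from this PR or from httpx); the only assumption is that the cached object remembers the first loop it ran on, as the transport does.

```python
import asyncio
from functools import cache


class LoopBoundResource:
    """Hypothetical stand-in for a transport that remembers its event loop."""

    def __init__(self) -> None:
        self.loop = None

    async def use(self) -> str:
        if self.loop is None:
            # Remember the loop of the first caller, like the cached transport does.
            self.loop = asyncio.get_running_loop()
        # Scheduling work on the remembered loop fails once that loop is closed.
        self.loop.call_soon(lambda: None)
        return "ok"


@cache
def cached_resource() -> LoopBoundResource:
    # One shared instance for the whole process, regardless of the running loop.
    return LoopBoundResource()


def run_twice() -> list[str]:
    outcomes = []
    for _ in range(2):
        try:
            # Each asyncio.run() call creates a fresh loop and closes it on exit.
            outcomes.append(asyncio.run(cached_resource().use()))
        except RuntimeError as exc:
            outcomes.append(str(exc))
    return outcomes
```

Calling `run_twice()` returns `['ok', 'Event loop is closed']`: the second `asyncio.run()` gets a new loop, but the cached object still points at the first, closed one.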

My fix makes it so that the real instance of the transport is only evaluated when we're in a loop context, and returns a different instance for each distinct loop.

I've been using https://github.com/oscar-broman/pydantic-ai-gemini-issue-mre as a reproducer, and it fails on main and passes on my branch.

The reason we don't see the same test failure in this repo's tests is surely this: because there are fixtures with scope='session', anyio only creates a single event loop that every test shares, so we can't see the issue (also, we mock HTTP calls with pytest-recording and I'm not 100% sure the mock occurs at a level that would make the test failure visible).

All that is to say: I'm not sure how best to implement a reproducer without impeding too much on the codebase. Opinions from maintainers welcome :)

Warning

I'm afraid this makes the following fixture very brittle. It will work as long as the anyio single-event-loop behavior persists, but if we manage to have an event loop for each test, then we won't be able to properly close the clients. That said: isn't that an issue with real code in real life too? Who's responsible for closing those cached clients? (Maybe they close on __del__, but that's not ideal, is it?)

    @pytest.fixture(autouse=True)
    async def close_cached_httpx_client() -> AsyncIterator[None]:
        yield
        for provider in [
            'openai',
            'anthropic',
            'azure',
            'google-gla',
            'google-vertex',
            'groq',
            'mistral',
            'cohere',
            'deepseek',
            None,
        ]:
            await cached_async_http_client(provider=provider).aclose()

@ewjoachim force-pushed the fix-event-loop-gemini branch from c27343f to e5ccea4 on May 12, 2025 13:55

@ewjoachim (Author):

[Screenshot 2025-05-12 at 16 24 41]

We have another issue. Everywhere we copy the http client on the model from the provider (see screenshot above), we then won't be using the cache attributes to know whether this client belongs to the appropriate loop or not.

@ewjoachim force-pushed the fix-event-loop-gemini branch from e5ccea4 to 45b685d on May 12, 2025 15:47

@ewjoachim (Author):

Ok, I've amended my commit with a new proposal, though I haven't tested it as much as the previous one. Now, the idea is to use a proxy object for the Transport, and decide on the real object only when we're sure we're in a valid async context.

What do you think? Is that acceptable and should I work on that, or do you see a flaw in that plan?

@ewjoachim (Author):

Hm, I thought of a much cleaner solution: keeping transports in a weakref.WeakKeyDictionary[EventLoop, Transports]. This way, we keep the transports while the loop is alive and drop them when it's gone.

BUT the whole point is that transports keep a strong ref to the loop, so if we keep a strong ref to the transport, the loop's refcount is never going to go to zero 😭

I guess though we could remove any item from the dict where the loop is closed 🤔
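The refcount problem described above can be reproduced with the standard library alone. This is a sketch under assumptions: `Loop` and `Transport` are hypothetical stand-in classes, and the only property modelled is whether the value holds a strong reference to its own key.

```python
import gc
import weakref


class Loop:
    """Hypothetical stand-in for an event loop (the weak key)."""


class Transport:
    """Hypothetical stand-in for a transport (the dictionary value)."""

    def __init__(self, loop=None) -> None:
        # A real transport ends up referencing its loop; None models one that does not.
        self.loop = loop


def entries_left(value_references_key: bool) -> int:
    registry = weakref.WeakKeyDictionary()
    loop = Loop()
    registry[loop] = Transport(loop if value_references_key else None)
    del loop  # drop our own reference to the key
    gc.collect()
    # The entry disappears only if nothing else keeps the key alive.
    return len(registry)
```

`entries_left(False)` is 0, while `entries_left(True)` is 1: the dictionary holds the value strongly, the value holds the key strongly, so the weak key never dies on its own. That is why some explicit cleanup, such as dropping entries whose loop is closed, is still needed.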

@ewjoachim force-pushed the fix-event-loop-gemini branch from 45b685d to bbc34be on May 12, 2025 21:13

@ewjoachim (Author) commented on May 12, 2025 (edited):

New attempt (CI will likely complain about missing coverage).

I've kept the weakref just in case, but I've added code that removes closed loops from the (weak) keys.

I think this should solve the issue. Happy to get some feedback before spending time adding missing tests. Also, if you want to take it from here, feel free :)
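A rough, httpx-free sketch of the approach described in this comment, for illustration only. `FakeTransport` and `PerLoopTransports` are hypothetical names, and the real class in the PR may differ in its details; the sketch only assumes a weak-keyed mapping from loop to transport that is pruned of closed loops on each lookup.

```python
import asyncio
import weakref


class FakeTransport:
    """Hypothetical stand-in for httpx.AsyncHTTPTransport."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        # Mimic the real transport, which keeps a strong reference to its loop.
        self.loop = loop


class PerLoopTransports:
    def __init__(self) -> None:
        self.transports = weakref.WeakKeyDictionary()

    def get_transport(self) -> FakeTransport:
        # Copy the keys first so that deleting entries does not disturb iteration.
        for loop in list(self.transports):
            if loop.is_closed():
                del self.transports[loop]
        current = asyncio.get_running_loop()
        transport = self.transports.get(current)
        if transport is None:
            transport = self.transports[current] = FakeTransport(current)
        return transport


registry = PerLoopTransports()


async def grab_twice() -> tuple:
    # Two lookups from the same running loop should hit the same entry.
    return registry.get_transport(), registry.get_transport()
```

Within one `asyncio.run(grab_twice())` both lookups return the same object; a second `asyncio.run()` gets a different one, and the entry for the first (now closed) loop is pruned during that lookup.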

bojan2501 reacted with thumbs up emoji

@bojan2501:

I tested the changes with a simple Streamlit app and Vertex AI.
The old issue with closed loops is gone.
I will try to do some more tests and see how it works.
Cheers for the fix.

@Kludex self-assigned this on May 13, 2025

@ewjoachim (Author) left a comment:

Just some comments to maybe make the review easier.

        if loop.is_closed():
            del self.transports[loop]

    return self.transports.setdefault(asyncio.get_running_loop(), httpx.AsyncHTTPTransport())
@ewjoachim (Author):

Because I'm using setdefault here, httpx.AsyncHTTPTransport() is always instantiated, even when it's not going to be used. Is that something we want to optimize?
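To make the cost concrete: Python evaluates the second argument of `setdefault` before the call, so the object is built even on a cache hit. The sketch below counts instantiations with a hypothetical `FakeTransport` stand-in and compares it with a get-then-create variant; the function names are illustrative, not taken from the PR.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for httpx.AsyncHTTPTransport; counts instantiations."""

    created = 0

    def __init__(self) -> None:
        type(self).created += 1


def get_transport_eager(transports: dict, loop) -> FakeTransport:
    # The default is constructed before setdefault runs, even on a cache hit.
    return transports.setdefault(loop, FakeTransport())


def get_transport_lazy(transports: dict, loop) -> FakeTransport:
    # Only construct a transport when this loop has no entry yet.
    transport = transports.get(loop)
    if transport is None:
        transport = transports[loop] = FakeTransport()
    return transport


def count_creations(getter, calls: int = 3) -> int:
    FakeTransport.created = 0
    transports: dict = {}
    loop = asyncio.new_event_loop()
    try:
        for _ in range(calls):
            getter(transports, loop)
    finally:
        loop.close()
    return FakeTransport.created
```

With three lookups on the same loop, `count_creations(get_transport_eager)` is 3 and `count_creations(get_transport_lazy)` is 1. Whether the extra constructions matter in practice depends on how expensive the real transport's constructor is, which this sketch does not measure.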

Comment on lines +538 to +548

    async def __aenter__(self):
        await self.get_transport().__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self.get_transport().__aexit__(exc_type, exc_value, traceback)
@ewjoachim (Author), May 13, 2025 (edited):

Those seem to be the uncovered lines, meaning we (or rather httpx) never use the __aenter__/__aexit__ API of the transport. We could just raise NotImplementedError, rather than do something that is neither obvious nor needed, and wait for someone to actually need it?

I'm saying "not obvious" because it's slightly unclear whether we would want __aenter__ to return self or the underlying transport.

Comment on lines 518 to +520

     @cache
    -def _cached_async_http_transport() -> httpx.AsyncHTTPTransport:
    -    return httpx.AsyncHTTPTransport()
    +def _get_transport_for_loop() -> _PerLoopTransport:
    +    return _PerLoopTransport()
@ewjoachim (Author), May 13, 2025 (edited):

This is a convoluted way of just defining the instance as a module variable, but it works. Also, it was kinda-already done that way before so I kept it

Comment on lines +526 to +528

    self.transports: MutableMapping[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = (
        weakref.WeakKeyDictionary()
    )
@ewjoachim (Author):

It might be debatable if we want this to be a WeakKeyDictionary or a normal dict.

It would probably work with a normal dict, but it feels that if a loop is referenced by nobody but this mapping, we really don't need to have it here, so I'd say it makes sense to keep it.

Note that if we found a way to remove the strong reference to the loop, e.g. by closing the pool, then we would probably be happy with the WeakKeyDictionary without the for loop in get_transport, but I'm not sure I know when we should close the pool for this to stay useful (in the end, the goal is to reuse the TCP connections).

Like they say, 2 hardest problems in compsci: naming things, cache invalidation, and off-by-one errors.

@ewjoachim (Author):

I'll be leaving this PR for the Pydantic folks to do whatever they want with it, but specifically for solving the Gemini issue, using the new GoogleModel should be a better alternative. The transport issue still needs to be solved at some point, though.

@Kludex (Member):

@ewjoachim sorry for the lack of reply. I was not ignoring this; I was discussing it internally, because I actually expected unexpected issues with that implementation. 😔

I think the way we intend to move forward is to drop the cache we are doing, and have an async context manager do the right thing at the Agent level.

I'll write more about it soon. But yeah, you can leave it to us. I'll get back to this soon.

@ewjoachim (Author):

Great!

Yeah, I too think the proper fix is to avoid creating any httpx session/transport/... until we're 100% sure we're in an async context, and then make sure we stop it when we leave the context. But yeah, either there's going to be a change in the API, which is probably annoying, or there's going to be a more profound change in the integration. It doesn't make a lot of sense for this kind of change to be led by a first-time contributor, so yeah, it's probably better this way :)
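As a purely illustrative sketch of that direction (not the maintainers' actual design, which is not described in this thread), a client could be created on entering an async context and closed on leaving it. `FakeAsyncClient` and `client_scope` are hypothetical names, and the fake client stands in for an httpx client so the example runs without network access.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeAsyncClient:
    """Hypothetical stand-in for an httpx.AsyncClient; records its loop and state."""

    def __init__(self) -> None:
        # Raises RuntimeError if no loop is running, so it cannot be built too early.
        self.loop = asyncio.get_running_loop()
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


@asynccontextmanager
async def client_scope() -> AsyncIterator[FakeAsyncClient]:
    # The client is created only once we are certainly inside an async context.
    client = FakeAsyncClient()
    try:
        yield client
    finally:
        # It is closed before that context, and its loop, go away.
        await client.aclose()


async def run_once() -> FakeAsyncClient:
    async with client_scope() as client:
        assert not client.closed
    return client
```

Each `asyncio.run(run_once())` gets its own client bound to its own loop, and the client is closed by the time the loop shuts down. The trade-off, as noted above, is that connections are no longer reused across separate runs.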

@DouweM mentioned this pull request on Jul 7, 2025 (6 tasks)
Reviewers: no reviews
Assignees: @Kludex
Labels, projects, milestone: none yet

Development: successfully merging this pull request may close this issue:
Gemini causes 'Event loop is closed' when running inside an async context

3 participants: @ewjoachim, @bojan2501, @Kludex
