Using with asyncio
Theelasticsearch
package supports async/await withasyncio andaiohttp. You can either installaiohttp
directly or use the[async]
extra:
$ python -m pip install elasticsearch aiohttp# - OR -$ python -m pip install elasticsearch[async]
After installation all async API endpoints are available via~elasticsearch.AsyncElasticsearch
and are used in the same way as other APIs, with an extraawait
:
import asynciofrom elasticsearch import AsyncElasticsearchclient = AsyncElasticsearch()async def main(): resp = await client.search( index="documents", body={"query": {"match_all": {}}}, size=20, ) print(resp)asyncio.run(main())
All APIs that are available under the sync client are also available under the async client.
ASGI (Asynchronous Server Gateway Interface) is a way to serve Python web applications making use of async I/O to achieve better performance. Some examples of ASGI frameworks include FastAPI, Django 3.0+, and Starlette. If you’re using one of these frameworks along with Elasticsearch then you should be using~elasticsearch.AsyncElasticsearch
to avoid blocking the event loop with synchronous network calls for optimal performance.
Elastic APM also supports tracing of async Elasticsearch queries just the same as synchronous queries. For an example on how to configureAsyncElasticsearch
with a popular ASGI frameworkFastAPI and APM tracing there is apre-built example in theexamples/fastapi-apm
directory.
See also theUsing OpenTelemetry page.
If when trying to useAsyncElasticsearch
you receiveValueError: You must have 'aiohttp' installed to use AiohttpHttpNode
you should ensure that you haveaiohttp
installed in your environment (check with$ python -m pip freeze | grep aiohttp
). Otherwise, async support won’t be available.
Previously asyncio was supported separately via theelasticsearch-async package. Theelasticsearch-async
package has been deprecated in favor ofAsyncElasticsearch
provided by theelasticsearch
package in v7.8 and onwards.
This warning is created byaiohttp
when an open HTTP connection is garbage collected. You’ll typically run into this when closing your application. To resolve the issue ensure that~elasticsearch.AsyncElasticsearch.close
is called before the~elasticsearch.AsyncElasticsearch
instance is garbage collected.
For example if using FastAPI that might look like this:
import osfrom contextlib import asynccontextmanagerfrom fastapi import FastAPIfrom elasticsearch import AsyncElasticsearchELASTICSEARCH_URL = os.environ["ELASTICSEARCH_URL"]client = None@asynccontextmanagerasync def lifespan(app: FastAPI): global client client = AsyncElasticsearch(ELASTICSEARCH_URL) yield await client.close()app = FastAPI(lifespan=lifespan)@app.get("/")async def main(): return await client.info()
You can run this example by saving it tomain.py
and executingELASTICSEARCH_URL=http://localhost:9200 uvicorn main:app
.
Async variants of all helpers are available inelasticsearch.helpers
and are all prefixed withasync_*
. You’ll notice that these APIs are identical to the ones in the syncClient helpers documentation.
All async helpers that accept an iterator or generator also accept async iterators and async generators.