ElasticsearchRetriever
Elasticsearch is a distributed, RESTful search and analytics engine. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. It supports keyword search, vector search, hybrid search and complex filtering.
The ElasticsearchRetriever is a generic wrapper that enables flexible access to all Elasticsearch features through the Query DSL. For most use cases the other classes (ElasticsearchStore, ElasticsearchEmbeddings, etc.) should suffice, but if they don't, you can use ElasticsearchRetriever.
This guide will help you get started with the Elasticsearch retriever. For detailed documentation of all ElasticsearchRetriever features and configurations, head to the API reference.
Integration details
| Retriever | Self-host | Cloud offering | Package |
| --- | --- | --- | --- |
| ElasticsearchRetriever | ✅ | ✅ | langchain_elasticsearch |
Setup
There are two main ways to set up an Elasticsearch instance:
Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. Sign up for a free trial. To connect to an Elasticsearch instance that does not require login credentials (for example, a Docker instance started with security disabled), pass the Elasticsearch URL and index name to the constructor.
Local install: Get started with Elasticsearch by running it locally. The easiest way is to use the official Elasticsearch Docker image. See the Elasticsearch Docker documentation for more information.
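For local experimentation, a minimal single-node instance with security disabled can be started roughly like this (the image tag is an example; pick a current version for your environment):

```shell
# Single-node Elasticsearch with authentication disabled, for local testing only
docker run -p 9200:9200 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.12.1
```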
If you want to get automated tracing from individual queries, you can also set your LangSmith API key by uncommenting below:
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
Installation
This retriever lives in the langchain-elasticsearch package. For demonstration purposes, we will also install langchain-community to generate text embeddings.
%pip install -qU langchain-community langchain-elasticsearch
from typing import Any, Dict, Iterable
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_community.embeddings import DeterministicFakeEmbedding
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_elasticsearch import ElasticsearchRetriever
Configure
Here we define the connection to Elasticsearch. In this example we use a locally running instance. Alternatively, you can make an account in Elastic Cloud and start a free trial.
es_url = "http://localhost:9200"
es_client = Elasticsearch(hosts=[es_url])
es_client.info()
For vector search, we are going to use random embeddings just for illustration. For real use cases, pick one of the available LangChain Embeddings classes.
embeddings = DeterministicFakeEmbedding(size=3)
Define example data
index_name = "test-langchain-retriever"
text_field = "text"
dense_vector_field = "fake_embedding"
num_characters_field = "num_characters"
texts = [
    "foo",
    "bar",
    "world",
    "hello world",
    "hello",
    "foo bar",
    "bla bla foo",
]
Index data
Typically, users make use of ElasticsearchRetriever when they already have data in an Elasticsearch index. Here we index some example text documents. If you created an index, for example using ElasticsearchStore.from_documents, that's also fine.
def create_index(
    es_client: Elasticsearch,
    index_name: str,
    text_field: str,
    dense_vector_field: str,
    num_characters_field: str,
):
    es_client.indices.create(
        index=index_name,
        mappings={
            "properties": {
                text_field: {"type": "text"},
                dense_vector_field: {"type": "dense_vector"},
                num_characters_field: {"type": "integer"},
            }
        },
    )
def index_data(
    es_client: Elasticsearch,
    index_name: str,
    text_field: str,
    dense_vector_field: str,
    embeddings: Embeddings,
    texts: Iterable[str],
    refresh: bool = True,
) -> int:
    # num_characters_field is read from the module-level variable defined above
    create_index(
        es_client, index_name, text_field, dense_vector_field, num_characters_field
    )
    vectors = embeddings.embed_documents(list(texts))
    requests = [
        {
            "_op_type": "index",
            "_index": index_name,
            "_id": i,
            text_field: text,
            dense_vector_field: vector,
            num_characters_field: len(text),
        }
        for i, (text, vector) in enumerate(zip(texts, vectors))
    ]
    bulk(es_client, requests)
    if refresh:
        es_client.indices.refresh(index=index_name)
    return len(requests)
index_data(es_client, index_name, text_field, dense_vector_field, embeddings, texts)
7
Instantiation
Vector search
Dense vector retrieval, using fake embeddings in this example.
def vector_query(search_query: str) -> Dict:
    vector = embeddings.embed_query(search_query)  # same embeddings as for indexing
    return {
        "knn": {
            "field": dense_vector_field,
            "query_vector": vector,
            "k": 5,
            "num_candidates": 10,
        }
    }
vector_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=vector_query,
    content_field=text_field,
    url=es_url,
)
vector_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 1.0, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 0.6770179, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 0.4816144, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 0.46853775, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.2086992, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}})]
BM25
Traditional keyword matching.
def bm25_query(search_query: str) -> Dict:
    return {
        "query": {
            "match": {
                text_field: search_query,
            },
        },
    }
bm25_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=bm25_query,
    content_field=text_field,
    url=es_url,
)
bm25_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
Hybrid search
Hybrid search combines vector search and BM25 search, using Reciprocal Rank Fusion (RRF) to merge the two result sets.
def hybrid_query(search_query: str) -> Dict:
    vector = embeddings.embed_query(search_query)  # same embeddings as for indexing
    return {
        "retriever": {
            "rrf": {
                "retrievers": [
                    {
                        "standard": {
                            "query": {
                                "match": {
                                    text_field: search_query,
                                }
                            }
                        }
                    },
                    {
                        "knn": {
                            "field": dense_vector_field,
                            "query_vector": vector,
                            "k": 5,
                            "num_candidates": 10,
                        }
                    },
                ]
            }
        }
    }
hybrid_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=hybrid_query,
    content_field=text_field,
    url=es_url,
)
hybrid_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
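For intuition, RRF scores each document as the sum of 1 / (rank_constant + rank) across the individual result lists (Elasticsearch's default rank_constant is 60). A minimal plain-Python sketch of the fusion, with hypothetical per-retriever rankings standing in for real hits:

```python
def rrf_fuse(rankings, rank_constant=60):
    """Fuse ranked lists with Reciprocal Rank Fusion.

    rankings: list of ranked document-id lists, best hit first.
    Returns document ids ordered by fused score, best first.
    """
    scores = {}
    for ranked in rankings:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Hypothetical rankings for the query "foo" from a BM25 and a kNN retriever:
bm25_ranked = ["foo", "foo bar", "bla bla foo"]
knn_ranked = ["foo", "world", "foo bar"]
print(rrf_fuse([bm25_ranked, knn_ranked]))
# -> ['foo', 'foo bar', 'world', 'bla bla foo']
```

A document that ranks well in both lists ("foo") beats one that ranks moderately in a single list, which is the property that makes RRF a robust default for hybrid search.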
Fuzzy matching
Keyword matching with typo tolerance.
def fuzzy_query(search_query: str) -> Dict:
    return {
        "query": {
            "match": {
                text_field: {
                    "query": search_query,
                    "fuzziness": "AUTO",
                }
            },
        },
    }
fuzzy_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=fuzzy_query,
    content_field=text_field,
    url=es_url,
)
fuzzy_retriever.invoke("fox")  # note the typo tolerance
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.6474311, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.49580228, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.40171927, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
Complex filtering
Combination of filters on different fields.
def filter_query_func(search_query: str) -> Dict:
    return {
        "query": {
            "bool": {
                "must": [
                    {"range": {num_characters_field: {"gte": 5}}},
                ],
                "must_not": [
                    {"prefix": {text_field: "bla"}},
                ],
                "should": [
                    {"match": {text_field: search_query}},
                ],
            }
        }
    }
filtering_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=filter_query_func,
    content_field=text_field,
    url=es_url,
)
filtering_retriever.invoke("foo")
[Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 1.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 1.0, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 1.0, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 1.0, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}})]
Note that the query match ranks on top. The other documents that passed the filter are also in the result set, but they all have the same score.
Custom document mapper
It is possible to customize the function that maps an Elasticsearch result (hit) to a LangChain document.
def num_characters_mapper(hit: Dict[str, Any]) -> Document:
    num_chars = hit["_source"][num_characters_field]
    content = hit["_source"][text_field]
    return Document(
        page_content=f"This document has {num_chars} characters",
        metadata={"text_content": content},
    )
custom_mapped_retriever = ElasticsearchRetriever.from_es_params(
    index_name=index_name,
    body_func=filter_query_func,
    document_mapper=num_characters_mapper,
    url=es_url,
)
custom_mapped_retriever.invoke("foo")
[Document(page_content='This document has 7 characters', metadata={'text_content': 'foo bar'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'world'}),
Document(page_content='This document has 11 characters', metadata={'text_content': 'hello world'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'hello'})]
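The mapper receives the raw Elasticsearch hit dictionary, with stored fields under "_source". As a self-contained illustration (plain dicts stand in for the LangChain Document class, and the hit values are made up), the transformation looks like this:

```python
from typing import Any, Dict

def chars_mapper(hit: Dict[str, Any]) -> Dict[str, Any]:
    # The stored fields of an Elasticsearch hit live under "_source".
    source = hit["_source"]
    return {
        "page_content": f"This document has {source['num_characters']} characters",
        "metadata": {"text_content": source["text"]},
    }

# A hand-written hit in the shape Elasticsearch returns (values are illustrative):
sample_hit = {
    "_index": "test-langchain-retriever",
    "_id": "5",
    "_score": 1.74,
    "_source": {"text": "foo bar", "num_characters": 7},
}
print(chars_mapper(sample_hit)["page_content"])
# -> This document has 7 characters
```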
Usage
Following the above examples, we use .invoke to issue a single query. Because retrievers are Runnables, we can also use any method of the Runnable interface, such as .batch.
Use within a chain
We can also incorporate retrievers into chains to build larger applications, such as a simple RAG application. For demonstration purposes, we instantiate an OpenAI chat model as well.
%pip install -qU langchain-openai
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
    """Answer the question based only on the context provided.
Context: {context}
Question: {question}"""
)
llm = ChatOpenAI(model="gpt-4o-mini")
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
chain = (
    {"context": vector_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
chain.invoke("what is foo?")
API reference
For detailed documentation of all ElasticsearchRetriever features and configurations, head to the API reference.
Related
- Retriever conceptual guide
- Retriever how-to guides