Pathway
Pathway is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.
This notebook demonstrates how to use a livePathway
data indexing pipeline withLangchain
. You can query the results of this pipeline from your chains in the same manner as you would a regular vector store. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.
In this notebook, we will use apublic demo document processing pipeline that:
- Monitors several cloud data sources for data changes.
- Builds a vector index for the data.
To have your own document processing pipeline check thehosted offering orbuild your own.
We will connect to the index using aVectorStore
client, which implements thesimilarity_search
function to retrieve matching documents.
The basic pipeline used in this document allows to effortlessly build a simple vector index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.
You'll need to installlangchain-community
withpip install -qU langchain-community
to use this integration
Querying the data pipeline
To instantiate and configure the client you need to provide either theurl
or thehost
andport
of your document indexing pipeline. In the code below we use a publicly availabledemo pipeline, which REST API you can access athttps://demo-document-indexing.pathway.stream
. This demo ingests documents fromGoogle Drive andSharepoint and maintains an index for retrieving documents.
from langchain_community.vectorstoresimport PathwayVectorClient
client= PathwayVectorClient(url="https://demo-document-indexing.pathway.stream")
And we can start asking queries
query="What is Pathway?"
docs= client.similarity_search(query)
print(docs[0].page_content)
Your turn!Get your pipeline or uploadnew documents to the demo pipeline and retry the query!
Filtering based on file metadata
We support document filtering usingjmespath expressions, for instance:
# take into account only sources modified later than unix timestamp
docs= client.similarity_search(query, metadata_filter="modified_at >= `1702672093`")
# take into account only sources modified later than unix timestamp
docs= client.similarity_search(query, metadata_filter="owner == `james`")
# take into account only sources with path containing 'repo_readme'
docs= client.similarity_search(query, metadata_filter="contains(path, 'repo_readme')")
# and of two conditions
docs= client.similarity_search(
query, metadata_filter="owner == `james` && modified_at >= `1702672093`"
)
# or of two conditions
docs= client.similarity_search(
query, metadata_filter="owner == `james` || modified_at >= `1702672093`"
)
Getting information on indexed files
PathwayVectorClient.get_vectorstore_statistics()
gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of last updated one. You can use it in your chains to tell the user how fresh is your knowledge base.
client.get_vectorstore_statistics()
Your own pipeline
Running in production
To have your own Pathway data indexing pipeline check the Pathway's offer forhosted pipelines. You can also run your own Pathway pipeline - for information on how to build the pipeline refer toPathway guide.
Processing documents
The vectorization pipeline supports pluggable components for parsing, splitting and embedding documents. For embedding and splitting you can useLangchain components or checkembedders andsplitters available in Pathway. If parser is not provided, it defaults toUTF-8
parser. You can find available parsershere.
Related
- Vector storeconceptual guide
- Vector storehow-to guides