
Airbyte CDK (Deprecated)

Note: AirbyteCDKLoader is deprecated. Please use AirbyteLoader instead.

Airbyte is a data integration platform for ELT pipelines from APIs, databases & files to warehouses & lakes. It has the largest catalog of ELT connectors to data warehouses and databases.

Many source connectors are implemented using the Airbyte CDK. This loader lets you run any of these connectors and return the data as documents.

Installation

First, install the airbyte-cdk Python package.

%pip install --upgrade --quiet airbyte-cdk

Then, either install an existing connector from the Airbyte GitHub repository or create your own connector using the Airbyte CDK.

For example, to install the GitHub connector, run:

%pip install --upgrade --quiet "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

Some sources are also published as regular packages on PyPI.
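Before constructing a loader, it can help to confirm that the connector package is actually available in the current environment. The following is a minimal sketch using only the standard library; the helper name connector_installed is hypothetical and is not part of Airbyte or LangChain.

```python
import importlib.util


def connector_installed(module_name: str) -> bool:
    """Return True if a top-level module with this name can be found."""
    # For a top-level name, find_spec returns None when the module is missing.
    return importlib.util.find_spec(module_name) is not None


# "source_github" is the module name used by the pip command above.
if not connector_installed("source_github"):
    print("source_github is not installed; install the connector first.")
```

A check like this only shows that the module can be located, not that it imports cleanly or matches the installed airbyte-cdk version.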

Example

Now you can create an AirbyteCDKLoader based on the imported source. It takes a config object that is passed to the connector. You also have to pick the stream you want to retrieve records from by name (stream_name). Check the connector's documentation page and spec definition for more information on the config object and available streams. The example below uses the GitHub connector:

from langchain_community.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub  # plug in your own source here

config = {
    # your github configuration
    "credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
    "repository": "<repo>",
    "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}

issues_loader = AirbyteCDKLoader(
    source_class=SourceGithub, config=config, stream_name="issues"
)
API Reference: AirbyteCDKLoader
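The config above hardcodes a token placeholder and expects start_date in a specific timestamp form. One possible way to assemble it is sketched below. The helper names and the GITHUB_TOKEN environment variable are assumptions made for illustration; only the dictionary keys come from the example above.

```python
import os
from datetime import datetime, timezone


def to_airbyte_timestamp(moment: datetime) -> str:
    """Format a datetime as UTC in the form 2020-10-20T00:00:00Z."""
    if moment.tzinfo is None:
        # Assumption: naive datetimes are treated as already being in UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_github_config(repository: str, start: datetime) -> dict:
    """Assemble a config dict with the same keys as the example above."""
    return {
        "credentials": {
            "api_url": "api.github.com",
            # GITHUB_TOKEN is a hypothetical variable name; use your own.
            "personal_access_token": os.environ.get("GITHUB_TOKEN", "<token>"),
        },
        "repository": repository,
        "start_date": to_airbyte_timestamp(start),
    }


config = build_github_config("<repo>", datetime(2020, 10, 20, tzinfo=timezone.utc))
```

Reading the token from the environment keeps it out of notebooks and version control; the resulting dict can be passed as config exactly as in the example.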

Now you can load documents the usual way:

docs = issues_loader.load()

Because load returns a list, it blocks until all documents are loaded. For finer control over this process, use the lazy_load method, which returns an iterator instead:

docs_iterator = issues_loader.lazy_load()
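An iterator makes it possible to process documents in fixed-size batches instead of holding everything in memory. The sketch below is one way to do that with the standard library; in_batches is a hypothetical helper, and a plain range stands in for the loader so the example runs without a connector.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items, pulling from the source lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in data; in practice pass issues_loader.lazy_load() instead of range(5).
for batch in in_batches(range(5), 2):
    print(len(batch))
```

Each batch could then be handed to whatever comes next in the pipeline (for example an embedding or indexing step) before the next records are fetched.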

Keep in mind that by default the page content is empty and the metadata object contains all the information from the record. To create documents in a different way, pass in a record_handler function when creating the loader:

from langchain_core.documents import Document


def handle_record(record, id):
    return Document(
        page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
        metadata=record.data,
    )


issues_loader = AirbyteCDKLoader(
    source_class=SourceGithub,
    config=config,
    stream_name="issues",
    record_handler=handle_record,
)

docs = issues_loader.load()
API Reference: Document
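The handler above raises a KeyError if a record has no title field and leaves a trailing newline when the body is empty. A slightly more defensive variant of the text-building step might look like the sketch below. The function issue_text is hypothetical, and it assumes record.data behaves like a mapping, as it does in the example above.

```python
from typing import Any, Mapping


def issue_text(data: Mapping[str, Any]) -> str:
    """Join title and body, skipping whichever is missing, None, or empty."""
    parts = [data.get("title"), data.get("body")]
    return "\n".join(part for part in parts if part)


# Inside a record handler this would be used as:
#   Document(page_content=issue_text(record.data), metadata=record.data)
print(issue_text({"title": "Crash on startup", "body": None}))
```

Keeping the text construction in a plain function makes it easy to test without running a connector.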

Incremental loads

Some streams allow incremental loading: the source keeps track of synced records and won't load them again. This is useful for sources that have a high volume of data and are updated frequently.

To take advantage of this, store the last_state property of the loader and pass it in when creating the loader again. This ensures that only new records are loaded.

last_state = issues_loader.last_state  # store safely

incremental_issue_loader = AirbyteCDKLoader(
    source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)

new_docs = incremental_issue_loader.load()
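Storing the state "safely" usually means persisting it between runs. The sketch below writes it to a JSON file and reads it back. It assumes that last_state is JSON-serializable, which this page does not confirm; if it is not, a different serializer would be needed. The helper names and the placeholder state contents are hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_state(state: Any, path: Path) -> None:
    """Write the state to disk; assumes it is JSON-serializable."""
    # Write to a temporary file first so a crash mid-write keeps the old state.
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(state))
    tmp.replace(path)


def load_state(path: Path) -> Optional[Any]:
    """Return the saved state, or None if nothing has been stored yet."""
    if not path.exists():
        return None
    return json.loads(path.read_text())


# Round trip with a placeholder state; in practice pass issues_loader.last_state.
example_state = {"issues": {"updated_at": "2020-10-20T00:00:00Z"}}
with tempfile.TemporaryDirectory() as folder:
    state_path = Path(folder) / "issues_state.json"
    missing = load_state(state_path)
    save_state(example_state, state_path)
    restored = load_state(state_path)
```

On the next run, the value returned by load_state would be passed as the state argument when creating the loader.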
