Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Python wrapper for the Sling CLI tool

License

NotificationsYou must be signed in to change notification settings

slingdata-io/sling-python

Repository files navigation

logo

Slings from a data source to a data target.

Installation

pip install sling orpip install sling[arrow] for streaming.

Then you should be able to runsling --help from command line.

Running a Extract-Load Task

CLI

sling run --src-conn MY_PG --src-stream myschema.mytable \  --tgt-conn YOUR_SNOWFLAKE --tgt-object yourschema.yourtable \  --mode full-refresh

Or passing a yaml/json string or file

cat'source: MY_POSTGREStarget: MY_SNOWFLAKE# default config options which apply to all streamsdefaults:  mode: full-refresh  object: new_schema.{stream_schema}_{stream_table}streams:  my_schema.*:'> /path/to/replication.yamlsling run -r /path/to/replication.yaml

Using theReplication class

Run a replication from file:

importyamlfromslingimportReplication# From a YAML filereplication=Replication(file_path="path/to/replication.yaml")replication.run()# Or load into objectwithopen('path/to/replication.yaml')asfile:config=yaml.load(file,Loader=yaml.FullLoader)replication=Replication(**config)replication.run()

Build a replication dynamically:

fromslingimportReplication,ReplicationStream,Mode# build sling replicationstreams= {}for (folder,table_name)inlist(folders):streams[folder]=ReplicationStream(mode=Mode.FULL_REFRESH,object=table_name,primary_key='_hash_id')replication=Replication(source='aws_s3',target='snowflake',streams=streams,env=dict(SLING_STREAM_URL_COLUMN='true',SLING_LOADED_AT_COLUMN='true'),debug=True,)replication.run()

Using theSling Class

For more direct control and streaming capabilities, you can use theSling class, which mirrors the CLI interface.

Basic Usage withrun() method

importosfromslingimportSling,Mode# Set postgres & snowflake connection# see https://docs.slingdata.io/connections/database-connectionsos.environ["POSTGRES"]='postgres://...'os.environ["SNOWFLAKE"]='snowflake://...'# Database to database transferSling(src_conn="postgres",src_stream="public.users",tgt_conn="snowflake",tgt_object="public.users_copy",mode=Mode.FULL_REFRESH).run()# Database to fileSling(src_conn="postgres",src_stream="select * from users where active = true",tgt_object="file:///tmp/active_users.csv").run()# File to databaseSling(src_stream="file:///path/to/data.csv",tgt_conn="snowflake",tgt_object="public.imported_data").run()

Input Streaming - Python Data to Target

💡 Tip: Installpip install sling[arrow] for better streaming performance and improved data type handling.

📊 DataFrame Support: Theinput parameter accepts lists of dictionaries, pandas DataFrames, or polars DataFrames. DataFrame support preserves data types when using Arrow format.

⚠️ Note: Be careful with large numbers ofSling invocations usinginput orstream() methods when working with external systems (databases, file systems). Each call re-opens the connection since it invokes the underlying sling binary. For better performance and connection reuse, consider using theReplication class instead, which maintains open connections across multiple operations.

importosfromslingimportSling,Format# Set postgres connection# see https://docs.slingdata.io/connections/database-connectionsos.environ["POSTGRES"]='postgres://...'# Stream Python data to CSV filedata= [    {"id":1,"name":"John","age":30},    {"id":2,"name":"Jane","age":25},    {"id":3,"name":"Bob","age":35}]Sling(input=data,tgt_object="file:///tmp/output.csv").run()# Stream Python data to databaseSling(input=data,tgt_conn="postgres",tgt_object="public.users").run()# Stream Python data to JSON Lines fileSling(input=data,tgt_object="file:///tmp/output.jsonl",tgt_options={"format":Format.JSONLINES}).run()# Stream from generator (memory efficient for large datasets)defdata_generator():foriinrange(10000):yield {"id":i,"value":f"item_{i}","timestamp":"2023-01-01"}Sling(input=data_generator(),tgt_object="file:///tmp/large_dataset.csv").run()# Stream pandas DataFrame to databaseimportpandasaspddf=pd.DataFrame({"id": [1,2,3,4],"name": ["Alice","Bob","Charlie","Diana"],"age": [25,30,35,28],"salary": [50000,60000,70000,55000]})Sling(input=df,tgt_conn="postgres",tgt_object="public.employees").run()# Stream polars DataFrame to CSV fileimportpolarsaspldf=pl.DataFrame({"product_id": [101,102,103],"product_name": ["Laptop","Mouse","Keyboard"],"price": [999.99,25.50,75.00],"in_stock": [True,False,True]})Sling(input=df,tgt_object="file:///tmp/products.csv").run()# DataFrame with column selectionSling(input=df,select=["product_name","price"],# Only export specific columnstgt_object="file:///tmp/product_prices.csv").run()

Output Streaming withstream()

importosfromslingimportSling# Set postgres connection# see https://docs.slingdata.io/connections/database-connectionsos.environ["POSTGRES"]='postgres://...'# Stream data from databasesling=Sling(src_conn="postgres",src_stream="public.users",limit=1000)forrecordinsling.stream():print(f"User:{record['name']}, Age:{record['age']}")# Stream data from filesling=Sling(src_stream="file:///path/to/data.csv")# Process records one by one (memory efficient)forrecordinsling.stream():# Process each recordprocessed_data=transform_record(record)# Could save to another system, send to API, etc.# Stream with parameterssling=Sling(src_conn="postgres",src_stream="public.orders",select=["order_id","customer_name","total"],where="total > 100",limit=500)records=list(sling.stream())print(f"Found{len(records)} high-value orders")

High-Performance Streaming withstream_arrow()

🚀 Performance: Thestream_arrow() method provides the highest performance streaming with full data type preservation by using Apache Arrow's columnar format. Requirespip install sling[arrow].

📊 Type Safety: Unlikestream() which may convert data types during CSV serialization,stream_arrow() preserves exact data types including integers, floats, timestamps, and more.

importosfromslingimportSling# Set postgres connection# see https://docs.slingdata.io/connections/database-connectionsos.environ["POSTGRES"]='postgres://...'# Basic Arrow streaming from databasesling=Sling(src_conn="postgres",src_stream="public.users",limit=1000)# Get Arrow RecordBatchStreamReader for maximum performancereader=sling.stream_arrow()# Convert to Arrow Table for analysistable=reader.read_all()print(f"Received{table.num_rows} rows with{table.num_columns} columns")print(f"Column names:{table.column_names}")print(f"Schema:{table.schema}")# Convert to pandas DataFrame with preserved typesiftable.num_rows>0:df=table.to_pandas()print(df.dtypes)# Shows preserved data types# Stream Arrow file with type preservationsling=Sling(src_stream="file:///path/to/data.arrow",src_options={"format":"arrow"})reader=sling.stream_arrow()table=reader.read_all()# Access columnar data directly (very efficient)forcolumn_nameintable.column_names:column=table.column(column_name)print(f"{column_name}:{column.type}")# Process Arrow batches for large datasets (memory efficient)sling=Sling(src_conn="postgres",src_stream="select * from large_table")reader=sling.stream_arrow()forbatchinreader:# Process each batch separately to manage memoryprint(f"Processing batch with{batch.num_rows} rows")# Convert batch to pandas if neededbatch_df=batch.to_pandas()# Process batch_df...# Round-trip with Arrow format preservationimportpandasaspd# Write DataFrame to Arrow file with type preservationdf=pd.DataFrame({"id": [1,2,3],"amount": [100.50,250.75,75.25],"timestamp":pd.to_datetime(["2023-01-01","2023-01-02","2023-01-03"]),"active": [True,False,True]})Sling(input=df,tgt_object="file:///tmp/data.arrow",tgt_options={"format":"arrow"}).run()# Read back with full type preservationsling=Sling(src_stream="file:///tmp/data.arrow",src_options={"format":"arrow"})reader=sling.stream_arrow()restored_table=reader.read_all()restored_df=restored_table.to_pandas()# Types are exactly preserved (no string conversion)print(restored_df.dtypes)assertrestored_df['active'].dtype=='bool'assert'datetime64'instr(restored_df['timestamp'].dtype)

Notes:

  • stream_arrow() requires PyArrow:pip install sling[arrow]
  • Cannot be used with a target object (userun() instead)
  • Provides the best performance for large datasets
  • Preserves exact data types including timestamps, decimals, and booleans
  • Ideal for analytics workloads and data science applications

Round-trip Examples

importosfromslingimportSling# Set postgres connection# see https://docs.slingdata.io/connections/database-connectionsos.environ["POSTGRES"]='postgres://...'# Python → File → Pythonoriginal_data= [    {"id":1,"name":"Alice","score":95.5},    {"id":2,"name":"Bob","score":87.2}]# Step 1: Python data to filesling_write=Sling(input=original_data,tgt_object="file:///tmp/scores.csv")sling_write.run()# Step 2: File back to Pythonsling_read=Sling(src_stream="file:///tmp/scores.csv")loaded_data=list(sling_read.stream())# Python → Database → Python (with transformations)sling_to_db=Sling(input=original_data,tgt_conn="postgres",tgt_object="public.temp_scores")sling_to_db.run()sling_from_db=Sling(src_conn="postgres",src_stream="select *, score * 1.1 as boosted_score from public.temp_scores",)transformed_data=list(sling_from_db.stream())# DataFrame → Database → DataFrame (with pandas/polars)importpandasaspd# Start with pandas DataFramedf=pd.DataFrame({"user_id": [1,2,3],"purchase_amount": [100.50,250.75,75.25],"category": ["electronics","clothing","books"]})# Write DataFrame to databaseSling(input=df,tgt_conn="postgres",tgt_object="public.purchases").run()# Read back with SQL transformations as pandas DataFramesling_query=Sling(src_conn="postgres",src_stream="""        SELECT category,               COUNT(*) as purchase_count,               AVG(purchase_amount) as avg_amount        FROM public.purchases        GROUP BY category    """)summary_data=list(sling_query.stream())summary_df=pd.DataFrame(summary_data)print(summary_df)

Using thePipeline class

Run aPipeline:

fromslingimportPipelinefromsling.hooksimportStepLog,StepCopy,StepReplication,StepHTTP,StepCommand# From a YAML filepipeline=Pipeline(file_path="path/to/pipeline.yaml")pipeline.run()# Or using Hook objects for type safetypipeline=Pipeline(steps=[StepLog(message="Hello world"),StepCopy(from_="sftp//path/to/file",to="aws_s3/path/to/file"),StepReplication(path="path/to/replication.yaml"),StepHTTP(url="https://trigger.webhook.com"),StepCommand(command=["ls","-l"],print_output=True)    ],env={"MY_VAR":"value"})pipeline.run()# Or programmatically using dictionariespipeline=Pipeline(steps=[        {"type":"log","message":"Hello world"},        {"type":"copy","from":"sftp//path/to/file","to":"aws_s3/path/to/file"},        {"type":"replication","path":"path/to/replication.yaml"},        {"type":"http","url":"https://trigger.webhook.com"},        {"type":"command","command": ["ls","-l"],"print":True}    ],env={"MY_VAR":"value"})pipeline.run()

Testing

pytest sling/tests/tests.py -vpytest sling/tests/test_sling_class.py -v

MCP

To Login:

mcp-publisher login dns --domain slingdata.io --private-key $(openssl pkey -in mcp-key.pem  -noout -text | grep -A3 "priv:" | tail -n +2 | tr -d ' :\n')`

To Publish:

# to publish, adjust the version first in server.jsonmcp-publisher publish# checkcurl"https://registry.modelcontextprotocol.io/v0/servers?search=io.slingdata/sling-cli"

mcp-name: io.slingdata/sling-cli

About

Python wrapper for the Sling CLI tool

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors7


[8]ページ先頭

©2009-2025 Movatter.jp