Apache Arrow DataFusion and Ballista query engines
cube-js/arrow-datafusion
DataFusion is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
DataFusion supports both an SQL and a DataFrame API for building logical query plans as well as a query optimizer and execution engine capable of parallel execution against partitioned data sources (CSV and Parquet) using threads.
DataFusion also supports distributed query execution via the Ballista crate.
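To make "parallel execution against partitioned data sources using threads" more concrete, here is a minimal standard-library sketch of the general idea: each partition is aggregated on its own thread, and the partial results are merged afterwards. It is not DataFusion's implementation. The function names `partial_min` and `parallel_min_by_group` and the `(a, b)` row layout are assumptions made only for this illustration.

```rust
use std::collections::HashMap;
use std::thread;

/// Computes MIN(b) grouped by a for a single partition of (a, b) rows.
/// Hypothetical helper, not part of DataFusion.
fn partial_min(rows: &[(i64, i64)]) -> HashMap<i64, i64> {
    let mut acc = HashMap::new();
    for &(a, b) in rows {
        acc.entry(a)
            .and_modify(|m: &mut i64| *m = (*m).min(b))
            .or_insert(b);
    }
    acc
}

/// Aggregates every partition on its own thread, then merges the
/// per-partition minimums into one result keyed by group.
fn parallel_min_by_group(partitions: Vec<Vec<(i64, i64)>>) -> HashMap<i64, i64> {
    // One worker thread per partition.
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|part| thread::spawn(move || partial_min(&part)))
        .collect();

    // Merge step: the minimum of the partial minimums is the overall minimum.
    let mut merged = HashMap::new();
    for handle in handles {
        let partial = handle.join().expect("worker thread panicked");
        for (a, b) in partial {
            merged
                .entry(a)
                .and_modify(|m: &mut i64| *m = (*m).min(b))
                .or_insert(b);
        }
    }
    merged
}
```

Calling `parallel_min_by_group(vec![vec![(1, 5), (2, 7)], vec![(1, 2), (2, 9)]])` processes the two partitions independently and merges them into a single map. The merge is valid because `MIN` can be computed from partial minimums; aggregates such as `AVG` would need to carry extra state (for example a sum and a count) between the two phases.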
DataFusion is used to create modern, fast and efficient data pipelines, ETL processes, and database systems, which need the performance of Rust and Apache Arrow and want to provide their users the convenience of an SQL interface or a DataFrame API.
- High Performance: Leveraging Rust and Arrow's memory model, DataFusion achieves very high performance
- Easy to Connect: Being part of the Apache Arrow ecosystem (Arrow, Parquet and Flight), DataFusion works well with the rest of the big data ecosystem
- Easy to Embed: Allowing extension at almost any point in its design, DataFusion can be tailored for your specific use case
- High Quality: Extensively tested, both by itself and with the rest of the Arrow ecosystem, DataFusion can be used as the foundation for production systems.
Here are some of the projects known to use DataFusion:
- Ballista Distributed Compute Platform
- Cloudfuse Buzz
- Cube Store
- datafusion-python
- delta-rs
- InfluxDB IOx Time Series Database
- ROAPI
- Tensorbase
- Squirtle
(if you know of another project, please submit a PR to add a link!)
Run a SQL query against data stored in a CSV:
```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // register the table
  let mut ctx = ExecutionContext::new();
  ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

  // create a plan to run a SQL query
  let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}
```
Use the DataFrame API to process data stored in a CSV:
```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
  // create the dataframe
  let mut ctx = ExecutionContext::new();
  let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;

  let df = df.filter(col("a").lt_eq(col("b")))?
           .aggregate(vec![col("a")], vec![min(col("b"))])?
           .limit(100)?;

  // execute and print results
  let results: Vec<RecordBatch> = df.collect().await?;
  print_batches(&results)?;
  Ok(())
}
```
Both of these examples will produce:

```text
+---+--------+
| a | MIN(b) |
+---+--------+
| 1 | 2      |
+---+--------+
```

DataFusion is published on crates.io, and is well documented on docs.rs.
To get started, add the following to your `Cargo.toml` file:

```toml
[dependencies]
datafusion = "4.0.0-SNAPSHOT"
```

DataFusion also includes a simple command-line interactive SQL utility. See the CLI reference for more information.
- SQL Parser
- SQL Query Planner
- Query Optimizer
- Constant folding
- Join Reordering
- Limit Pushdown
- Projection push down
- Predicate push down
- Type coercion
- Parallel query execution
- Projection
- Filter (WHERE)
- Filter post-aggregate (HAVING)
- Limit
- Aggregate
- Common math functions
- cast
- try_cast
- Postgres compatible String functions
  - ascii
  - bit_length
  - btrim
  - char_length
  - character_length
  - chr
  - concat
  - concat_ws
  - initcap
  - left
  - length
  - lpad
  - ltrim
  - octet_length
  - regexp_replace
  - repeat
  - replace
  - reverse
  - right
  - rpad
  - rtrim
  - split_part
  - starts_with
  - strpos
  - substr
  - to_hex
  - translate
  - trim
- Miscellaneous/Boolean functions
  - nullif
- Common date/time functions
  - Basic date functions
  - Basic time functions
  - Basic timestamp functions
- nested functions
  - Array of columns
- Schema Queries
  - SHOW TABLES
  - SHOW COLUMNS
  - information_schema.{tables, columns}
  - information_schema other views
- Sorting
- Nested types
- Lists
- Subqueries
- Common table expressions
- Set Operations
  - UNION ALL
  - UNION
  - INTERSECT
  - MINUS
- Joins
  - INNER JOIN
  - LEFT JOIN
  - RIGHT JOIN
  - FULL JOIN
  - CROSS JOIN
- Window
  - Empty window
  - Common window functions
  - Window with PARTITION BY clause
  - Window with ORDER BY clause
  - Window with FILTER clause
  - Window with custom WINDOW FRAME
  - UDF and UDAF for window functions
- CSV
- Parquet primitive types
- Parquet nested types
DataFusion is designed to be extensible at all points. To that end, you can provide your own custom:
- User Defined Functions (UDFs)
- User Defined Aggregate Functions (UDAFs)
- User Defined Table Source (`TableProvider`) for tables
- User Defined `Optimizer` passes (plan rewrites)
- User Defined `LogicalPlan` nodes
- User Defined `ExecutionPlan` nodes
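As a rough picture of what a user defined function extension point looks like in general, the sketch below models a query engine that looks up scalar functions by name in a registry. It deliberately avoids the DataFusion API: `ScalarFn`, `FunctionRegistry`, `register` and `call` are hypothetical names invented for this illustration, and the real types and signatures are documented on docs.rs.

```rust
use std::collections::HashMap;

/// Hypothetical scalar function type: maps one input column to one output column.
type ScalarFn = Box<dyn Fn(&[f64]) -> Vec<f64>>;

/// Hypothetical registry standing in for the place where an engine
/// resolves function names found in a query. Not DataFusion's API.
struct FunctionRegistry {
    functions: HashMap<String, ScalarFn>,
}

impl FunctionRegistry {
    fn new() -> Self {
        FunctionRegistry { functions: HashMap::new() }
    }

    /// Registers a user-supplied function under `name`,
    /// replacing any previous function with the same name.
    fn register(&mut self, name: &str, f: ScalarFn) {
        self.functions.insert(name.to_string(), f);
    }

    /// Applies the named function to a column of values,
    /// or returns `None` if no such function was registered.
    fn call(&self, name: &str, input: &[f64]) -> Option<Vec<f64>> {
        self.functions.get(name).map(|f| f(input))
    }
}
```

A caller would register a closure such as `Box::new(|col| col.iter().map(|v| v * 2.0).collect())` under a name like `"double"` and later invoke it by that name. Operating on whole columns rather than single values mirrors the columnar style of Arrow, although the actual array types differ.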
This library currently supports many SQL constructs, including:
- `CREATE EXTERNAL TABLE X STORED AS PARQUET LOCATION '...';` to register a table's locations
- `SELECT ... FROM ...` together with any expression
- `ALIAS` to name an expression
- `CAST` to change types, including e.g. `Timestamp(Nanosecond, None)`
- most mathematical unary and binary expressions such as `+`, `/`, `sqrt`, `tan`, `>=`
- `WHERE` to filter
- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`
- `ORDER BY` together with an expression and optional `ASC` or `DESC` and also optional `NULLS FIRST` or `NULLS LAST`
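The interaction between `ASC`/`DESC` and `NULLS FIRST`/`NULLS LAST` in `ORDER BY` can be easy to misread, so here is a small standard-library sketch of the usual SQL semantics when both options are given explicitly. It models nullable values as `Option<i64>` and is not taken from DataFusion's sort implementation; `compare_nullable` and `sort_nullable` are names assumed for this example.

```rust
use std::cmp::Ordering;

/// Compares two nullable values as an explicit
/// `ORDER BY x {ASC|DESC} {NULLS FIRST|NULLS LAST}` would.
/// Null placement is decided by `nulls_first` alone; `descending`
/// only affects how two non-null values compare.
fn compare_nullable(
    a: &Option<i64>,
    b: &Option<i64>,
    descending: bool,
    nulls_first: bool,
) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => {
            if descending { y.cmp(x) } else { x.cmp(y) }
        }
    }
}

/// Sorts a column of nullable values with the given options.
fn sort_nullable(
    mut values: Vec<Option<i64>>,
    descending: bool,
    nulls_first: bool,
) -> Vec<Option<i64>> {
    values.sort_by(|a, b| compare_nullable(a, b, descending, nulls_first));
    values
}
```

With the input `[Some(2), None, Some(1)]`, `ASC NULLS LAST` places the null after `1, 2`, while `DESC NULLS FIRST` places it before `2, 1`. The sketch says nothing about which placement applies when `NULLS FIRST`/`NULLS LAST` is omitted.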
DataFusion strives to implement a subset of the PostgreSQL SQL dialect where possible. We explicitly choose a single dialect to maximize interoperability with other tools and allow reuse of the PostgreSQL documents and tutorials as much as possible.
Currently, only a subset of the PostgreSQL dialect is implemented, and we will document any deviations.
DataFusion supports showing metadata about the tables available. This information can be accessed using the views of the ISO SQL `information_schema` schema or the DataFusion-specific `SHOW TABLES` and `SHOW COLUMNS` commands.
More information can be found in the Postgres docs.
To show tables available for use in DataFusion, use the `SHOW TABLES` command or the `information_schema.tables` view:
```sql
> show tables;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema       | table_name | table_type |
+---------------+--------------------+------------+------------+
| datafusion    | public             | t          | BASE TABLE |
| datafusion    | information_schema | tables     | VIEW       |
+---------------+--------------------+------------+------------+

> select * from information_schema.tables;
+---------------+--------------------+------------+--------------+
| table_catalog | table_schema       | table_name | table_type   |
+---------------+--------------------+------------+--------------+
| datafusion    | public             | t          | BASE TABLE   |
| datafusion    | information_schema | TABLES     | SYSTEM TABLE |
+---------------+--------------------+------------+--------------+
```
To show the schema of a table in DataFusion, use the `SHOW COLUMNS` command or the `information_schema.columns` view:
```sql
> show columns from t;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion    | public       | t          | a           | Int32     | NO          |
| datafusion    | public       | t          | b           | Utf8      | NO          |
| datafusion    | public       | t          | c           | Float32   | NO          |
+---------------+--------------+------------+-------------+-----------+-------------+

> select table_name, column_name, ordinal_position, is_nullable, data_type from information_schema.columns;
+------------+-------------+------------------+-------------+-----------+
| table_name | column_name | ordinal_position | is_nullable | data_type |
+------------+-------------+------------------+-------------+-----------+
| t          | a           | 0                | NO          | Int32     |
| t          | b           | 1                | NO          | Utf8      |
| t          | c           | 2                | NO          | Float32   |
+------------+-------------+------------------+-------------+-----------+
```
DataFusion uses Arrow, and thus the Arrow type system, for query execution. The SQL types from sqlparser-rs are mapped to Arrow types according to the following table:
| SQL Data Type | Arrow DataType |
|---|---|
| `CHAR` | `Utf8` |
| `VARCHAR` | `Utf8` |
| `UUID` | Not yet supported |
| `CLOB` | Not yet supported |
| `BINARY` | Not yet supported |
| `VARBINARY` | Not yet supported |
| `DECIMAL` | `Float64` |
| `FLOAT` | `Float32` |
| `SMALLINT` | `Int16` |
| `INT` | `Int32` |
| `BIGINT` | `Int64` |
| `REAL` | `Float64` |
| `DOUBLE` | `Float64` |
| `BOOLEAN` | `Boolean` |
| `DATE` | `Date32` |
| `TIME` | `Time64(TimeUnit::Millisecond)` |
| `TIMESTAMP` | `Timestamp(TimeUnit::Nanosecond)` |
| `INTERVAL` | Not yet supported |
| `REGCLASS` | Not yet supported |
| `TEXT` | Not yet supported |
| `BYTEA` | Not yet supported |
| `CUSTOM` | Not yet supported |
| `ARRAY` | Not yet supported |
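The table above can also be read as a lookup function. The following sketch encodes exactly the rows listed, using plain strings for both sides so that it does not depend on the sqlparser-rs or Arrow types. The function name `arrow_type_for` and the choice to return `None` for unsupported or unknown names are assumptions for this illustration, not how DataFusion's planner is written.

```rust
/// Returns the Arrow type name that the table above lists for a SQL type name.
/// Returns `None` for types the table marks "Not yet supported" and for
/// names that do not appear in the table. Matching is case-insensitive.
fn arrow_type_for(sql_type: &str) -> Option<&'static str> {
    match sql_type.to_ascii_uppercase().as_str() {
        "CHAR" | "VARCHAR" => Some("Utf8"),
        "DECIMAL" | "REAL" | "DOUBLE" => Some("Float64"),
        "FLOAT" => Some("Float32"),
        "SMALLINT" => Some("Int16"),
        "INT" => Some("Int32"),
        "BIGINT" => Some("Int64"),
        "BOOLEAN" => Some("Boolean"),
        "DATE" => Some("Date32"),
        "TIME" => Some("Time64(TimeUnit::Millisecond)"),
        "TIMESTAMP" => Some("Timestamp(TimeUnit::Nanosecond)"),
        // UUID, CLOB, BINARY, VARBINARY, INTERVAL, REGCLASS, TEXT,
        // BYTEA, CUSTOM and ARRAY are listed as not yet supported.
        _ => None,
    }
}
```

One consequence visible in the mapping is that `DECIMAL` columns are handled as `Float64`, a binary floating-point type, so values that rely on exact decimal arithmetic may not round-trip precisely.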
There is no formal document describing DataFusion's architecture yet, but the following presentations offer a good overview of its different components and how they interact together.
- (March 2021): The DataFusion architecture is described in *Query Engine Design and the Rust-Based DataFusion in Apache Arrow*: recording (DataFusion content starts ~ 15 minutes in) and slides
- (February 2021): How DataFusion is used within the Ballista Project is described in *Ballista: Distributed Compute with Rust and Apache Arrow*: recording
Please see the Developers Guide for information about developing DataFusion.