Lightweight Apache Arrow data frame for Rust
Mýval (pronounced as [m'ival]) is translated from Czech as raccoon.
The common name for raccoon in Czech is "medvídek mýval", which can be translated as "little bear".

Myval is not a competitor of Polars. Myval is a lightweight Arrow data frame which is focused on in-place data transformation and IPC.

Because Arrow has a standardized data layout, data frames can be converted to Polars and vice versa with zero-copy:
```rust
let polars_df = polars::frame::DataFrame::from(myval_df);
let myval_df = myval::DataFrame::from(polars_df);
```
Like Polars, Myval is based on arrow2.

Consider there is an Arrow stream block (Schema+Chunk) received from e.g. RPC or Pub/Sub. Convert the block into a Myval data frame:
```rust
let df = myval::DataFrame::from_ipc_block(&buf).unwrap();
```
Need to send a data frame back? Convert it to an Arrow stream block with a single line of code:
```rust
let buf = df.into_ipc_block().unwrap();
```
Need to send slices? No problem: there are methods which can easily return sliced series, sliced data frames or IPC chunks, as sketched below.
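A rough sketch of the idea only; the `slice` and `len` method names below are hypothetical, check https://docs.rs/myval for the exact slicing API:

```rust
// split a large frame into fixed-size row windows and ship each window
// as an Arrow IPC stream block
let rows = df.len(); // hypothetical row-count accessor
let mut offset = 0;
while offset < rows {
    let len = 10_000.min(rows - offset);
    let window = df.slice(offset, len); // hypothetical slicing method
    let buf = window.into_ipc_block().unwrap();
    // send buf to the peer ...
    offset += len;
}
```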
Consider there is an i64-column "time" which contains nanosecond timestamps. Let us override its data type:
```rust
use myval::{DataType, TimeUnit};

df.set_data_type("time", DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
```
Consider there is a utf8-column "value" which should be parsed to floats:
```rust
df.parse::<f64>("value").unwrap();
```
```rust
// column arithmetic with a scalar operand: add/subtract/multiply/divide in place
df.add("col", 1_000i64).unwrap();
df.sub("col", 1_000i64).unwrap();
df.mul("col", 1_000i64).unwrap();
df.div("col", 1_000i64).unwrap();
```
```rust
// apply a closure to each value of the "time" column (values are passed as Option)
df.apply("time", |time| time.map(|t: i64| t / 1_000)).unwrap();
```
```rust
df.join(df2).unwrap();
```
```rust
let merged = myval::concat(&[&df1, &df2, &df3]).unwrap();
```
Consider there is a Myval data frame with columns "voltage", "temp1", "temp2", "temp3" which has received data from a server column-by-column in random ordering. Let us correct the ordering back to normal:
```rust
df.set_ordering(&["voltage", "temp1", "temp2", "temp3"]);
```
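Putting the pieces together, a typical in-place transformation of an incoming IPC block might look like the sketch below. It only combines the calls shown above; the column names and layout are illustrative:

```rust
use myval::{DataFrame, DataType, TimeUnit};

// receive a block (Schema + Chunk), transform it in place and send it back
let mut df = DataFrame::from_ipc_block(&buf).unwrap();
// "time" arrives as i64 nanoseconds: override its data type
df.set_data_type("time", DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
// "value" arrives as utf8 and must become f64
df.parse::<f64>("value").unwrap();
// restore the expected column ordering
df.set_ordering(&["time", "value"]);
let out = df.into_ipc_block().unwrap();
```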
Myval data frames can be parsed from serde_json Value (map only) or converted to Value (map/array). This requires the "json" crate feature:
```rust
// create an Object value from a data frame, converted to serde_json::Map
let val = serde_json::Value::Object(df.to_json_map().unwrap());
// define a JSON parser
let mut parser = myval::convert::json::Parser::new()
    .with_type_mapping("name", DataType::LargeUtf8);
// add more columns if required
parser = parser.with_type_mapping("time", DataType::Int64);
parser = parser.with_type_mapping("status", DataType::Int32);
let parsed_df = parser.parse_value(val).unwrap();
```
Some data types can not be correctly parsed from Value objects (e.g. Timestamp); use DataFrame methods to correct them to the required ones.

If a column is defined in a json::Parser object but missing in Value, it is created as null-filled.
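For example, a timestamp column can be mapped to Int64 in the parser and converted afterwards with set_data_type. A minimal sketch, reusing the calls shown above and assuming `val` is a serde_json::Value whose "time" entries are nanosecond timestamps:

```rust
use myval::convert::json::Parser;
use myval::{DataType, TimeUnit};

// Timestamp can not be parsed from JSON directly: map the column to Int64 first
let parser = Parser::new().with_type_mapping("time", DataType::Int64);
let mut df = parser.parse_value(val).unwrap();
// then override the data type of the already-parsed i64 column
df.set_data_type("time", DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
```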
Check the documentation: https://docs.rs/myval

Arrow provides several ways to work with databases. Myval additionally provides tools to work with PostgreSQL databases in an easy way via the popular sqlx crate (the "postgres" feature must be enabled):
```rust
use futures::stream::TryStreamExt;
use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .connect("postgres://postgres:welcome@localhost/postgres")
    .await
    .unwrap();
let max_size = 100_000;
let mut stream = myval::db::postgres::fetch("select * from test".to_owned(), Some(max_size), pool.clone());
// the stream returns data frames one by one with max data frame size (in
// bytes) = max_size
while let Some(df) = stream.try_next().await.unwrap() {
    // do some stuff
}
```
Why does the stream object require PgPool? There is one important reason: such stream objects are static and can be stored anywhere, e.g. used as cursors in a client-server architecture.
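For example, because the stream owns its pool handle, fetched data frames can be serialized and forwarded to clients one by one. A sketch combining fetch with into_ipc_block; the `send_to_client` function is hypothetical transport code:

```rust
use futures::stream::TryStreamExt;

let mut stream = myval::db::postgres::fetch("select * from test".to_owned(), Some(100_000), pool.clone());
while let Some(df) = stream.try_next().await.unwrap() {
    // serialize each fetched frame to an Arrow IPC stream block and forward it
    let buf = df.into_ipc_block().unwrap();
    send_to_client(buf).await; // hypothetical transport function
}
```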
Data received as IPC blocks can be pushed into a PostgreSQL database:

```rust
let df = DataFrame::from_ipc_block(payload).unwrap();
// The first received data frame must have the "database" field in its schema
// metadata. The next data frames can go without it.
if let Some(dbparams) = df.metadata().get("database") {
    let params: myval::db::postgres::Params = serde_json::from_str(dbparams).unwrap();
    let processed_rows: usize = myval::db::postgres::push(&df, &params, &pool).await.unwrap();
}
```
Let us push a Polars data frame into a PostgreSQL database:
```rust
use serde_json::json;

let mut df = myval::DataFrame::from(polars_df);
df.metadata_mut().insert(
    // set the "database" metadata field
    "database".to_owned(),
    serde_json::to_string(&json!({
        // table, required
        "table": "test",
        // PostgreSQL schema, optional
        "postgres": { "schema": "public" },
        // keys, required if the table has got keys/unique indexes
        "keys": ["id"],
        // some field parameters
        "fields": {
            // another way to declare a key field
            //"id": { "key": true },
            // the following data frame columns contain strings which must be
            // sent to the database as JSON (for json/jsonb PostgreSQL types)
            "data1": { "json": true },
            "data2": { "json": true }
        }
    }))?,
);
// send the data frame to the server in a single or multiple chunks/blocks
```
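If the database is reachable locally, the same frame can also be pushed without the IPC round trip, reusing the Params/push calls shown above. A sketch; it assumes a `pool` variable as in the fetch example:

```rust
// deserialize the Params from the "database" metadata field set above and
// push the frame directly (pool is an sqlx PgPool, as in the fetch example)
let params: myval::db::postgres::Params =
    serde_json::from_str(df.metadata().get("database").unwrap()).unwrap();
let rows = myval::db::postgres::push(&df, &params, &pool).await.unwrap();
```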
The following PostgreSQL column types are supported:

* BOOL, INT2 (16-bit int), INT4 (32-bit int), INT8 (64-bit int), FLOAT4 (32-bit float), FLOAT8 (64-bit float)
* TIMESTAMP, TIMESTAMPTZ (time zone information is discarded as Arrow arrays can not have different time zones for individual records)
* CHAR, VARCHAR
* JSON/JSONB (encoded to strings as LargeUtf8 when fetched)
Myval is not designed for data engineering. Use Polars.
Myval series can contain a single chunk only and there are no plans to extend this. When a Polars data frame with multiple chunks is converted to Myval, the chunks are automatically aggregated.

Some features (conversion to Polars, PostgreSQL) are experimental, use at your own risk.

Myval is a part of the EVA ICS Machine Learning kit developed by Bohemia Automation.
Bohemia Automation / Altertech is a group of companies with 15+ years of experience in enterprise automation and industrial IoT. Our setups include power plants, factories and urban infrastructure. The largest of them have 1M+ sensors and controlled devices, and the bar rises higher and higher every day.