Lightweight Apache Arrow data frame for Rust
alttch/myval
Mýval (pronounced [m'ival]) is Czech for "raccoon".
The common Czech name for the raccoon is "medvídek mýval", where "medvídek" translates as "little bear".
Myval is not a competitor of Polars. Myval is a lightweight Arrow data frame focused on in-place data transformation and IPC.
Because Arrow has a standardized data layout, data frames can be converted to Polars and vice versa with zero copy:
let polars_df = polars::frame::DataFrame::from(myval_df);
let myval_df = myval::DataFrame::from(polars_df);
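Zero copy means the converted frame reuses the existing Arrow buffers instead of copying their values. The stdlib-only sketch below illustrates the idea with two hypothetical frame types, `FrameA` and `FrameB`, that hand over a reference-counted buffer; it is an illustration of the concept, not how Myval or Polars implement the conversion.

```rust
use std::sync::Arc;

// Hypothetical shared column buffer (a stand-in for an Arrow buffer).
type Buffer = Arc<Vec<i64>>;

// Two hypothetical frame types standing in for the Myval and Polars frames.
struct FrameA {
    column: Buffer,
}

struct FrameB {
    column: Buffer,
}

// Conversion moves the Arc handle; the underlying Vec is never copied.
impl From<FrameA> for FrameB {
    fn from(a: FrameA) -> Self {
        FrameB { column: a.column }
    }
}

impl From<FrameB> for FrameA {
    fn from(b: FrameB) -> Self {
        FrameA { column: b.column }
    }
}

fn main() {
    let a = FrameA { column: Arc::new(vec![1, 2, 3]) };
    let before = Arc::as_ptr(&a.column);
    let b = FrameB::from(a);
    // Same allocation after conversion: no values were copied.
    assert_eq!(before, Arc::as_ptr(&b.column));
    let a2 = FrameA::from(b);
    assert_eq!(before, Arc::as_ptr(&a2.column));
    println!("{:?}", a2.column);
}
```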
Like Polars, Myval is based on arrow2.
Consider an Arrow stream block (Schema+Chunk) received from e.g. RPC or Pub/Sub. Convert the block into a Myval data frame:
let df = myval::DataFrame::from_ipc_block(&buf).unwrap();
Need to send a data frame back? Convert it to an Arrow stream block with a single line of code:
let buf = df.into_ipc_block().unwrap();
Need to send it sliced? No problem: there are methods which return sliced series, sliced data frames or IPC chunks.
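Slicing can be cheap because a slice only needs an offset and a length over a buffer that is already shared. A minimal sketch, assuming a hypothetical `Series` type whose `slice` method returns such a view (Myval's own slicing methods may differ in names and signatures):

```rust
use std::sync::Arc;

// Hypothetical series: a shared buffer plus an (offset, len) window into it.
#[derive(Clone, Debug)]
struct Series {
    data: Arc<Vec<f64>>,
    offset: usize,
    len: usize,
}

impl Series {
    fn new(values: Vec<f64>) -> Self {
        let len = values.len();
        Series { data: Arc::new(values), offset: 0, len }
    }

    // Returns a view of `len` items starting at `offset` (relative to this view).
    // The buffer is shared, not copied. Panics if the window is out of bounds.
    fn slice(&self, offset: usize, len: usize) -> Series {
        assert!(offset + len <= self.len, "slice out of bounds");
        Series { data: Arc::clone(&self.data), offset: self.offset + offset, len }
    }

    fn values(&self) -> &[f64] {
        &self.data[self.offset..self.offset + self.len]
    }
}

fn main() {
    let s = Series::new(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    let middle = s.slice(1, 3);
    let inner = middle.slice(1, 1);
    println!("{:?} {:?}", middle.values(), inner.values()); // [2.0, 3.0, 4.0] [3.0]
}
```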
Consider an i64 column "time" which contains nanosecond timestamps. Let us override its data type:
use myval::{DataType, TimeUnit};
df.set_data_type("time", DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
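The override can work without touching the data because an Arrow Timestamp column stores its values as 64-bit signed integers, the same physical layout as Int64; only the logical type changes. A simplified sketch with a hypothetical `Column` type and `DataType` enum (not arrow2's types):

```rust
// Hypothetical, simplified type tags (not arrow2's DataType).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    TimestampNs,
    Float64,
}

// Hypothetical column: a type tag over raw 64-bit integer values.
struct Column {
    data_type: DataType,
    values: Vec<i64>,
}

impl Column {
    // Re-tags the column without touching the values. In this sketch only
    // types backed by the same i64 physical type are accepted.
    fn set_data_type(&mut self, new_type: DataType) -> Result<(), String> {
        match new_type {
            DataType::Int64 | DataType::TimestampNs => {
                self.data_type = new_type;
                Ok(())
            }
            other => Err(format!("{:?} has a different physical type", other)),
        }
    }
}

fn main() {
    let mut time = Column { data_type: DataType::Int64, values: vec![1_700_000_000_000_000_000] };
    time.set_data_type(DataType::TimestampNs).unwrap();
    assert!(time.set_data_type(DataType::Float64).is_err());
    println!("{:?} {:?}", time.data_type, time.values);
}
```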
Consider a utf8 column "value" which should be parsed to floats:
df.parse::<f64>("value").unwrap();
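Parsing converts each non-null string cell with the target type's `FromStr` implementation. A minimal sketch using a hypothetical `parse_column` function; the rule that unparseable cells become nulls is an assumption of this sketch, and the real `parse` method may report an error instead:

```rust
use std::str::FromStr;

// Parses a nullable string column into a nullable typed column.
// Assumption: cells that fail to parse become nulls (None).
fn parse_column<T: FromStr>(values: &[Option<&str>]) -> Vec<Option<T>> {
    values
        .iter()
        .map(|v| match v {
            Some(s) => s.trim().parse::<T>().ok(),
            None => None,
        })
        .collect()
}

fn main() {
    let raw = [Some("1.5"), None, Some("oops"), Some(" 42 ")];
    let parsed: Vec<Option<f64>> = parse_column(&raw);
    println!("{:?}", parsed); // [Some(1.5), None, None, Some(42.0)]
}
```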
df.add("col", 1_000i64).unwrap();
df.sub("col", 1_000i64).unwrap();
df.mul("col", 1_000i64).unwrap();
df.div("col", 1_000i64).unwrap();
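These operations apply a scalar to every value of a column in place. A generic sketch over the standard `*Assign` operator traits, assuming a hypothetical nullable `Column<T>` type in which nulls are left untouched:

```rust
use std::ops::{AddAssign, DivAssign, MulAssign, SubAssign};

// Hypothetical nullable column. Each scalar operation mutates the values in
// place and skips nulls (None); no new column is allocated.
#[derive(Debug, PartialEq)]
struct Column<T>(Vec<Option<T>>);

impl<T> Column<T>
where
    T: Copy + AddAssign + SubAssign + MulAssign + DivAssign,
{
    fn add(&mut self, rhs: T) {
        self.0.iter_mut().flatten().for_each(|v| *v += rhs);
    }

    fn sub(&mut self, rhs: T) {
        self.0.iter_mut().flatten().for_each(|v| *v -= rhs);
    }

    fn mul(&mut self, rhs: T) {
        self.0.iter_mut().flatten().for_each(|v| *v *= rhs);
    }

    // Note: integer division by zero panics in Rust.
    fn div(&mut self, rhs: T) {
        self.0.iter_mut().flatten().for_each(|v| *v /= rhs);
    }
}

fn main() {
    let mut col = Column(vec![Some(5_000i64), None, Some(2_000)]);
    col.mul(1_000);
    col.div(1_000);
    col.add(1_000);
    col.sub(1_000);
    println!("{:?}", col); // Column([Some(5000), None, Some(2000)])
}
```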
df.apply("time", |time| time.map(|t: i64| t / 1_000)).unwrap();
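`apply` replaces a column with the output of a closure; the line above converts nanosecond timestamps to microseconds. A sketch with a hypothetical `Frame` type whose closure receives the whole column as a slice; the length check is an assumption of this sketch:

```rust
// Hypothetical frame: named nullable i64 columns kept in insertion order.
struct Frame {
    columns: Vec<(String, Vec<Option<i64>>)>,
}

impl Frame {
    // Replaces the named column with the result of `f`. Errors if the column
    // does not exist or if `f` changes the column length.
    fn apply<F>(&mut self, name: &str, f: F) -> Result<(), String>
    where
        F: FnOnce(&[Option<i64>]) -> Vec<Option<i64>>,
    {
        let (_, col) = self
            .columns
            .iter_mut()
            .find(|(n, _)| n == name)
            .ok_or_else(|| format!("column {} not found", name))?;
        let new_col = f(&col[..]);
        if new_col.len() != col.len() {
            return Err("closure changed the column length".to_string());
        }
        *col = new_col;
        Ok(())
    }
}

fn main() {
    let mut df = Frame { columns: vec![("time".to_string(), vec![Some(1_500_000), None])] };
    // nanoseconds -> microseconds, nulls preserved
    df.apply("time", |time| time.iter().map(|t| t.map(|t| t / 1_000)).collect()).unwrap();
    println!("{:?}", df.columns[0].1); // [Some(1500), None]
}
```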
df.join(df2).unwrap();
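A minimal sketch of one plausible reading of `join`, assuming it appends the columns of the second frame to the first (a horizontal join without key matching), which requires equal row counts and distinct column names. `Frame` and its methods here are hypothetical:

```rust
// Hypothetical frame with named i64 columns.
struct Frame {
    columns: Vec<(String, Vec<i64>)>,
}

impl Frame {
    fn rows(&self) -> usize {
        self.columns.first().map_or(0, |(_, c)| c.len())
    }

    // Appends all columns of `other`. Assumed rules for this sketch: row
    // counts must match and column names must not repeat. On error, `self`
    // is left unchanged.
    fn join(&mut self, other: Frame) -> Result<(), String> {
        if !self.columns.is_empty() && !other.columns.is_empty() && self.rows() != other.rows() {
            return Err(format!("row count mismatch: {} vs {}", self.rows(), other.rows()));
        }
        for (name, _) in &other.columns {
            if self.columns.iter().any(|(n, _)| n == name) {
                return Err(format!("duplicate column {}", name));
            }
        }
        self.columns.extend(other.columns);
        Ok(())
    }
}

fn main() {
    let mut df = Frame { columns: vec![("time".to_string(), vec![1, 2])] };
    let df2 = Frame { columns: vec![("value".to_string(), vec![10, 20])] };
    df.join(df2).unwrap();
    let names: Vec<&str> = df.columns.iter().map(|(n, _)| n.as_str()).collect();
    println!("{:?}", names); // ["time", "value"]
}
```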
let merged = myval::concat(&[&df1,&df2,&df3]).unwrap();
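Concatenation stacks frames row-wise. A sketch with a hypothetical `Frame` type and `concat` function, assuming all frames must share the same column names in the same order:

```rust
// Hypothetical frame with named i64 columns.
#[derive(Debug, PartialEq)]
struct Frame {
    columns: Vec<(String, Vec<i64>)>,
}

// Stacks frames row-wise. Assumption for this sketch: every frame has
// identical column names in the same order.
fn concat(frames: &[&Frame]) -> Result<Frame, String> {
    let first = frames.first().ok_or("no frames to concatenate")?;
    let mut columns: Vec<(String, Vec<i64>)> = first
        .columns
        .iter()
        .map(|(name, _)| (name.clone(), Vec::new()))
        .collect();
    for frame in frames {
        let names_match = frame.columns.len() == columns.len()
            && frame.columns.iter().zip(&columns).all(|((a, _), (b, _))| a == b);
        if !names_match {
            return Err("column names differ".to_string());
        }
        // Append this frame's values to the matching output columns.
        for ((_, src), (_, dst)) in frame.columns.iter().zip(columns.iter_mut()) {
            dst.extend_from_slice(src);
        }
    }
    Ok(Frame { columns })
}

fn main() {
    let df1 = Frame { columns: vec![("time".to_string(), vec![1, 2])] };
    let df2 = Frame { columns: vec![("time".to_string(), vec![3])] };
    let df3 = Frame { columns: vec![("time".to_string(), vec![4, 5])] };
    let merged = concat(&[&df1, &df2, &df3]).unwrap();
    println!("{:?}", merged.columns); // [("time", [1, 2, 3, 4, 5])]
}
```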
Consider a Myval data frame with columns "voltage", "temp1", "temp2", "temp3" which has received data from a server column by column in random order. Let us restore the normal ordering:
df.set_ordering(&["voltage","temp1","temp2","temp3"]);
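Reordering only has to move column entries around. In the sketch below, sorting moves column handles (names and Vec headers), never the values. `Frame` is hypothetical, and because the text does not say how unlisted columns are treated, this version's rule (they keep their relative order after the listed ones) is an assumption:

```rust
// Hypothetical frame with named f64 columns.
struct Frame {
    columns: Vec<(String, Vec<f64>)>,
}

impl Frame {
    // Reorders columns to follow `order`. sort_by_key is stable, so columns
    // that are not listed keep their relative order at the end.
    fn set_ordering(&mut self, order: &[&str]) {
        self.columns.sort_by_key(|(name, _)| {
            order.iter().position(|o| *o == name.as_str()).unwrap_or(order.len())
        });
    }

    fn names(&self) -> Vec<&str> {
        self.columns.iter().map(|(n, _)| n.as_str()).collect()
    }
}

fn main() {
    let mut df = Frame {
        columns: ["temp3", "voltage", "temp1", "temp2"]
            .iter()
            .map(|n| (n.to_string(), vec![]))
            .collect(),
    };
    df.set_ordering(&["voltage", "temp1", "temp2", "temp3"]);
    println!("{:?}", df.names()); // ["voltage", "temp1", "temp2", "temp3"]
}
```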
Myval data frames can be parsed from a serde_json Value (map only) or converted to a Value (map/array). This requires the "json" crate feature:
// create an Object value from a data frame, converted to serde_json::Map
let val = serde_json::Value::Object(df.to_json_map().unwrap());
// define a JSON parser
let mut parser = myval::convert::json::Parser::new().with_type_mapping("name", DataType::LargeUtf8);
// add more columns if required
parser = parser.with_type_mapping("time", DataType::Int64);
parser = parser.with_type_mapping("status", DataType::Int32);
let parsed_df = parser.parse_value(val).unwrap();
Some data types cannot be correctly parsed from Value objects (e.g. Timestamp); use DataFrame methods to convert them to the required ones.
If a column is defined in a json::Parser object but missing in the Value, it is created null-filled.
Check the documentation: https://docs.rs/myval
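The type mapping and the null-filling rule can be sketched without serde_json by using a plain map of column names to raw string cells as a stand-in for the JSON object. `Parser`, `ValueMap`, `DataType` and `Column` here are hypothetical, and deriving the row count from the longest present column is an assumption:

```rust
use std::collections::HashMap;

// Stand-in for a serde_json Value object: column name -> raw cells.
type ValueMap = HashMap<String, Vec<Option<String>>>;

#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

#[derive(Debug, PartialEq)]
enum Column {
    Int64(Vec<Option<i64>>),
    Utf8(Vec<Option<String>>),
}

// Hypothetical parser mirroring the builder shape of with_type_mapping.
#[derive(Default)]
struct Parser {
    mapping: Vec<(String, DataType)>,
}

impl Parser {
    fn with_type_mapping(mut self, name: &str, dt: DataType) -> Self {
        self.mapping.push((name.to_string(), dt));
        self
    }

    // Builds columns in mapping order. A mapped column missing from the value
    // is null-filled to the row count of the columns that are present.
    fn parse_value(&self, value: &ValueMap) -> Vec<(String, Column)> {
        let rows = value.values().map(Vec::len).max().unwrap_or(0);
        self.mapping
            .iter()
            .map(|(name, dt)| {
                let cells = value.get(name).cloned().unwrap_or_else(|| vec![None; rows]);
                let col = match dt {
                    DataType::Int64 => Column::Int64(
                        cells.iter().map(|c| c.as_deref().and_then(|s| s.parse::<i64>().ok())).collect(),
                    ),
                    DataType::Utf8 => Column::Utf8(cells),
                };
                (name.clone(), col)
            })
            .collect()
    }
}

fn main() {
    let mut value: ValueMap = HashMap::new();
    value.insert("time".to_string(), vec![Some("1".to_string()), Some("2".to_string())]);
    let parser = Parser::default()
        .with_type_mapping("time", DataType::Int64)
        .with_type_mapping("name", DataType::Utf8);
    for (name, col) in parser.parse_value(&value) {
        println!("{}: {:?}", name, col);
    }
}
```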
Arrow provides several ways to work with databases. Myval additionally provides tools to work with PostgreSQL databases in an easy way via the popular sqlx crate (the "postgres" feature must be enabled):
use futures::stream::TryStreamExt;
use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .connect("postgres://postgres:welcome@localhost/postgres")
    .await
    .unwrap();
let max_size = 100_000;
let mut stream = myval::db::postgres::fetch("select * from test".to_owned(), Some(max_size), pool.clone());
// the stream returns data frames one by one with max data frame size (in
// bytes) = max_size
while let Some(df) = stream.try_next().await.unwrap() {
    // do some stuff
}
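The `max_size` limit means rows are grouped into a frame until the next row would push it over the byte budget. A stdlib-only sketch of that batching rule over pre-encoded rows; the `batches` helper and `Batches` type are hypothetical, and emitting an oversized row as its own frame is an assumption:

```rust
use std::iter::Peekable;

// Groups encoded rows into batches whose total size stays within `max_size`
// bytes. Assumption: a single row larger than `max_size` forms its own batch.
struct Batches<I: Iterator<Item = Vec<u8>>> {
    rows: Peekable<I>,
    max_size: usize,
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for Batches<I> {
    type Item = Vec<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut batch = Vec::new();
        let mut size = 0;
        while let Some(row) = self.rows.peek() {
            // Stop before exceeding the budget, but always take at least one row.
            if !batch.is_empty() && size + row.len() > self.max_size {
                break;
            }
            size += row.len();
            batch.push(self.rows.next().unwrap());
        }
        if batch.is_empty() { None } else { Some(batch) }
    }
}

fn batches<I: IntoIterator<Item = Vec<u8>>>(rows: I, max_size: usize) -> Batches<I::IntoIter> {
    Batches { rows: rows.into_iter().peekable(), max_size }
}

fn main() {
    // Hypothetical encoded rows of 40, 40, 40 and 150 bytes.
    let rows = vec![vec![0u8; 40], vec![0u8; 40], vec![0u8; 40], vec![0u8; 150]];
    for (i, batch) in batches(rows, 100).enumerate() {
        let bytes: usize = batch.iter().map(Vec::len).sum();
        println!("frame {}: {} rows, {} bytes", i, batch.len(), bytes);
    }
}
```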
Why does the stream object require a PgPool? There is one important reason: such stream objects are static and can be stored anywhere, e.g. used as cursors in a client-server architecture.
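A stream that owns its own pool handle borrows nothing, so its type is `'static` and it can be boxed and stored, for example in a map of open cursors. A sketch with a hypothetical `Pool` stand-in backed by an in-memory table (sqlx's `PgPool` is likewise a cheap, reference-counted handle to clone); `fetch` here is not Myval's function:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical pool stand-in; cloning only bumps a reference count.
// The "database" is just an in-memory table of rows.
#[derive(Clone)]
struct Pool(Arc<Vec<i64>>);

// The returned iterator owns its Pool clone, so it is 'static and can be
// stored beyond the scope that created it.
fn fetch(pool: Pool, chunk: usize) -> impl Iterator<Item = Vec<i64>> + 'static {
    assert!(chunk > 0, "chunk must be positive");
    let mut offset = 0;
    std::iter::from_fn(move || {
        let rows = &pool.0;
        if offset >= rows.len() {
            return None;
        }
        let end = (offset + chunk).min(rows.len());
        let out = rows[offset..end].to_vec();
        offset = end;
        Some(out)
    })
}

fn main() {
    let pool = Pool(Arc::new((1..=5).collect()));
    // Open cursors keyed by a hypothetical client-visible id.
    let mut cursors: HashMap<u64, Box<dyn Iterator<Item = Vec<i64>>>> = HashMap::new();
    cursors.insert(1, Box::new(fetch(pool.clone(), 2)));
    // Later, e.g. in another request handler:
    let cursor = cursors.get_mut(&1).unwrap();
    println!("{:?}", cursor.next()); // Some([1, 2])
    println!("{:?}", cursor.next()); // Some([3, 4])
}
```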
A data frame received as an IPC block can be pushed into a PostgreSQL database:
let df = DataFrame::from_ipc_block(payload).unwrap();
// The first received data frame must have the "database" field in its schema
// metadata. Next data frames can go without it.
if let Some(dbparams) = df.metadata().get("database") {
    let params: myval::db::postgres::Params = serde_json::from_str(dbparams).unwrap();
    let processed_rows: usize = myval::db::postgres::push(&df, &params, &pool).await.unwrap();
}
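One way a receiver can honor the rule that only the first frame carries the "database" metadata is to remember the parameters from that frame and reuse them for later frames. A sketch with hypothetical `Frame` and `PushSession` types; the real push call is replaced by a row counter:

```rust
use std::collections::HashMap;

// Hypothetical received frame: only its metadata and row count matter here.
struct Frame {
    metadata: HashMap<String, String>,
    rows: usize,
}

// Remembers the "database" metadata from the first frame and reuses it.
#[derive(Default)]
struct PushSession {
    params: Option<String>,
    pushed_rows: usize,
}

impl PushSession {
    fn receive(&mut self, frame: &Frame) -> Result<usize, String> {
        if let Some(p) = frame.metadata.get("database") {
            self.params = Some(p.clone());
        }
        let _params = self.params.as_ref().ok_or("first frame has no \"database\" metadata")?;
        // A real implementation would deserialize the params and push rows here.
        self.pushed_rows += frame.rows;
        Ok(frame.rows)
    }
}

fn main() {
    let mut first = Frame { metadata: HashMap::new(), rows: 100 };
    first.metadata.insert("database".to_string(), r#"{"table":"test"}"#.to_string());
    let next = Frame { metadata: HashMap::new(), rows: 50 };
    let mut session = PushSession::default();
    session.receive(&first).unwrap();
    session.receive(&next).unwrap();
    println!("pushed {} rows", session.pushed_rows); // pushed 150 rows
}
```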
Let us push a Polars data frame into a PostgreSQL database:
use serde_json::json;

let mut df = myval::DataFrame::from(polars_df);
df.metadata_mut().insert(
    // set "database" metadata field
    "database".to_owned(),
    serde_json::to_string(&json!({
        // table, required
        "table": "test",
        // PostgreSQL schema, optional
        "postgres": { "schema": "public" },
        // keys, required if the table has got keys/unique indexes
        "keys": ["id"],
        // some field parameters
        "fields": {
            // another way to declare a key field
            // "id": { "key": true },
            // the following data frame columns contain strings which must be
            // sent to the database as JSON (for json/jsonb PostgreSQL types)
            "data1": { "json": true },
            "data2": { "json": true }
        }
    }))?,
);
// send the data frame to the server in a single or multiple chunks/blocks
Supported PostgreSQL data types:
BOOL, INT2 (16-bit int), INT4 (32-bit int), INT8 (64-bit int), FLOAT4 (32-bit float), FLOAT8 (64-bit float)
TIMESTAMP, TIMESTAMPTZ (time zone information is discarded, as Arrow arrays cannot have different time zones for individual records)
CHAR, VARCHAR
JSON/JSONB (encoded to strings as LargeUtf8 when fetched)
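The list can be read as a lookup table from PostgreSQL types to Arrow types. A hypothetical sketch: only the JSON/JSONB target (LargeUtf8) and the dropped time zone come from the text above; every other target, including the unspecified timestamp unit and Utf8 for CHAR/VARCHAR, is an assumption:

```rust
// Hypothetical PostgreSQL -> Arrow type lookup. Targets other than
// JSON/JSONB -> LargeUtf8 are assumptions made for this sketch.
fn arrow_type_for(pg_type: &str) -> Option<&'static str> {
    match pg_type.to_ascii_uppercase().as_str() {
        "BOOL" => Some("Boolean"),
        "INT2" => Some("Int16"),
        "INT4" => Some("Int32"),
        "INT8" => Some("Int64"),
        "FLOAT4" => Some("Float32"),
        "FLOAT8" => Some("Float64"),
        // Both map to the same type: the time zone is dropped because one
        // Arrow array cannot carry per-record zones (unit left unspecified).
        "TIMESTAMP" | "TIMESTAMPTZ" => Some("Timestamp"),
        "CHAR" | "VARCHAR" => Some("Utf8"),
        "JSON" | "JSONB" => Some("LargeUtf8"),
        // Anything not in the list above.
        _ => None,
    }
}

fn main() {
    for t in ["bool", "int8", "timestamptz", "jsonb", "uuid"] {
        println!("{} -> {:?}", t, arrow_type_for(t));
    }
}
```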
Myval is not designed for data engineering. Use Polars.
Myval series can contain a single chunk only, and there are no plans to extend this. When a Polars data frame with multiple chunks is converted to Myval, the chunks are automatically aggregated.
Some features (conversion to Polars, PostgreSQL) are experimental; use at your own risk.
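Aggregating chunks means copying their values into one contiguous buffer. A minimal sketch with a hypothetical `aggregate_chunks` function; unlike the zero-copy conversion path, in this sketch every value is copied once:

```rust
// Merges a chunked series (several separate buffers) into a single
// contiguous buffer, as a single-chunk series requires.
fn aggregate_chunks<T: Clone>(chunks: &[Vec<T>]) -> Vec<T> {
    // Pre-allocate once so the merge does not reallocate.
    let total: usize = chunks.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    for chunk in chunks {
        out.extend_from_slice(chunk);
    }
    out
}

fn main() {
    // e.g. a series that arrived as three chunks
    let chunks = vec![vec![1.0, 2.0], vec![3.0], vec![4.0, 5.0]];
    let single = aggregate_chunks(&chunks);
    println!("{:?}", single); // [1.0, 2.0, 3.0, 4.0, 5.0]
}
```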
Myval is a part of the EVA ICS Machine Learning kit developed by Bohemia Automation.
Bohemia Automation / Altertech is a group of companies with 15+ years of experience in enterprise automation and industrial IoT. Our setups include power plants, factories and urban infrastructure. The largest of them have 1M+ sensors and controlled devices, and the bar rises higher every day.