Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Lightweight Apache Arrow data frame for Rust

License

NotificationsYou must be signed in to change notification settings

alttch/myval

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

What is Myval?

Mýval (pronounced as [m'ival]) is translated from Czech as raccoon.

Why not a bear-name?

The common name for raccoon in Czech is "medvídek mýval" which can betranslated as "little bear".

But there is Polars?

Myval is not a competitor of Polars. Myval is a lightweight Arrow data framewhich is focused on in-place data transformation and IPC.

Because Arrow has got the standardized data layout, data frames can beconverted to Polars and vice-versa with zero-copy:

let polars_df = polars::frame::DataFrame::from(myval_df);let myval_df = myval::DataFrame::from(polars_df);

As well as Polars, Myval is based onarrow2.

Some tricks

IPC

Consider there is an Arrow stream block (Schema+Chunk) received from e.g. RPCor Pub/Sub. Convert the block into a Myval data frame:

let df = myval::DataFrame::from_ipc_block(&buf).unwrap();

Need to send a data frame back? Convert it to Arrow stream block with a singleline of code:

let buf = df.into_ipc_block().unwrap();

Need to send sliced? No problem, there are methods which can easily returnsliced series, sliced data frames or IPC chunks.

Overriding data types

Consider there is an i64-column "time" which contains nanosecond timestamps.Let us override its data type:

use myval::{DataType,TimeUnit};df.set_data_type("time",DataType::Timestamp(TimeUnit::Nanosecond,None)).unwrap();

Parsing numbers from strings

Consider there is a utf8-column "value" which should be parsed to floats:

df.parse::<f64>("value").unwrap();

Basic in-place math

df.add("col",1_000i64).unwrap();df.sub("col",1_000i64).unwrap();df.mul("col",1_000i64).unwrap();df.div("col",1_000i64).unwrap();

Custom in-place transformations

df.apply("time", |time| time.map(|t:i64| t /1_000)).unwrap();

Horizontal join

df.join(df2).unwrap();

Concatenation

let merged = myval::concat(&[&df1,&df2,&df3]).unwrap();

Set column ordering

Consider there is a Myval data frame with columns "voltage", "temp1", "temp2","temp3" which has received data from a server column-by-column in randomordering. Let us correct the ordering back to normal:

df.set_ordering(&["voltage","temp1","temp2","temp3"]);

From/to JSON

Myval data frames can be parsed fromserde_json Value (map only) or convertedto Value (map/array). This requires "json" crate feature:

// create Object value from a data frame, converted to serde_json::Maplet val = serde_json::Value::Object(df.to_json_map().unwrap());// define JSON parserletmut parser = myval::convert::json::Parser::new().with_type_mapping("name",DataType::LargeUtf8);// add more columns if requiredparser = parser.with_type_mapping("time",DataType::Int64);parser = parser.with_type_mapping("status",DataType::Int32);let parsed_df = parser.parse_value(val).unwrap();
  • Some data types can not be correctly parsed from Value objects (e.g.Timestamp), use DataFrame methods to correct them to the required ones.

  • If a column is defined in a json::Parser object but missing in Value, it iscreated as null-filled.

Others

Check the documentation:https://docs.rs/myval

Working with databases

Arrow provides several ways to work with databases. Myval additionally providestools to work with PostgreSQL databases in the easy way via the popularsqlx crate ("postgres" feature must beenabled):

Fetching data from a database

use futures::stream::TryStreamExt;let pool =PgPoolOptions::new().connect("postgres://postgres:welcome@localhost/postgres").await.unwrap();let max_size =100_000;letmut stream = myval::db::postgres::fetch("select * from test".to_owned(),Some(max_size), pool.clone());// the stream returns data frames one by one with max data frame size (in// bytes) = max_sizewhileletSome(df) = stream.try_next().await.unwrap(){// do some stuff}

Why does the stream object require PgPool? There is one important reason: suchstream objects are static and can be stored anywhere, e.g. used as cursors in aclient-server architecture.

Pushing data into a database

Server

let df =DataFrame::from_ipc_block(payload).unwrap();// The first received data frame must have "database" field in its schema// metadata. Next data frames can go without it.ifletSome(dbparams) = df.metadata().get("database"){let params: myval::db::postgres::Params = serde_json::from_str(dbparams).unwrap();let processed_rows:usize = myval::db::postgres::push(&df,&params,&pool).await.unwrap();}

Client

Let us push Polars data frame into a PostgreSQL database:

use serde_json::json;letmut df = myval::DataFrame::from(polars_df);df.metadata_mut().insert(// set "database" metadata field"database".to_owned(),    serde_json::to_string(&json!({// table, required"table":"test",// PostgreSQL schema, optional"postgres":{"schema":"public"},// keys, required if the table has got keys/unique indexes"keys":["id"],// some field parameters"fields":{// another way to declare a key field//"id": { "key": true },// the following data frame columns contain strings which must be// sent to the database as JSON (for json/jsonb PostgreSQL types)"data1":{"json":true},"data2":{"json":true}}}))?,);// send the data frame to the server in a single or multiple chunks/blocks

PostgreSQL types supported

  • BOOL, INT2 (16-bit int), INT4 (32-bit int), INT8 (64-bit int), FLOAT4 (32-bitfloat), FLOAT8 (64-bit float)

  • TIMESTAMP, TIMESTAMPTZ (time zone information is discarded as Arrow arrayscan not have different time zones for individual records)

  • CHAR, VARCHAR

  • JSON/JSONB (encoded to strings as LargeUtf8 when fetched)

General limitations

  • Myval is not designed for data engineering. Use Polars.

  • Myval series can contain a single chunk only and there are no plans to extendthis. When a Polars data frame with multiple chunks is converted to Myval, thechunks are automatically aggregated.

  • Some features (conversion to Polars, PostgreSQL) are experimental, use atyour own risk.

About

Myval is a part ofEVA ICS Machine Learningkit developed byBohemiaAutomation.

Bohemia Automation /Altertech is a group of companies with 15+ yearsof experience in the enterprise automation and industrial IoT. Our setupsinclude power plants, factories and urban infrastructure. Largest of them have1M+ sensors and controlled devices and the bar raises higher and higher everyday.


[8]ページ先頭

©2009-2025 Movatter.jp