arkflow-rs/arkflow


English | 中文


Latest docs | Dev docs


ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, and also makes it easy to load and execute machine learning models, enabling real-time inference on streaming data, anomaly detection, and complex event processing.

Cloud Native Landscape

ArkFlow is listed in the CNCF Cloud Native Landscape.

Features

  • High Performance: Built on Rust and Tokio async runtime, offering excellent performance and low latency
  • Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
  • Powerful Processing Capabilities: Built-in SQL queries, Python scripts, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
  • Extensible: Modular design, easy to extend with new input, buffer, output, and processor components
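To give a feel for this modular design, a custom processor component might look roughly like the following. This is a hypothetical sketch: the `Message` struct and `Processor` trait below are simplifications invented for illustration, and ArkFlow's real component traits differ (they are async and operate on batches of Arrow data).

```rust
/// A single record flowing through the pipeline (simplified stand-in for
/// ArkFlow's batch types; invented for this illustration).
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub sensor: String,
    pub value: i64,
}

/// Hypothetical processor trait: take a batch in, return a batch out.
pub trait Processor {
    fn process(&self, batch: Vec<Message>) -> Vec<Message>;
}

/// An example custom processor that keeps only messages whose value meets
/// a threshold, mirroring the SQL filter used later in this README.
pub struct ThresholdFilter {
    pub threshold: i64,
}

impl Processor for ThresholdFilter {
    fn process(&self, batch: Vec<Message>) -> Vec<Message> {
        batch
            .into_iter()
            .filter(|m| m.value >= self.threshold)
            .collect()
    }
}
```

A new input, buffer, or output component would plug in the same way: implement the relevant trait and register it with the engine.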

Installation

Building from Source

```shell
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow

# Build the project
cargo build --release

# Run tests
cargo test
```

Quick Start

  1. Create a configuration file `config.yaml`:

```yaml
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"
    output:
      type: "stdout"
    error_output:
      type: "stdout"
```
  2. Run ArkFlow:

```shell
./target/release/arkflow --config config.yaml
```

Configuration Guide

ArkFlow uses YAML format configuration files, supporting the following main configuration items:

Top-level Configuration

```yaml
logging:
  level: info      # Log level: debug, info, warn, error
streams:           # Stream definition list
  - input:         # Input configuration
      # ...
    pipeline:      # Processing pipeline configuration
      # ...
    output:        # Output configuration
      # ...
    error_output:  # Error output configuration
      # ...
    buffer:        # Buffer configuration
      # ...
```

Input Components

ArkFlow supports multiple input sources:

  • Kafka: Read data from Kafka topics
  • MQTT: Subscribe to messages from MQTT topics
  • HTTP: Receive data via HTTP
  • File: Read data from files (CSV, JSON, Parquet, Avro, Arrow) using SQL
  • Generator: Generate test data
  • Database: Query data from databases (MySQL, PostgreSQL, SQLite, DuckDB)
  • Nats: Subscribe to messages from Nats topics
  • Redis: Subscribe to messages from Redis channels or lists
  • Websocket: Subscribe to messages from WebSocket connections
  • Modbus: Read data from Modbus devices

Example:

```yaml
input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true
```

Processors

ArkFlow provides multiple data processors:

  • JSON: JSON data processing and transformation
  • SQL: Process data using SQL queries
  • Protobuf: Protobuf encoding/decoding
  • Batch Processing: Process messages in batches
  • VRL: Process data using VRL (Vector Remap Language)

Example:

```yaml
pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"
```
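Conceptually, this pipeline first converts row-oriented JSON records into a columnar layout, then applies the SQL filter over the columns. The plain-Rust sketch below illustrates those two stages; it is illustrative only (the `FlowBatch` type and function names are invented here, while ArkFlow actually builds Arrow record batches and runs the query through a SQL engine).

```rust
/// Columnar layout, loosely analogous to an Arrow record batch
/// (invented for this illustration).
pub struct FlowBatch {
    pub sensor: Vec<String>,
    pub value: Vec<i64>,
}

/// Stage 1 (`json_to_arrow`, conceptually): turn row-oriented records
/// into columns.
pub fn rows_to_columns(rows: &[(&str, i64)]) -> FlowBatch {
    FlowBatch {
        sensor: rows.iter().map(|(s, _)| s.to_string()).collect(),
        value: rows.iter().map(|(_, v)| *v).collect(),
    }
}

/// Stage 2 (`sql`, conceptually): SELECT * FROM flow WHERE value >= min.
pub fn where_value_ge(batch: &FlowBatch, min: i64) -> FlowBatch {
    let keep: Vec<usize> = batch
        .value
        .iter()
        .enumerate()
        .filter(|(_, v)| **v >= min)
        .map(|(i, _)| i)
        .collect();
    FlowBatch {
        sensor: keep.iter().map(|&i| batch.sensor[i].clone()).collect(),
        value: keep.iter().map(|&i| batch.value[i]).collect(),
    }
}
```

The columnar step is what lets the SQL stage scan whole columns at once instead of touching each record individually.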

Output Components

ArkFlow supports multiple output targets:

  • Kafka: Write data to Kafka topics
  • MQTT: Publish messages to MQTT topics
  • HTTP: Send data via HTTP
  • Standard Output: Output data to the console
  • Drop: Discard data
  • Nats: Publish messages to Nats topics

Example:

```yaml
output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: test-topic
  client_id: arkflow-producer
```

Error Output Components

ArkFlow supports multiple error output targets:

  • Kafka: Write error data to Kafka topics
  • MQTT: Publish error messages to MQTT topics
  • HTTP: Send error data via HTTP
  • Standard Output: Output error data to the console
  • Drop: Discard error data
  • Nats: Publish messages to Nats topics

Example:

```yaml
error_output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: error-topic
  client_id: error-arkflow-producer
```

Buffer Components

ArkFlow provides buffer capabilities to handle backpressure and temporary storage of messages:

  • Memory Buffer: Memory buffer, for high-throughput scenarios and window aggregation.
  • Session Window: The Session Window buffer component provides a session-based message grouping mechanism where messages are grouped based on activity gaps. It implements a session window that closes after a configurable period of inactivity.
  • Sliding Window: The Sliding Window buffer component provides a time-based windowing mechanism for processing message batches. It implements a sliding window algorithm with configurable window size, slide interval, and slide size.
  • Tumbling Window: The Tumbling Window buffer component provides a fixed-size, non-overlapping windowing mechanism for processing message batches. It implements a tumbling window algorithm with configurable interval settings.
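For intuition, tumbling-window assignment can be sketched in a few lines: with a fixed interval, each event time falls into exactly one non-overlapping window. This is a textbook illustration with invented function names, not ArkFlow's implementation.

```rust
use std::collections::BTreeMap;

/// Window k covers [k * interval, (k + 1) * interval), so integer
/// division maps each timestamp to exactly one window start.
fn tumbling_window_start(timestamp_ms: u64, interval_ms: u64) -> u64 {
    (timestamp_ms / interval_ms) * interval_ms
}

/// Count events per tumbling window, keyed by window start time.
fn tumbling_counts(timestamps: &[u64], interval_ms: u64) -> BTreeMap<u64, usize> {
    let mut counts = BTreeMap::new();
    for &t in timestamps {
        *counts.entry(tumbling_window_start(t, interval_ms)).or_insert(0) += 1;
    }
    counts
}
```

A sliding window differs only in that one event can belong to several overlapping windows, and a session window keys on gaps between events rather than on fixed boundaries.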

Example:

```yaml
buffer:
  type: memory
  capacity: 10000  # Maximum number of messages to buffer
  timeout: 10s     # Maximum time to buffer messages
```

Examples

Kafka to Kafka Data Processing

```yaml
streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group
    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"
    output:
      type: kafka
      brokers:
        - localhost:9092
      topic:
        type: value
        value: test-topic
```

Generate Test Data and Process

```yaml
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
    output:
      type: "stdout"
```

Users

  • Conalog (Country: South Korea)

ArkFlow Plugin

ArkFlow Plugin Examples

License

ArkFlow is licensed under the Apache License 2.0.

Community

Discord: https://discord.gg/CwKhzb8pux

If you like this project, or are using it to learn or to start your own solution, please give it a star ⭐. Thanks!
