feat(processor): supports wasm #568

Draft
chenquan wants to merge 2 commits into main from feat/wasm

Conversation

chenquan (Collaborator) commented Jul 7, 2025 (edited by coderabbitai bot)

Summary by CodeRabbit

  • New Features

    • Introduced support for processing message batches using custom WebAssembly modules.
    • Added configuration options to specify WASM modules and functions for processing.
    • Enabled asynchronous execution of WASM-based processors within the plugin framework.
  • Bug Fixes

    • Improved error handling for invalid or missing WASM processor configurations.

coderabbitai bot (Contributor) commented Jul 7, 2025 (edited)

Walkthrough

A new WebAssembly (WASM) processor has been introduced to the arkflow-plugin crate. This includes adding the wasmtime dependency, integrating a new wasm module into the processor initialization, and implementing the WASM processor logic to enable asynchronous processing of message batches via user-specified WASM modules.

Changes

| File(s) | Change Summary |
|---|---|
| crates/arkflow-plugin/Cargo.toml | Added wasmtime crate dependency for WASM support. |
| crates/arkflow-plugin/src/processor/mod.rs | Declared new public wasm module; updated init to initialize the wasm module. |
| crates/arkflow-plugin/src/processor/wasm.rs | Implemented WASM processor: config struct, processor logic, builder, error handling, and tests. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant ArkflowProcessor
    participant WasmProcessor
    participant WasmtimeEngine
    participant WasmModule
    User->>ArkflowProcessor: Initialize
    ArkflowProcessor->>WasmProcessor: init()
    WasmProcessor->>WasmtimeEngine: Load WASM module (file or base64)
    WasmtimeEngine->>WasmModule: Instantiate
    User->>ArkflowProcessor: process(MessageBatch)
    ArkflowProcessor->>WasmProcessor: process(MessageBatch)
    WasmProcessor->>WasmModule: Call function with input data
    WasmModule-->>WasmProcessor: Return output pointer
    WasmProcessor->>WasmModule: Read output bytes
    WasmProcessor-->>ArkflowProcessor: Return processed MessageBatch
    ArkflowProcessor-->>User: Return processed result

Possibly related PRs

  • feat(processor): supports python #409: Adds a Python-based processor module and integrates it similarly to this WASM processor, modifying the same processor module and initialization function but implementing a different processor type.

Poem

In the land of Arkflow, a new path appears,
WASM hops in, greeted with cheers!
Now plugins can run, bytecode in tow,
Processing messages with a swift, nimble flow.
With Wasmtime’s might and a bunny’s delight,
The future of plugins is shiny and bright! 🐇✨

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Clippy (1.86.0)
Updating crates.io index

warning: failed to write cache, path: /usr/local/registry/index/index.crates.io-1949cf8c6b5b557f/.cache/ar/ro/arrow-json, error: Permission denied (os error 13)
Downloading crates ...
Downloaded aes-siv v0.7.0
error: failed to create directory /usr/local/registry/cache/index.crates.io-1949cf8c6b5b557f

Caused by:
Permission denied (os error 13)


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6

🧹 Nitpick comments (1)
crates/arkflow-plugin/src/processor/wasm.rs (1)

55-58: Consider security and performance implications of WASM execution.

Creating a new Store for each execution provides good isolation but may have performance implications for high-throughput scenarios. Additionally, running untrusted WASM code requires careful consideration of security boundaries.

Consider:

  1. Store Pooling: For better performance, implement a pool of pre-initialized Stores that can be reused.
  2. Fuel Metering: Use Wasmtime's fuel consumption feature to prevent infinite loops.
  3. Resource Limits: Configure memory and table limits in the Engine.
  4. Sandboxing: Ensure WASM modules cannot access host filesystem or network unless explicitly allowed.

Example configuration:

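    let mut config = wasmtime::Config::new();
    config.consume_fuel(true);
    // As written in the review; verify this call against the wasmtime `Config` docs.
    // Per-store memory caps are normally set with `StoreLimitsBuilder::memory_size`
    // and attached through `Store::limiter`.
    config.memory_maximum_size(16 * 1024 * 1024); // 16MB max
    let engine = Engine::new(&config)?;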
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8c4ce15 and f1e3eb8.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • crates/arkflow-plugin/Cargo.toml (1 hunks)
  • crates/arkflow-plugin/src/processor/mod.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/wasm.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/mod.rs (6)
crates/arkflow-plugin/src/processor/wasm.rs (1)
  • init (27-30)
crates/arkflow-plugin/src/processor/sql.rs (1)
  • init (247-249)
crates/arkflow-plugin/src/processor/protobuf.rs (1)
  • init (230-240)
crates/arkflow-plugin/src/processor/python.rs (1)
  • init (166-168)
crates/arkflow-plugin/src/processor/json.rs (1)
  • init (147-151)
crates/arkflow-plugin/src/processor/vrl.rs (1)
  • init (24-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
🔇 Additional comments (2)
crates/arkflow-plugin/Cargo.toml (1)

89-91: Confirm wasmtime 34.0.1 is up to date and secure

Version 34.0.1 is the latest stable wasmtime release (June 24, 2025) and includes a fix for a host-defined tables/globals panic. No other security vulnerabilities are reported—no changes needed.

crates/arkflow-plugin/src/processor/mod.rs (1)

27-27: LGTM!

The WASM module declaration and initialization follow the established pattern consistently.

Also applies to: 36-36

Comment on lines +223 to +257
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;

    #[tokio::test]
    async fn test_wasm_processor_missing_config() {
        let result = WasmProcessorBuilder.build(
            None,
            &None,
            &Resource {
                temporary: Default::default(),
                input_names: RefCell::new(Default::default()),
            },
        );
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_wasm_processor_invalid_config() {
        let config = serde_json::json!({
            "invalid": "config"
        });

        let result = WasmProcessorBuilder.build(
            None,
            &Some(config),
            &Resource {
                temporary: Default::default(),
                input_names: RefCell::new(Default::default()),
            },
        );
        assert!(result.is_err());
    }
}

🛠️ Refactor suggestion

Add comprehensive tests for WASM processing functionality.

The current tests only cover configuration errors. Consider adding tests for actual WASM execution, memory management, and edge cases.

Would you like me to generate comprehensive test cases including:

  • A simple WASM module for testing
  • Tests for successful processing
  • Tests for WASM execution errors
  • Tests for memory allocation/deallocation
  • Tests for timeout handling (if implemented)
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs between lines 223 and 257, the existing tests only check for configuration errors. To improve coverage, add tests that include a simple WASM module to verify successful processing, tests that simulate WASM execution errors, tests that validate proper memory allocation and deallocation, and if applicable, tests that check timeout handling during WASM execution. This will ensure the WASM processor behaves correctly under various scenarios beyond just config validation.

Comment on lines 32 to 41
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
    /// WASM module path or inline WASM bytes (base64 encoded)
    module: String,
    /// Function name to call in the WASM module
    function: String,
    /// Whether the module field contains a file path or base64 encoded bytes
    #[serde(default = "default_is_file")]
    is_file: bool,
}

🛠️ Refactor suggestion

Consider adding security and resource limit configurations.

The current configuration lacks important security and resource management options for running untrusted WASM code safely.

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct WasmProcessorConfig {
     /// WASM module path or inline WASM bytes (base64 encoded)
     module: String,
     /// Function name to call in the WASM module
     function: String,
     /// Whether the module field contains a file path or base64 encoded bytes
     #[serde(default = "default_is_file")]
     is_file: bool,
+    /// Maximum memory pages (64KB each) the WASM module can use
+    #[serde(default = "default_max_memory_pages")]
+    max_memory_pages: u32,
+    /// Timeout for WASM execution in milliseconds
+    #[serde(default = "default_timeout_ms")]
+    timeout_ms: u64,
 }

Add corresponding default functions:

fn default_max_memory_pages() -> u32 {
    256 // 16MB default limit
}

fn default_timeout_ms() -> u64 {
    5000 // 5 seconds default timeout
}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 32 to 41, the WasmProcessorConfig struct lacks security and resource limit fields for safely running untrusted WASM code. Add new fields max_memory_pages of type u32 and timeout_ms of type u64 with serde default attributes pointing to new default functions default_max_memory_pages and default_timeout_ms respectively. Implement these default functions to return 256 for max_memory_pages (16MB limit) and 5000 for timeout_ms (5 seconds timeout) as specified.
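
The 256-page default maps to 16 MiB because a WebAssembly linear-memory page is 64 KiB. A minimal sketch of that arithmetic follows; `pages_to_bytes` and `bytes_to_pages` are hypothetical helper names for illustration and are not part of the PR.

```rust
/// Size of one WebAssembly linear-memory page in bytes (64 KiB).
const WASM_PAGE_SIZE: u64 = 64 * 1024;

/// Hypothetical helper: convert a page limit into a byte limit.
fn pages_to_bytes(pages: u32) -> u64 {
    u64::from(pages) * WASM_PAGE_SIZE
}

/// Hypothetical helper: smallest number of pages that can hold `bytes`.
fn bytes_to_pages(bytes: u64) -> u64 {
    bytes.div_ceil(WASM_PAGE_SIZE)
}
```

With these helpers, `pages_to_bytes(256)` is 16,777,216 bytes, which matches the "16MB default limit" comment in the suggestion.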

Comment on lines +64 to +71
let func = instance
    .get_typed_func::<(i32, i32), i32>(&mut store, &self.config.function)
    .map_err(|e| {
        Error::Process(format!(
            "Failed to get function '{}': {}",
            self.config.function, e
        ))
    })?;

🛠️ Refactor suggestion

Document WASM module interface requirements.

The hardcoded function signature and required exports should be clearly documented for users implementing WASM modules.

Add comprehensive documentation above the struct:

/// WasmProcessor executes WebAssembly modules for message processing.
///
/// # WASM Module Requirements
///
/// The WASM module must export:
/// - `memory`: WebAssembly memory for data exchange
/// - `alloc(size: i32) -> i32`: Allocate memory and return pointer
/// - `dealloc(ptr: i32, size: i32)`: Free allocated memory (optional but recommended)
/// - Processing function with signature `(ptr: i32, len: i32) -> i32`
///
/// # Data Format
///
/// Input: MessageBatch serialized as JSON bytes
/// Output: Length-prefixed byte array (4-byte LE length + data)
///
/// # Example WASM Module (Rust)
/// ```ignore
/// #[no_mangle]
/// pub extern "C" fn alloc(size: i32) -> i32 {
///     // Allocate memory
/// }
///
/// #[no_mangle]
/// pub extern "C" fn process(ptr: i32, len: i32) -> i32 {
///     // Process data and return output pointer
/// }
/// ```
struct WasmProcessor {
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 64 to 71, add detailed documentation above the WasmProcessor struct to clearly specify the WASM module interface requirements. Document the expected exports such as memory, alloc, dealloc, and the processing function with the signature (ptr: i32, len: i32) -> i32. Also include the data format for input and output, and provide an example snippet illustrating the required functions in Rust. This will help users understand how to implement compatible WASM modules.
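
To make the "4-byte LE length + data" output layout concrete, here is a rough host-side sketch that decodes such a buffer from a plain byte slice standing in for guest linear memory. The function name `read_length_prefixed` and the `String` error type are assumptions for illustration; the PR's own `read_output_from_memory` is not shown in this thread and may differ.

```rust
/// Hypothetical helper (not from the PR): read a length-prefixed buffer that
/// starts at `ptr` inside a snapshot of guest linear memory.
/// Assumed layout: 4-byte little-endian length, followed by that many data bytes.
fn read_length_prefixed(memory: &[u8], ptr: i32) -> Result<&[u8], String> {
    // A negative pointer can never be a valid offset into linear memory.
    if ptr < 0 {
        return Err(format!("negative pointer: {}", ptr));
    }
    let start = ptr as usize;
    let header_end = start.checked_add(4).ok_or("pointer overflow")?;
    let header = memory
        .get(start..header_end)
        .ok_or("length header is out of bounds")?;
    let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
    let data_end = header_end.checked_add(len).ok_or("length overflow")?;
    // Bounds-check the payload so a bogus length cannot read past the memory.
    memory
        .get(header_end..data_end)
        .ok_or_else(|| format!("payload of {} bytes is out of bounds", len))
}
```

Every offset is bounds-checked because both the pointer and the length come from guest code, which may be untrusted.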

Comment on lines +149 to +154
fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
    // Convert MessageBatch to JSON for WASM processing
    // This is a simplified approach - in practice you might want to use a more efficient format
    let json_data = msg_batch.to_binary("value")?;
    Ok(json_data.join(&b'\n'))
}

🛠️ Refactor suggestion

Improve data serialization for production use.

The current implementation uses a simplified approach that may not preserve all MessageBatch properties effectively.

Consider using a more robust serialization format:

     fn message_batch_to_bytes(&self, msg_batch: &MessageBatch) -> Result<Vec<u8>, Error> {
-        // Convert MessageBatch to JSON for WASM processing
-        // This is a simplified approach - in practice you might want to use a more efficient format
-        let json_data = msg_batch.to_binary("value")?;
-        Ok(json_data.join(&b'\n'))
+        // Serialize MessageBatch to JSON, preserving schema and metadata
+        let json_value = serde_json::to_value(msg_batch)
+            .map_err(|e| Error::Process(format!("Failed to serialize MessageBatch: {}", e)))?;
+        serde_json::to_vec(&json_value)
+            .map_err(|e| Error::Process(format!("Failed to convert to bytes: {}", e)))
     }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 149 to 154, the current method serializes MessageBatch to JSON and joins with newline characters, which may not preserve all properties or be efficient for production. Replace this with a more robust serialization format such as MessagePack, Protobuf, or CBOR that fully captures the MessageBatch structure and is optimized for performance. Update the method to serialize using the chosen format and return the resulting byte vector accordingly.
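
One concrete weakness of joining payloads with `b'\n'` is that a payload containing a newline can no longer be split back into the original messages. Independent of which serialization format is chosen, a simple alternative is length-prefixed framing. The sketch below uses only the standard library; `frame_payloads` and `unframe_payloads` are hypothetical names, not functions from the PR.

```rust
/// Hypothetical framing helper: prefix each payload with its length as a
/// 4-byte little-endian integer, so payloads may contain any byte, including b'\n'.
/// Assumes every payload is smaller than 4 GiB.
fn frame_payloads(payloads: &[Vec<u8>]) -> Vec<u8> {
    let mut out = Vec::new();
    for p in payloads {
        out.extend_from_slice(&(p.len() as u32).to_le_bytes());
        out.extend_from_slice(p);
    }
    out
}

/// Inverse of `frame_payloads`; returns `None` if the input is truncated.
fn unframe_payloads(mut bytes: &[u8]) -> Option<Vec<Vec<u8>>> {
    let mut out = Vec::new();
    while !bytes.is_empty() {
        let header = bytes.get(..4)?;
        let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]) as usize;
        let end = 4usize.checked_add(len)?;
        out.push(bytes.get(4..end)?.to_vec());
        bytes = &bytes[end..];
    }
    Some(out)
}
```

For example, the two payloads `a\nb` and `c` become three pieces when joined with a newline and split again, whereas framing and unframing them returns the original two.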

Comment on lines +81 to +91
// Allocate function to get memory for input
let alloc_func = instance
    .get_typed_func::<i32, i32>(&mut store, "alloc")
    .map_err(|e| {
        Error::Process(format!("WASM module must export 'alloc' function: {}", e))
    })?;

// Allocate memory for input data
let input_ptr = alloc_func
    .call(&mut store, input_data.len() as i32)
    .map_err(|e| Error::Process(format!("Failed to allocate memory: {}", e)))?;

⚠️ Potential issue

Critical: Memory leak - allocated memory is never freed.

The code allocates memory in the WASM module but never deallocates it, leading to memory leaks with repeated invocations.

After processing, you should call a corresponding dealloc or free function:

         // Write input data to WASM memory
         memory
             .write(&mut store, input_ptr as usize, &input_data)
             .map_err(|e| Error::Process(format!("Failed to write input data: {}", e)))?;

         // Call the processing function
         let output_ptr = func
             .call(&mut store, (input_ptr, input_data.len() as i32))
             .map_err(|e| Error::Process(format!("WASM function call failed: {}", e)))?;

         // Read output data from WASM memory
         let output_data = self.read_output_from_memory(&mut store, &memory, output_ptr)?;
+
+        // Free allocated input memory
+        if let Ok(dealloc_func) = instance.get_typed_func::<(i32, i32), ()>(&mut store, "dealloc") {
+            dealloc_func
+                .call(&mut store, (input_ptr, input_data.len() as i32))
+                .map_err(|e| Error::Process(format!("Failed to deallocate memory: {}", e)))?;
+        }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 81 to 91, the code allocates memory in the WASM module using the 'alloc' function but does not free it afterward, causing a memory leak. To fix this, retrieve the corresponding 'dealloc' or 'free' function from the WASM instance, then after processing the input data, call this function with the pointer and length of the allocated memory to properly release it.
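
For reference, the guest side of this contract could look roughly like the sketch below. It is an assumption about how a module author might implement the `alloc`/`dealloc` pair, since the PR does not include a guest module. On a wasm32 target, `usize` and pointers are 32 bits wide, so these signatures line up with the `alloc(i32) -> i32` and `dealloc(i32, i32)` shapes the host looks up. A real module would also need an export attribute such as `#[no_mangle]` on each function.

```rust
use std::alloc::{alloc as raw_alloc, dealloc as raw_dealloc, Layout};
use std::ptr::NonNull;

/// Guest-side sketch (hypothetical): allocate `size` bytes for the host to write into.
/// Returns a null pointer if the allocator fails.
pub extern "C" fn alloc(size: usize) -> *mut u8 {
    if size == 0 {
        // Zero-sized requests must not reach the global allocator.
        return NonNull::dangling().as_ptr();
    }
    let layout = Layout::from_size_align(size, 1).expect("size too large");
    // SAFETY: `layout` has a non-zero size.
    unsafe { raw_alloc(layout) }
}

/// Guest-side sketch (hypothetical): release a buffer obtained from `alloc`.
///
/// # Safety
/// `ptr` must come from `alloc(size)` with the same `size`, and must not be
/// used after this call.
pub unsafe extern "C" fn dealloc(ptr: *mut u8, size: usize) {
    if size == 0 {
        return;
    }
    let layout = Layout::from_size_align(size, 1).expect("size too large");
    // SAFETY: the caller guarantees `ptr` was allocated with this exact layout.
    unsafe { raw_dealloc(ptr, layout) }
}
```

Because `dealloc` needs the original size, the host has to pass the same length it gave to `alloc`, which is what the suggested fix above does with `input_data.len()`.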

Comment on lines +179 to +197
fn bytes_to_message_batch(
    &self,
    data: Vec<u8>,
    original: &MessageBatch,
) -> Result<MessageBatch, Error> {
    // Convert processed bytes back to MessageBatch
    // This assumes the WASM function returns JSON data
    if data.is_empty() {
        return Ok(MessageBatch::new_binary(vec![]).unwrap());
    }

    // Try to parse as JSON and create new MessageBatch
    let json_str = String::from_utf8(data)
        .map_err(|e| Error::Process(format!("Invalid UTF-8 in WASM output: {}", e)))?;

    // For simplicity, return as binary data
    // In practice, you might want to parse JSON and convert back to Arrow format
    Ok(original.new_binary_with_origin(vec![json_str.into_bytes()])?)
}

🛠️ Refactor suggestion

Complete the data deserialization implementation.

The current implementation doesn't properly deserialize the processed data back to MessageBatch format.

Implement proper deserialization:

     fn bytes_to_message_batch(
         &self,
         data: Vec<u8>,
         original: &MessageBatch,
     ) -> Result<MessageBatch, Error> {
-        // Convert processed bytes back to MessageBatch
-        // This assumes the WASM function returns JSON data
-        if data.is_empty() {
-            return Ok(MessageBatch::new_binary(vec![]).unwrap());
-        }
-
-        // Try to parse as JSON and create new MessageBatch
-        let json_str = String::from_utf8(data)
-            .map_err(|e| Error::Process(format!("Invalid UTF-8 in WASM output: {}", e)))?;
-
-        // For simplicity, return as binary data
-        // In practice, you might want to parse JSON and convert back to Arrow format
-        Ok(original.new_binary_with_origin(vec![json_str.into_bytes()])?)
+        if data.is_empty() {
+            return Ok(MessageBatch::new_binary(vec![]).unwrap());
+        }
+
+        // Deserialize JSON data back to MessageBatch
+        let output_batch: MessageBatch = serde_json::from_slice(&data)
+            .map_err(|e| Error::Process(format!("Failed to deserialize WASM output: {}", e)))?;
+
+        Ok(output_batch)
     }
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs between lines 179 and 197, the bytes_to_message_batch function currently does not deserialize the processed data correctly back into a MessageBatch. To fix this, replace the placeholder JSON string handling with actual deserialization logic that converts the input bytes into the appropriate MessageBatch structure, likely by parsing the JSON or binary format expected and reconstructing the MessageBatch accordingly, ensuring the returned MessageBatch accurately represents the processed data.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

♻️ Duplicate comments (5)
crates/arkflow-plugin/src/processor/wasm.rs (5)

35-39: Document WASM module interface requirements.

69-79: Critical: Memory leak - allocated memory is never freed.

130-135: Improve data serialization for production use.

160-178: Complete the data deserialization implementation.

209-243: Add comprehensive tests for WASM processing functionality.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f1e3eb8 and d02c375.

📒 Files selected for processing (1)
  • crates/arkflow-plugin/src/processor/wasm.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/wasm.rs (3)
crates/arkflow-core/src/processor/mod.rs (1)
  • register_processor_builder (75-88)
crates/arkflow-core/src/lib.rs (2)
  • len (203-205)
  • new_binary (95-97)
crates/arkflow-plugin/src/processor/mod.rs (1)
  • init (29-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
🔇 Additional comments (1)
crates/arkflow-plugin/src/processor/wasm.rs (1)

181-202: LGTM - Well-implemented builder pattern.

The builder correctly validates configuration presence, deserializes the config, and handles errors appropriately.

Comment on lines +111 to +117
let module_data = // Load from file
    std::fs::read(&config.module).map_err(|e| {
        Error::Config(format!(
            "Failed to read WASM file '{}': {}",
            config.module, e
        ))
    })?;

⚠️ Potential issue

Implement base64 decoding support or update documentation.

The constructor only handles file reading, but the configuration documentation suggests support for base64-encoded WASM bytes. Either implement the base64 decoding functionality or update the documentation to reflect current capabilities.

To support base64 decoding, update the constructor:

         // Load WASM module
-        let module_data =   // Load from file
-            std::fs::read(&config.module).map_err(|e| {
-                Error::Config(format!(
-                    "Failed to read WASM file '{}': {}",
-                    config.module, e
-                ))
-            })?;
+        let module_data = if config.is_file {
+            // Load from file
+            std::fs::read(&config.module).map_err(|e| {
+                Error::Config(format!(
+                    "Failed to read WASM file '{}': {}",
+                    config.module, e
+                ))
+            })?
+        } else {
+            // Decode from base64
+            base64::engine::general_purpose::STANDARD
+                .decode(&config.module)
+                .map_err(|e| {
+                    Error::Config(format!(
+                        "Failed to decode base64 WASM module: {}",
+                        e
+                    ))
+                })?
+        };
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 111 to 117, the code only reads the WASM module from a file, but the documentation suggests it should also support base64-encoded WASM bytes. To fix this, modify the constructor to detect if the input is base64-encoded and decode it accordingly before use. Alternatively, if base64 support is not intended, update the documentation to remove or clarify the base64 support claim.
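
Whichever way the bytes are obtained, a cheap sanity check can catch a wrong `is_file` setting early: every binary WebAssembly module starts with the 4-byte magic number `\0asm`. The sketch below is an optional pre-check, not something the PR implements; `looks_like_wasm` and `check_module_bytes` are hypothetical names, and the check assumes only binary modules (not WAT text) are accepted.

```rust
/// The 4-byte magic number (`\0asm`) that starts every binary WebAssembly module.
const WASM_MAGIC: [u8; 4] = [0x00, 0x61, 0x73, 0x6D];

/// Hypothetical pre-check: does `bytes` look like a binary WASM module?
/// Only the header is inspected; full validation is still the engine's job.
fn looks_like_wasm(bytes: &[u8]) -> bool {
    bytes.starts_with(&WASM_MAGIC)
}

/// Hypothetical wrapper that turns a failed pre-check into a readable error,
/// hinting at the most likely misconfiguration.
fn check_module_bytes(bytes: &[u8], is_file: bool) -> Result<(), String> {
    if looks_like_wasm(bytes) {
        return Ok(());
    }
    let source = if is_file { "file" } else { "base64 payload" };
    Err(format!(
        "module loaded from {} does not start with the WASM magic number; check `is_file`",
        source
    ))
}
```

A base64 string passed through the file path by mistake, or the reverse, would fail this check with a clearer message than a generic module compilation error.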

Comment on lines +27 to +33
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WasmProcessorConfig {
    /// WASM module path or inline WASM bytes (base64 encoded)
    module: String,
    /// Function name to call in the WASM module
    function: String,
}

⚠️ Potential issue

Fix configuration inconsistency between documentation and implementation.

The documentation mentions "inline WASM bytes (base64 encoded)" but the struct lacks the is_file field to distinguish between file paths and base64-encoded content. The constructor only handles file reading.

Either add the missing is_file field or update the documentation to match the current implementation:

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct WasmProcessorConfig {
     /// WASM module path or inline WASM bytes (base64 encoded)
     module: String,
     /// Function name to call in the WASM module
     function: String,
+    /// Whether the module field contains a file path or base64 encoded bytes
+    #[serde(default = "default_is_file")]
+    is_file: bool,
 }
+
+fn default_is_file() -> bool {
+    true
+}
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/processor/wasm.rs around lines 27 to 33, the WasmProcessorConfig struct documentation mentions supporting both file paths and inline base64-encoded WASM bytes, but the struct lacks an is_file field to distinguish these cases and the constructor only handles file reading. To fix this, either add a boolean is_file field to the struct and update the constructor to handle both file reading and base64 decoding based on this flag, or revise the documentation comment to only mention file paths to align with the current implementation.

chenquan self-assigned this Jul 7, 2025
chenquan marked this pull request as draft July 7, 2025 13:47
chenquan added the hold (Pause, for some reason) label Jul 11, 2025