Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: add remote-manage#576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Draft
chenquan wants to merge9 commits intomain
base:main
Choose a base branch
Loading
fromfeat/remote-manage
Draft

feat: add remote-manage#576

chenquan wants to merge9 commits intomainfromfeat/remote-manage

Conversation

@chenquan
Copy link
Collaborator

@chenquanchenquan commentedJul 9, 2025
edited by coderabbitaibot
Loading

Summary by CodeRabbit

  • New Features
    • Remote configuration via a new CLI subcommand to fetch and manage stream pipelines from a remote server.
    • Configurable polling interval and optional authentication for remote config; automatic start/stop/restart of streams on remote changes.
  • Improvements
    • Local config is optional when using remote; --validate can check local config without starting the engine.
    • Graceful cancellation and signal-aware shutdown for both remote and local runs.

@coderabbitai
Copy link
Contributor

coderabbitaibot commentedJul 9, 2025
edited
Loading

Walkthrough

Adds a remote configuration subsystem and CLI support for a "remote" subcommand, introduces a RemoteConfigManager module that polls and manages stream lifecycles, updates the engine to accept an external CancellationToken, and adds thereqwest dependency with JSON support.

Changes

Cohort / File(s)Change Summary
Crate manifest
crates/arkflow-core/Cargo.toml
Addedreqwest dependency with the"json" feature, using workspace version.
CLI
crates/arkflow-core/src/cli/mod.rs
Addedremote subcommand and args;Cli gainspub remote_config_manager: Option<RemoteConfigManager>; parse logic initializes manager when used;--config is optional if remote used;run became async and cancellation-aware, creating a CancellationToken and wiring signal handlers to cancel it;init_logging visibility changed topub(crate).
Library root
crates/arkflow-core/src/lib.rs
Declared new public module:pub mod remote_config;.
Remote config module
crates/arkflow-core/src/remote_config.rs
New module providingRemoteConfigManager, request/response types (RemoteConfigResponse,StreamInfo,StreamStatus), polling loop, version-aware stream start/stop/restart, per-stream runtimes with cancellation tokens, Reqwest client usage, and logging/error handling.
Engine
crates/arkflow-core/src/engine/mod.rs
Engine::run signature changed to accepttoken: CancellationToken; removed internal signal handling and creation of its own cancellation token (caller now provides cancellation).

Sequence Diagram(s)

sequenceDiagram    participant User    participant CLI    participant RemoteConfigManager    participant Engine    User->>CLI: Run with "remote" subcommand    CLI->>RemoteConfigManager: new(url, interval, token)    CLI->>RemoteConfigManager: run(token)    loop every poll interval        RemoteConfigManager->>RemoteConfigManager: fetch remote config        alt version changed            RemoteConfigManager->>RemoteConfigManager: start/stop/restart streams        end    end    User->>CLI: SIGINT/SIGTERM    CLI->>RemoteConfigManager: cancel token    RemoteConfigManager->>RemoteConfigManager: graceful shutdown
Loading
sequenceDiagram    participant User    participant CLI    participant Engine    User->>CLI: Run with local config    CLI->>Engine: construct(...)    CLI->>Engine: run(token)    User->>CLI: SIGINT/SIGTERM    CLI->>Engine: cancel token    Engine->>Engine: graceful shutdown
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I am a rabbit, code in paw,
Polling skies for config law.
Streams awake or take a nap,
Tokens hold the tidy map.
Hopping logs and restful hum—🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from theIntegrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branchfeat/remote-manage

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat withCodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag@coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag@coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on oursupport page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type@coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add@coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add@coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add@coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a.coderabbit.yaml file to the root of your repository.
  • Please see theconfiguration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation:# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit ourStatus Page to check the current availability of CodeRabbit.
  • Visit ourDocumentation for detailed information on how to use CodeRabbit.
  • Join ourDiscord Community to get help, request features, and share feedback.
  • Follow us onX/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitaicoderabbitaibot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (4)
docs/remote-config.md (1)

36-40:Add language specifiers to fenced code blocks.

The static analysis correctly identifies that fenced code blocks should have language specifiers for better syntax highlighting.

Apply these diffs to add language specifiers:

For line 36:

-```+```http GET /pipelines Authorization: Bearer <token>  # 如果提供了token Content-Type: application/json
For line 176:```diff-```+```log 2024-01-15T10:30:00Z INFO Starting remote configuration manager 2024-01-15T10:30:00Z INFO Polling interval: 30 seconds 2024-01-15T10:30:00Z INFO API endpoint: http://api.example.com/pipelines 2024-01-15T10:30:30Z INFO Configuration changed, updating pipelines (version: v1.0.1) 2024-01-15T10:30:30Z INFO Starting new pipeline 'Data Processing Pipeline' 2024-01-15T10:30:30Z INFO Pipeline 'Data Processing Pipeline' started successfully
Also applies to: 176-183</blockquote></details><details><summary>crates/arkflow-core/src/remote_config.rs (3)</summary><blockquote>`210-214`: **Consider graceful task termination instead of abort.**Using `abort()` on task handles forcefully terminates tasks without cleanup. Consider awaiting cancellation instead.Instead of immediately aborting, give tasks a chance to shut down gracefully:```diff // Stop existing stream existing.cancellation_token.cancel(); if let Some(handle) = &existing.handle {-    let _ = handle.abort();+    // Give the task a chance to shut down gracefully+    tokio::select! {+        _ = tokio::time::sleep(Duration::from_secs(5)) => {+            // Timeout - force abort+            handle.abort();+        }+        _ = handle => {+            // Task completed gracefully+        }+    } }

Also applies to: 228-231, 237-241, 257-260


248-248:Fix typo in comment.

-        // Remove stream that are no longer in the configuration+        // Remove streams that are no longer in the configuration

168-186:Add retry logic for transient network failures.

Network requests can fail due to transient issues. Consider adding retry logic with exponential backoff.

Consider using a retry mechanism for better resilience:

use tokio_retry::{Retry, strategy::ExponentialBackoff};asyncfnfetch_remote_config_with_retry(&self) ->Result<RemoteConfigResponse,Box<dyn std::error::Error>>{let retry_strategy =ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)).take(3);Retry::spawn(retry_strategy, ||async{self.fetch_remote_config().await}).await}

This would require adding thetokio-retry crate as a dependency.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and betweend7abf3f and94d6f96.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by!**/*.lock
📒 Files selected for processing (5)
  • crates/arkflow-core/Cargo.toml (1 hunks)
  • crates/arkflow-core/src/cli/mod.rs (3 hunks)
  • crates/arkflow-core/src/lib.rs (1 hunks)
  • crates/arkflow-core/src/remote_config.rs (1 hunks)
  • docs/remote-config.md (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.17.2)
docs/remote-config.md

36-36: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


176-176: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
  • GitHub Check: build
🔇 Additional comments (1)
crates/arkflow-core/src/lib.rs (1)

38-38:LGTM!

The module declaration follows the existing naming convention and properly exposes the remote configuration functionality.

remote_manager.run().await?;
}else{
// Run with local configuration
let config =self.config.clone().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

🛠️ Refactor suggestion

Avoid unwrap() on potentially None config.

While the logic ensures config is Some when remote_config_manager is None, usingunwrap() here is fragile and could panic if the logic changes.

Apply this diff to handle the None case explicitly:

-            let config = self.config.clone().unwrap();+            let config = self.config.clone().ok_or("Configuration not found")?;
📝 Committable suggestion

‼️IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config =self.config.clone().unwrap();
let config =self.config.clone().ok_or("Configuration not found")?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs at line 121, avoid using unwrap() onself.config as it may panic if None. Instead, explicitly handle the None case bymatching on self.config or using if let to safely extract the config value,returning an error or handling the absence gracefully to prevent potentialruntime panics.

@chenquanchenquan marked this pull request as draftJuly 9, 2025 16:08
@chenquanchenquan self-assigned thisJul 11, 2025
Copy link
Contributor

@coderabbitaicoderabbitaibot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
crates/arkflow-core/src/cli/mod.rs (1)

131-131:Avoidunwrap() onself.config

This can panic ifparse() returned early under--validate, leavingself.config asNone. Prefer returning a proper error.

Apply this diff:

-            let config = self.config.clone().unwrap();+            let config = self+                .config+                .clone()+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Configuration not found"))?;
🧹 Nitpick comments (7)
crates/arkflow-core/src/cli/mod.rs (7)

107-111:Don’t exit the process inside library-style parsing; propagate errors instead

Callingprocess::exit(1) here prevents callers from handling the error and makes testing harder. Return the error and let the top-level binary decide.

-                Err(e) => {-                    println!("Failed to load configuration file: {}", e);-                    process::exit(1);-                }+                Err(e) => {+                    eprintln!("Failed to load configuration file: {}", e);+                    return Err(Box::new(e));+                }

113-121:--validate early return can leaveself.config unset; set it before returning

If callers invokerun() afterparse(), this leads to a panic at theunwrap(). To be safe, setself.config before the early return.

-            // If you just verify the configuration, exit it-            if matches.get_flag("validate") {-                info!("The config is validated.");-                return Ok(());-            }--            self.config = Some(config);+            // If you just verify the configuration, exit it+            if matches.get_flag("validate") {+                self.config = Some(config);+                info!("The config is validated.");+                return Ok(());+            }++            self.config = Some(config);

58-80:Consider making--token optional and allowing ENV override for flexibility

RemoteConfigManager::new takesOption<String>, but the CLI requires--token. Making it optional (and supporting an env var) lets users access unauthenticated endpoints or configure tokens via environment.

-                .arg(-                    Arg::new("token")-                        .long("token")-                        .value_name("TOKEN")-                        .help("Authentication token for remote configuration API")-                        .required( true),-                ))+                .arg(+                    Arg::new("token")+                        .long("token")+                        .value_name("TOKEN")+                        .help("Authentication token for remote configuration API")+                        .env("ARKFLOW_REMOTE_TOKEN")+                ))

91-96:Avoidexpect() for URL; propagate a user-friendly error instead

Even though clap enforcesrequired(true), prefer consistent error handling by returning an error rather than panicking.

-            let remote_url = remote-                .get_one::<String>("url")-                .expect("Remote configuration URL not found");+            let remote_url = remote+                .get_one::<String>("url")+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Remote configuration URL not found"))?;

20-21:Unix-only signal handling; consider cross-platform support

Importingtokio::signal::unix limits builds to Unix targets. If Windows support is desired, add actrl_c fallback with conditional compilation.

Example changes:

- use tokio::signal::unix::{signal, SignalKind};+ #[cfg(unix)]+ use tokio::signal::unix::{signal, SignalKind};+ #[cfg(not(unix))]+ use tokio::signal;

And inrun():

#[cfg(unix)]// existing SIGINT/SIGTERM handling#[cfg(not(unix))]tokio::spawn({let token = token.clone();asyncmove{if signal::ctrl_c().await.is_ok(){info!("Received Ctrl+C, exiting...");}        token.cancel();}});

156-224:Nit: remove explicit-> () and avoid.pretty() with JSON format

Minor polish:

  • Returning() explicitly is redundant in Rust.
  • When using.json(),.pretty() is ignored; drop it to reduce confusion.
-pub(crate) fn init_logging(config: &EngineConfig) -> () {+pub(crate) fn init_logging(config: &EngineConfig) {@@-                    LogFormat::JSON => {+                    LogFormat::JSON => {                         let subscriber = subscriber_builder                             .with_writer(std::sync::Mutex::new(file))-                            .pretty()                             .json()                             .finish();                         tracing::subscriber::set_global_default(subscriber)                             .expect("You can't set a global default log subscriber");                     }@@     match config.logging.format {         LogFormat::JSON => {-            let subscriber = subscriber_builder.pretty().json().finish();+            let subscriber = subscriber_builder.json().finish();             tracing::subscriber::set_global_default(subscriber)                 .expect("You can't set a global default log subscriber");         }

43-55:Nit: help text wording improvements

Tighten phrasing for clarity and correctness.

-            .about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors")+            .about("High-performance Rust stream processing engine with multiple inputs/outputs and processors")@@-                    .help("Specify the profile path")+                    .help("Path to the configuration file")@@-                    .help("Only the profile is verified, not the engine is started")+                    .help("Validate the configuration file without starting the engine")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between17da896 andc75ad66.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by!**/*.lock
📒 Files selected for processing (1)
  • crates/arkflow-core/src/cli/mod.rs (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-core/src/cli/mod.rs (2)
crates/arkflow-core/src/config.rs (3)
  • default (148-156)
  • default (165-171)
  • from_file (89-106)
crates/arkflow-core/src/remote_config.rs (2)
  • new (93-107)
  • run (110-134)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (1)
crates/arkflow-core/src/cli/mod.rs (1)

86-90:Interval parsing: good defensive handling

Nice use of.and_then(|s| s.parse::<u64>().ok()).unwrap_or(30) to avoid panics and provide a sane default.

Comment on lines +101 to +104
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

⚠️ Potential issue

Compilation bug: usingok_or("...")? with&str won't compile

&str doesn’t implementstd::error::Error, so? cannot convert it intoBox<dyn std::error::Error>. Use an error type that implementsError (e.g.,std::io::Error) or handle it explicitly.

Apply this diff:

-            let config_path = matches-                .get_one::<String>("config")-                .ok_or("Configuration not found")?;+            let config_path = matches+                .get_one::<String>("config")+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Configuration not found"))?;
📝 Committable suggestion

‼️IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;
let config_path = matches
.get_one::<String>("config")
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput,"Configuration not found"))?;
🤖 Prompt for AI Agents
crates/arkflow-core/src/cli/mod.rs around lines 101 to 104: the use of.ok_or("Configuration not found")? fails because &str does not implementstd::error::Error and cannot be converted by ?. Replace the call so it producesa concrete error type that implements Error (for example use .ok_or_else(||std::io::Error::new(std::io::ErrorKind::Other, "Configuration not found"))?), orif the project uses anyhow, use .ok_or_else(|| anyhow::anyhow!("Configurationnot found"))?; ensure the resulting error type matches the function return type(or convert/wrap it into Box<dyn std::error::Error> as needed).

Comment on lines +137 to +152
// Set up signal handlers
letmut sigint =signal(SignalKind::interrupt()).expect("Failed to set signal handler");
letmut sigterm =signal(SignalKind::terminate()).expect("Failed to set signal handler");

tokio::spawn(asyncmove{
tokio::select!{
_ = sigint.recv() =>{
info!("Received SIGINT, exiting...");

},
_ = sigterm.recv() =>{
info!("Received SIGTERM, exiting...");
}
}
token.cancel();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

⚠️ Potential issue

Signal handlers are set up after the run completes (unreachable), breaking graceful shutdown

Youawait the engine/remote manager before spawning signal listeners. As a result, the token is never cancelled on SIGINT/SIGTERM and the process won’t shut down gracefully.

Move the signal setup before awaitingrun(...):

 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {-        let token = CancellationToken::new();+        let token = CancellationToken::new();++        // Set up signal handlers BEFORE running the engine/remote manager+        let token_for_signals = token.clone();+        #[cfg(unix)]+        tokio::spawn(async move {+            let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");+            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");+            tokio::select! {+                _ = sigint.recv() => {+                    info!("Received SIGINT, exiting...");+                },+                _ = sigterm.recv() => {+                    info!("Received SIGTERM, exiting...");+                }+            }+            token_for_signals.cancel();+        });++        #[cfg(not(unix))]+        tokio::spawn(async move {+            if tokio::signal::ctrl_c().await.is_ok() {+                info!("Received Ctrl+C, exiting...");+            }+            token_for_signals.cancel();+        });-        if let Some(remote_manager) = &self.remote_config_manager {+        if let Some(remote_manager) = &self.remote_config_manager {             // Run with remote configuration management             remote_manager.run(token.clone()).await?;         } else {             // Run with local configuration             let config = self.config.clone().unwrap();             init_logging(&config);             let engine = Engine::new(config);             engine.run(token.clone()).await?;         }-        // Set up signal handlers-        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");-        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");--        tokio::spawn(async move {-            tokio::select! {-                _ = sigint.recv() => {-                    info!("Received SIGINT, exiting...");--                },-                _ = sigterm.recv() => {-                    info!("Received SIGTERM, exiting...");-                }-            }-            token.cancel();-        });         Ok(())     }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs around lines 137 to 152, the signalhandler setup currently runs after awaiting run(...), so SIGINT/SIGTERM can'tcancel the token; move the signal creation and tokio::spawn block to before thecall to run(...) (so the signal streams and spawned task live while runexecutes), ensure any cancellation token used inside the spawned task iscloned/moved into the async block correctly, and then await run(...) — thisguarantees signals are observed and token.cancel() is called on SIGINT/SIGTERMfor graceful shutdown.

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@coderabbitaicoderabbitai[bot]coderabbitai[bot] left review comments

Assignees

@chenquanchenquan

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

2 participants

@chenquan

[8]ページ先頭

©2009-2025 Movatter.jp