- Notifications
You must be signed in to change notification settings - Fork39
feat: add Amqp09(RabbitMQ)#460
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:main
Are you sure you want to change the base?
Uh oh!
There was an error while loading.Please reload this page.
Conversation
coderabbitaibot commentedJun 11, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
WalkthroughA RabbitMQ (AMQP 0.9) input and output integration was added to the Arkflow plugin. This includes new modules implementing configurable, asynchronous RabbitMQ consumers and publishers, registration logic, and dependency updates. The initialization routines were updated to register these new components, and the Changes
Sequence Diagram(s)sequenceDiagram participant User participant ArkflowInput participant Amqp09Input participant RabbitMQ User->>ArkflowInput: Initialize inputs ArkflowInput->>Amqp09Input: Create and connect Amqp09Input->>RabbitMQ: Connect, declare queue/exchange, bind, consume RabbitMQ-->>Amqp09Input: Deliver message Amqp09Input-->>ArkflowInput: Provide message batch with ack handler User->>ArkflowInput: Acknowledge message ArkflowInput->>Amqp09Input: Call ack handler Amqp09Input->>RabbitMQ: Send ack (if manual ack)sequenceDiagram participant User participant ArkflowOutput participant Amqp09Output participant RabbitMQ User->>ArkflowOutput: Initialize outputs ArkflowOutput->>Amqp09Output: Create and connect Amqp09Output->>RabbitMQ: Connect, declare exchange (optional) User->>ArkflowOutput: Write message batch ArkflowOutput->>Amqp09Output: Write messages Amqp09Output->>RabbitMQ: Publish messages (with routing key) RabbitMQ-->>Amqp09Output: Confirm publish (if confirms enabled)Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat withCodeRabbit:
SupportNeed help? Create a ticket on oursupport page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 2
🧹 Nitpick comments (1)
crates/arkflow-plugin/src/input/amqp09.rs (1)
86-86:Consider making the internal channel size configurable.The flume channel is hardcoded to 1000 messages, which might not be suitable for all use cases. High-throughput scenarios might benefit from a larger buffer.
Add a configuration option:
pub struct Amqp09InputConfig { /// Auto-acknowledge messages pub auto_ack: Option<bool>, /// Prefetch count pub prefetch_count: Option<u16>,+ /// Internal channel buffer size (default: 1000)+ pub channel_buffer_size: Option<usize>, }Then use it:
-let (sender, receiver) = flume::bounded::<Amqp09Msg>(1000);+let buffer_size = config.channel_buffer_size.unwrap_or(1000);+let (sender, receiver) = flume::bounded::<Amqp09Msg>(buffer_size);
🛑 Comments failed to post (2)
crates/arkflow-plugin/src/output/amqp09.rs (1)
50-62:
⚠️ Potential issueImplement or remove unused configuration fields.
The following config fields are defined but never used in the implementation:
persistent(line 51)priority(line 55)expiration(line 57)content_type(line 59)content_encoding(line 61)These fields should be applied when constructing
BasicPropertiesin thewritemethod.Apply this diff to use the configured properties:
- BasicProperties::default(),+ {+ let mut props = BasicProperties::default();+ if let Some(persistent) = self.config.persistent {+ props = props.with_delivery_mode(if persistent { 2 } else { 1 });+ }+ if let Some(priority) = self.config.priority {+ props = props.with_priority(priority);+ }+ if let Some(ref expiration) = self.config.expiration {+ props = props.with_expiration(expiration.clone());+ }+ if let Some(ref content_type) = self.config.content_type {+ props = props.with_content_type(content_type.clone().into());+ }+ if let Some(ref content_encoding) = self.config.content_encoding {+ props = props.with_content_encoding(content_encoding.clone().into());+ }+ props+ },Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 50 to 62, the fieldspersistent, priority, expiration, content_type, and content_encoding are definedbut not used when constructing BasicProperties in the write method. To fix this,update the write method to apply these configuration fields to theBasicProperties builder, setting each property accordingly if the option isSome, so that the message properties reflect the configured values.crates/arkflow-plugin/src/input/amqp09.rs (1)
216-256:🛠️ Refactor suggestion
Add error recovery for the consumer task.
The spawned task could panic and silently stop consuming messages. Consider adding panic recovery or at least logging when the task exits unexpectedly.
Store the task handle and monitor it:
+use tokio::task::JoinHandle; pub struct Amqp09Input { // ... existing fields ...+ consumer_task: Arc<Mutex<Option<JoinHandle<()>>>>, }Then in connect:
-tokio::spawn(async move {+let handle = tokio::spawn(async move { // existing task code });+{+ let mut task_guard = self.consumer_task.lock().await;+ *task_guard = Some(handle);+}This allows you to check task status in read() or implement recovery logic.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.// Spawn the consumer loop and keep its JoinHandle for monitoring let handle = tokio::spawn(async move { loop { tokio::select! { _ = cancellation_token.cancelled() => { break; } result = async { let mut consumer_guard = consumer_arc.lock().await; if let Some(consumer) = &mut *consumer_guard { consumer.next().await } else { None } } => { match result { Some(delivery_result) => { match delivery_result { Ok(delivery) => { if let Err(_) = sender_clone.send_async(Amqp09Msg::Delivery(delivery)).await { break; } } Err(e) => { error!("RabbitMQ delivery error: {}", e); if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::Disconnection)).await { break; } } } } None => { // Consumer ended if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::EOF)).await { break; } } } } } } }); // Store the handle so we can .await it or check for panics later { let mut task_guard = self.consumer_task.lock().await; *task_guard = Some(handle); }🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/input/amqp09.rs around lines 216 to 256, thespawned consumer task lacks error recovery and may silently stop on panic.Modify the code to store the JoinHandle returned by tokio::spawn so you canmonitor the task status. Add panic recovery by wrapping the async block with acatch_unwind or use a task supervision pattern to log unexpected exits. Then, inthe connect function, keep the JoinHandle and check its status in read() orimplement logic to restart the task if it has stopped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
crates/arkflow-plugin/src/output/amqp09.rs (2)
57-57:Consider using a numeric type for the expiration field.The
expirationfield is currently aStringrepresenting milliseconds. Consider usingOption<u64>orOption<std::time::Duration>for better type safety and to avoid runtime parsing errors.- /// Message expiration (in milliseconds)- expiration: Option<String>,+ /// Message expiration+ expiration: Option<std::time::Duration>,
112-118:Add validation for unrecognized exchange types.The code silently defaults to
Directfor unrecognized exchange types, which could hide configuration errors.let exchange_type = match self.config.exchange_type.as_deref() { Some("direct") => ExchangeKind::Direct, Some("fanout") => ExchangeKind::Fanout, Some("topic") => ExchangeKind::Topic, Some("headers") => ExchangeKind::Headers,+ Some(unknown) => {+ return Err(Error::Config(format!("Unknown exchange type: {}", unknown)));+ }- _ => ExchangeKind::Direct,+ None => ExchangeKind::Direct, };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
crates/arkflow-plugin/Cargo.toml(1 hunks)crates/arkflow-plugin/src/input/amqp09.rs(1 hunks)crates/arkflow-plugin/src/input/mod.rs(2 hunks)crates/arkflow-plugin/src/output/amqp09.rs(1 hunks)crates/arkflow-plugin/src/output/mod.rs(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/arkflow-plugin/src/input/amqp09.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- crates/arkflow-plugin/Cargo.toml
- crates/arkflow-plugin/src/output/mod.rs
- crates/arkflow-plugin/src/input/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (3)
crates/arkflow-plugin/src/output/amqp09.rs (3)
74-84:LGTM!The constructor correctly initializes all fields with appropriate default values.
214-232:LGTM!The builder implementation correctly handles configuration parsing and error cases.
192-211:LGTM!The close method properly handles resource cleanup in the correct order (channel before connection) and appropriately resets the connection state.
| channel | ||
| .basic_publish( | ||
| &self.config.exchange, | ||
| routing_key, | ||
| BasicPublishOptions::default(), | ||
| &payload, | ||
| BasicProperties::default(), | ||
| ) | ||
| .await | ||
| .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))? | ||
| .await | ||
| .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Critical: Message properties from configuration are not applied.
The code usesBasicProperties::default() instead of applying the configured message properties (persistent, priority, expiration, content_type, content_encoding).
+ let mut props = BasicProperties::default();+ if let Some(persistent) = self.config.persistent {+ props = props.with_delivery_mode(if persistent { 2 } else { 1 });+ }+ if let Some(priority) = self.config.priority {+ props = props.with_priority(priority);+ }+ if let Some(ref expiration) = self.config.expiration {+ props = props.with_expiration(expiration.clone().into());+ }+ if let Some(ref content_type) = self.config.content_type {+ props = props.with_content_type(content_type.clone().into());+ }+ if let Some(ref content_encoding) = self.config.content_encoding {+ props = props.with_content_encoding(content_encoding.clone().into());+ }+ channel .basic_publish( &self.config.exchange, routing_key, BasicPublishOptions::default(), &payload,- BasicProperties::default(),+ props, )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| channel | |
| .basic_publish( | |
| &self.config.exchange, | |
| routing_key, | |
| BasicPublishOptions::default(), | |
| &payload, | |
| BasicProperties::default(), | |
| ) | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to publish message: {}", e)))? | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to confirm message: {}", e)))?; | |
| } | |
| // build up message properties from config | |
| letmut props =BasicProperties::default(); | |
| ifletSome(persistent) =self.config.persistent{ | |
| props = props.with_delivery_mode(if persistent{2}else{1}); | |
| } | |
| ifletSome(priority) =self.config.priority{ | |
| props = props.with_priority(priority); | |
| } | |
| ifletSome(ref expiration) =self.config.expiration{ | |
| props = props.with_expiration(expiration.clone().into()); | |
| } | |
| ifletSome(ref content_type) =self.config.content_type{ | |
| props = props.with_content_type(content_type.clone().into()); | |
| } | |
| ifletSome(ref content_encoding) =self.config.content_encoding{ | |
| props = props.with_content_encoding(content_encoding.clone().into()); | |
| } | |
| channel | |
| .basic_publish( | |
| &self.config.exchange, | |
| routing_key, | |
| BasicPublishOptions::default(), | |
| &payload, | |
| props, | |
| ) | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to publish message: {}", e)))? | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to confirm message: {}", e)))?; |
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 175 to 187, themessage properties are set to BasicProperties::default(), ignoring theconfigured properties like persistent, priority, expiration, content_type, andcontent_encoding. Update the code to build BasicProperties using theconfiguration values instead of default(), ensuring all relevant properties fromself.config are applied when publishing the message.🛠️ Refactor suggestion
Handle publisher confirms conditionally.
The code always awaits the confirmation future, but this should only happen when publisher confirms are enabled.
- channel+ let publish_future = channel .basic_publish( &self.config.exchange, routing_key, BasicPublishOptions::default(), &payload, BasicProperties::default(), )- .await- .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?- .await- .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;+ .await+ .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))?;++ if self.config.confirm.unwrap_or(false) {+ publish_future+ .await+ .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?;+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| channel | |
| .basic_publish( | |
| &self.config.exchange, | |
| routing_key, | |
| BasicPublishOptions::default(), | |
| &payload, | |
| BasicProperties::default(), | |
| ) | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to publish message: {}", e)))? | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to confirm message: {}", e)))?; | |
| } | |
| // Publish the message, always awaiting the publish result… | |
| let publish_future = channel | |
| .basic_publish( | |
| &self.config.exchange, | |
| routing_key, | |
| BasicPublishOptions::default(), | |
| &payload, | |
| BasicProperties::default(), | |
| ) | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to publish message: {}", e)))?; | |
| // …and only await confirmation if confirms are enabled | |
| ifself.config.confirm.unwrap_or(false){ | |
| publish_future | |
| .await | |
| .map_err(|e|Error::Process(format!("Failed to confirm message: {}", e)))?; | |
| } | |
| } |
🤖 Prompt for AI Agents
In crates/arkflow-plugin/src/output/amqp09.rs around lines 175 to 187, the codealways awaits the publisher confirmation future, but this should only be done ifpublisher confirms are enabled in the configuration. Modify the code to check ifpublisher confirms are enabled before awaiting the confirmation future, and skipawaiting it otherwise.
Uh oh!
There was an error while loading.Please reload this page.
Summary by CodeRabbit