- Notifications
You must be signed in to change notification settings - Fork516
Closed
Description
Currently, a `Transaction` can be left open if the `Future` created by `Client::transaction` is dropped before being polled to completion.
pubasyncfntransaction(&mutself) ->Result<Transaction<'_>,Error>{self.batch_execute("BEGIN").await?;// suspension pointOk(Transaction::new(self))}
Under the hood, `batch_execute()` synchronously sends `RequestMessages` to the `Connection` and then asynchronously polls `Responses`. While it is `Pending` on `Responses`, the `Future` can be dropped before the `Transaction` is created by `Transaction::new()`. In that case `Transaction::drop()` will never be called and the transaction will be left open.
Reproduction
pin_project!{/// Polls `F` at most `polls_left` times returning `Some(F::Output)` if/// [`Future`] returned [`Poll::Ready`] or [`None`] otherwise.structCancellable<F>{ #[pin] fut:F, polls_left:usize,}}impl<F:Future>FutureforCancellable<F>{typeOutput =Option<F::Output>;fnpoll(self:Pin<&mutSelf>,ctx:&mutContext<'_>) ->Poll<Self::Output>{let this =self.project();match this.fut.poll(ctx){Poll::Ready(r) =>Poll::Ready(Some(r)),Poll::Pending =>{*this.polls_left = this.polls_left.saturating_sub(1);if*this.polls_left ==0{Poll::Ready(None)}else{Poll::Pending}}}}}asyncfncurrent_transaction_id(client:&Client) ->i64{ client.query("SELECT txid_current()",&[]).await.unwrap().pop().unwrap().get::<_,i64>("txid_current")}asyncfnin_transaction(client:&Client) ->bool{current_transaction_id(client).await ==current_transaction_id(client).await}#[tokio::test]asyncfntransaction_future_cancellation(){letmut client =connect("user=postgres").await;for iin0..{let done ={let txn = client.transaction();let fut =Cancellable{fut: txn,polls_left: i,}; fut.await.map(|res| res.expect("transaction failed")).is_some()};assert!(!in_transaction(&client).await);if done{break;}}}
Metadata
Metadata
Assignees
Labels
No labels