Description
So we were having this bug for quite a while that took a long time to figure out.
We were using `query_raw` and using `StreamExt` to map the stream into a parsed stream. Stuff like this:
```rust
/// Fetch all vtxos that have been forfeited.
pub async fn fetch_all_forfeited_vtxos(
    &self,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Vtxo>> + '_> {
    let conn = self.pool.get().await?;
    let stmt = conn
        .prepare("SELECT vtxo FROM vtxo WHERE forfeit_state IS NOT NULL AND board_swept_at IS NULL;")
        .await?;

    Ok(conn
        .query_raw(&stmt, NOARG)
        .await?
        .err_into()
        .map_ok(|row| Vtxo::deserialize(row.get("vtxo")).expect("corrupt db: vtxo")))
}
```
But somehow we were having weird issues on other bb8 pool connections.
I think I boiled it down to the root cause: the connection is dropped when the function returns, and then the `RowStream` somehow gets hung up on.
It is solved by creating a type that wraps the connection together with the `RowStream` and returning that, so that the connection is not dropped until the stream finishes.
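To illustrate the drop-order difference without a database, here is a minimal std-only sketch. Everything in it is a hypothetical stand-in (`PooledConn`, `OwnedRows`, `rows_only`, `rows_with_conn`, and the counter playing the role of the pool); it is not tokio_postgres or bb8 code, and it only models when the connection goes back to the pool, not the hang itself.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a pooled connection: it bumps a shared counter
/// while checked out and decrements it again when dropped.
struct PooledConn {
    checked_out: Rc<Cell<usize>>,
}

impl PooledConn {
    fn get(checked_out: &Rc<Cell<usize>>) -> PooledConn {
        checked_out.set(checked_out.get() + 1);
        PooledConn { checked_out: Rc::clone(checked_out) }
    }

    /// Like a lifetime-free row stream: the result does not borrow `self`.
    fn query(&self) -> std::vec::IntoIter<u32> {
        vec![1, 2, 3].into_iter()
    }
}

impl Drop for PooledConn {
    fn drop(&mut self) {
        // "Return" the connection to the pool.
        self.checked_out.set(self.checked_out.get() - 1);
    }
}

/// Bundles the connection with the rows so both are dropped together.
struct OwnedRows {
    _conn: PooledConn,
    inner: std::vec::IntoIter<u32>,
}

impl Iterator for OwnedRows {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        self.inner.next()
    }
}

/// The connection goes back to the pool as soon as this function returns.
fn rows_only(pool: &Rc<Cell<usize>>) -> std::vec::IntoIter<u32> {
    let conn = PooledConn::get(pool);
    conn.query()
}

/// The connection stays checked out until the returned value is dropped.
fn rows_with_conn(pool: &Rc<Cell<usize>>) -> OwnedRows {
    let conn = PooledConn::get(pool);
    let inner = conn.query();
    OwnedRows { _conn: conn, inner }
}

fn main() {
    let pool = Rc::new(Cell::new(0usize));

    let rows = rows_only(&pool);
    println!("rows only: {} checked out while iterating", pool.get());
    drop(rows);

    let owned = rows_with_conn(&pool);
    println!("bundled:   {} checked out while iterating", pool.get());
    drop(owned);
    println!("after drop: {} checked out", pool.get());
}
```

In the first case the pool already considers the connection free while the rows are still being read, which is the situation described above.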
I think this is kinda a bug on tokio_postgres's part. I see that internally the API is kept lifetime-free using `Arc`s, but I think that might be causing this bug. To be correct, both `Statement` and `RowStream` should inherit the lifetime of the connection, to make sure that returning the stream errors out and you are forced to bundle the stream with the connection.
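As a rough sketch of what "inheriting the lifetime of the connection" could look like, here is a std-only example. `Conn`, `Rows`, and `fetch` are hypothetical names made up for illustration, not the tokio_postgres API; the point is only that a result type carrying the connection's lifetime cannot be returned from a function that owns the connection.

```rust
/// Hypothetical connection type, standing in for a database client.
struct Conn {
    rows: Vec<u32>,
}

/// Hypothetical row iterator that borrows from the connection that created it.
struct Rows<'c> {
    inner: std::slice::Iter<'c, u32>,
}

impl Conn {
    /// The returned `Rows` is tied to the lifetime of `&self`.
    fn query(&self) -> Rows<'_> {
        Rows { inner: self.rows.iter() }
    }
}

impl<'c> Iterator for Rows<'c> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        self.inner.next().copied()
    }
}

// Rejected by the borrow checker, because `conn` is dropped at the end of the
// function while the returned `Rows` would still reference it:
//
// fn fetch_owned() -> Rows<'static> {
//     let conn = Conn { rows: vec![1, 2, 3] };
//     conn.query()
// }

/// Accepted: the caller owns the connection, so the rows cannot outlive it.
fn fetch(conn: &Conn) -> Rows<'_> {
    conn.query()
}

fn main() {
    let conn = Conn { rows: vec![1, 2, 3] };
    let total: u32 = fetch(&conn).sum();
    println!("sum of rows: {total}");
}
```

With an API shaped like this, the pattern from the first snippet would fail to compile instead of misbehaving at runtime, and the caller would have to keep the connection alive for as long as the rows are in use.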
This is the solution I'm using now:
```rust
// Imports assumed from the identifiers used below.
use std::pin::Pin;
use std::task;

use bb8::ManageConnection;
use futures::Stream;
use tokio_postgres::RowStream;

/// A wrapper around [RowStream] that bundles the connection along with it
#[pin_project::pin_project]
pub(crate) struct OwnedRowStream<'a, M: bb8::ManageConnection> {
    /// We carry this to keep the connection alive as long as the stream
    _conn: bb8::PooledConnection<'a, M>,
    #[pin]
    inner: RowStream,
}

impl<'a, M: bb8::ManageConnection> OwnedRowStream<'a, M> {
    fn new(
        conn: bb8::PooledConnection<'a, M>,
        row_stream: RowStream,
    ) -> OwnedRowStream<'a, M> {
        OwnedRowStream {
            _conn: conn,
            inner: row_stream,
        }
    }
}

impl<'a, M> Stream for OwnedRowStream<'a, M>
where
    M: ManageConnection,
    <M as ManageConnection>::Connection: Unpin,
{
    type Item = <RowStream as Stream>::Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        // Delegate to the inner stream; `_conn` is only held to delay its drop.
        self.project().inner.as_mut().poll_next(cx)
    }
}
```