Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

RowStream should inherit lifetime of connection or statement? #1279

Open
Labels
@stevenroose

Description

@stevenroose

So we were having this bug for quite a while that took a long time to figure out.

We were usingquery_raw and usingStreamExt to map the stream into a parsed stream. Stuff like this:

/// Fetch all vtxos that have been forfeited.pubasyncfnfetch_all_forfeited_vtxos(&self,) -> anyhow::Result<implStream<Item = anyhow::Result<Vtxo>> +'_>{let conn =self.pool.get().await?;let stmt = conn.prepare("SELECT vtxo FROM vtxo WHERE forfeit_state IS NOT NULL AND board_swept_at IS NULL;").await?;Ok(conn.query_raw(&stmt,NOARG).await?.err_into().map_ok(|row|Vtxo::deserialize(row.get("vtxo")).expect("corrupt db: vtxo")))}

But somehow we were having weird issues on other bb8 pool connections.

I think I boiled it down to the root cause being that the connection is dropped when the function returns and somehow then theRowStream gets hung up on.

It is solved by creating a type that wraps the connection together with theRowStream and returning that so that the connection is not dropped until the stream finishes.

I think this is kinda a bug on tokio_postgres's part. I see that internally the API is kept lifetime-free usingArc's, but I think that might be causing this bug. To be correct, bothStatement andRowStream should inherit the lifetime of the connection, to make sure that returning the stream errors out and you are forced to bundle the stream with the connection.


This is the solution I'm using now:

/// A wrapper around [RowStream] that bundles the connection along with it#[pin_project::pin_project]pub(crate)structOwnedRowStream<'a,M: bb8::ManageConnection>{/// We carry this to keep the connection alive as long as the stream_conn: bb8::PooledConnection<'a,M>,#[pin]inner:RowStream,}impl<'a,M: bb8::ManageConnection>OwnedRowStream<'a,M>{fnnew(conn: bb8::PooledConnection<'a,M>,row_stream:RowStream,) ->OwnedRowStream<'a,M>{OwnedRowStream{_conn: conn,inner: row_stream,}}}impl<'a,M>StreamforOwnedRowStream<'a,M>whereM:ManageConnection,<MasManageConnection>::Connection:Unpin,{typeItem = <RowStreamasStream>::Item;fnpoll_next(self:Pin<&mutSelf>,cx:&mut task::Context<'_>) -> task::Poll<Option<Self::Item>>{self.project().inner.as_mut().poll_next(cx)}}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions


      [8]ページ先頭

      ©2009-2025 Movatter.jp