- Notifications
You must be signed in to change notification settings - Fork515
Support CopyBoth queries and replication mode in config#778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
base:master
Are you sure you want to change the base?
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
95a3c98
bd96437
92899e8
bed87a9
88edd68
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use crate::codec::{BackendMessages, FrontendMessage}; | ||
use crate::config::SslMode; | ||
use crate::connection::{Request, RequestMessages}; | ||
use crate::copy_both::{CopyBothDuplex, CopyBothReceiver}; | ||
use crate::copy_out::CopyOutStream; | ||
#[cfg(feature = "runtime")] | ||
use crate::keepalive::KeepaliveConfig; | ||
@@ -13,13 +14,14 @@ use crate::types::{Oid, ToSql, Type}; | ||
#[cfg(feature = "runtime")] | ||
use crate::Socket; | ||
use crate::{ | ||
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, | ||
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction, | ||
TransactionBuilder, | ||
}; | ||
use bytes::{Buf, BytesMut}; | ||
use fallible_iterator::FallibleIterator; | ||
use futures_channel::mpsc; | ||
use futures_util::{future, pin_mut, ready,Stream,StreamExt, TryStreamExt}; | ||
use parking_lot::Mutex; | ||
use postgres_protocol::message::backend::Message; | ||
use postgres_types::BorrowToSql; | ||
@@ -29,6 +31,7 @@ use std::fmt; | ||
use std::net::IpAddr; | ||
#[cfg(feature = "runtime")] | ||
use std::path::PathBuf; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{Context, Poll}; | ||
#[cfg(feature = "runtime")] | ||
@@ -40,6 +43,11 @@ pub struct Responses { | ||
cur: BackendMessages, | ||
} | ||
pub struct CopyBothHandles { | ||
pub(crate) stream_receiver: mpsc::Receiver<Result<Message, Error>>, | ||
pub(crate) sink_sender: mpsc::Sender<FrontendMessage>, | ||
} | ||
impl Responses { | ||
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> { | ||
loop { | ||
@@ -61,6 +69,17 @@ impl Responses { | ||
} | ||
} | ||
impl Stream for Responses { | ||
type Item = Result<Message, Error>; | ||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
match ready!((*self).poll_next(cx)) { | ||
Err(err) if err.is_closed() => Poll::Ready(None), | ||
msg => Poll::Ready(Some(msg)), | ||
} | ||
} | ||
} | ||
/// A cache of type info and prepared statements for fetching type info | ||
/// (corresponding to the queries in the [prepare](prepare) module). | ||
#[derive(Default)] | ||
@@ -103,6 +122,32 @@ impl InnerClient { | ||
}) | ||
} | ||
pub fn start_copy_both(&self) -> Result<CopyBothHandles, Error> { | ||
let (sender, receiver) = mpsc::channel(16); | ||
let (stream_sender, stream_receiver) = mpsc::channel(16); | ||
let (sink_sender, sink_receiver) = mpsc::channel(16); | ||
let responses = Responses { | ||
receiver, | ||
cur: BackendMessages::empty(), | ||
}; | ||
let messages = RequestMessages::CopyBoth(CopyBothReceiver::new( | ||
responses, | ||
sink_receiver, | ||
stream_sender, | ||
)); | ||
let request = Request { messages, sender }; | ||
self.sender | ||
.unbounded_send(request) | ||
.map_err(|_| Error::closed())?; | ||
Ok(CopyBothHandles { | ||
stream_receiver, | ||
sink_sender, | ||
}) | ||
} | ||
pub fn typeinfo(&self) -> Option<Statement> { | ||
self.cached_typeinfo.lock().typeinfo.clone() | ||
} | ||
@@ -493,6 +538,15 @@ impl Client { | ||
copy_out::copy_out(self.inner(), statement).await | ||
} | ||
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy | ||
/// data. | ||
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. After the replication stream, if the timeline is historical, Postgres will send a tuple as a response. So we actually need a function that returns something like It's actually very specific to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. That's a good point, I'll take a look on how we can expose this to users, ideally in a generic way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. In case you missed my other comment, there's a similar issue for | ||
where | ||
T: Buf + 'static + Send, | ||
{ | ||
copy_both::copy_both_simple(self.inner(), query).await | ||
} | ||
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows. | ||
/// | ||
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that | ||
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.