Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf5e018f

Browse files
committed
add copy_both_simple method
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent8423d6d commitf5e018f

File tree

7 files changed

+534
-2
lines changed

7 files changed

+534
-2
lines changed

‎postgres-protocol/src/message/backend.rs‎

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2222
pubconstERROR_RESPONSE_TAG:u8 =b'E';
2323
pubconstCOPY_IN_RESPONSE_TAG:u8 =b'G';
2424
pubconstCOPY_OUT_RESPONSE_TAG:u8 =b'H';
25+
pubconstCOPY_BOTH_RESPONSE_TAG:u8 =b'W';
2526
pubconstEMPTY_QUERY_RESPONSE_TAG:u8 =b'I';
2627
pubconstBACKEND_KEY_DATA_TAG:u8 =b'K';
2728
pubconstNO_DATA_TAG:u8 =b'n';
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG =>{
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody{
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG =>Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG =>{
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,27 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
pubstructCopyBothResponseBody{
540+
format:u8,
541+
len:u16,
542+
storage:Bytes,
543+
}
544+
545+
implCopyBothResponseBody{
546+
#[inline]
547+
pubfnformat(&self) ->u8{
548+
self.format
549+
}
550+
551+
#[inline]
552+
pubfncolumn_formats(&self) ->ColumnFormats<'_>{
553+
ColumnFormats{
554+
remaining:self.len,
555+
buf:&self.storage,
556+
}
557+
}
558+
}
559+
527560
pubstructDataRowBody{
528561
storage:Bytes,
529562
len:u16,

‎tokio-postgres/src/client.rs‎

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
usecrate::codec::BackendMessages;
22
usecrate::config::{Host,SslMode};
33
usecrate::connection::{Request,RequestMessages};
4+
usecrate::copy_both::CopyBothDuplex;
45
usecrate::copy_out::CopyOutStream;
56
usecrate::query::RowStream;
67
usecrate::simple_query::SimpleQueryStream;
@@ -11,8 +12,9 @@ use crate::types::{Oid, ToSql, Type};
1112
#[cfg(feature ="runtime")]
1213
usecrate::Socket;
1314
usecrate::{
14-
copy_in, copy_out, prepare, query, simple_query, slice_iter,CancelToken,CopyInSink,Error,
15-
Row,SimpleQueryMessage,Statement,ToStatement,Transaction,TransactionBuilder,
15+
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter,CancelToken,
16+
CopyInSink,Error,Row,SimpleQueryMessage,Statement,ToStatement,Transaction,
17+
TransactionBuilder,
1618
};
1719
use bytes::{Buf,BytesMut};
1820
use fallible_iterator::FallibleIterator;
@@ -461,6 +463,15 @@ impl Client {
461463
copy_out::copy_out(self.inner(), statement).await
462464
}
463465

466+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
467+
/// data.
468+
pubasyncfncopy_both_simple<T>(&self,query:&str) ->Result<CopyBothDuplex<T>,Error>
469+
where
470+
T:Buf +'static +Send,
471+
{
472+
copy_both::copy_both_simple(self.inner(), query).await
473+
}
474+
464475
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
465476
///
466477
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

‎tokio-postgres/src/connection.rs‎

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
usecrate::codec::{BackendMessage,BackendMessages,FrontendMessage,PostgresCodec};
2+
usecrate::copy_both::CopyBothReceiver;
23
usecrate::copy_in::CopyInReceiver;
34
usecrate::error::DbError;
45
usecrate::maybe_tls_stream::MaybeTlsStream;
@@ -21,6 +22,7 @@ use tokio_util::codec::Framed;
2122
pubenumRequestMessages{
2223
Single(FrontendMessage),
2324
CopyIn(CopyInReceiver),
25+
CopyBoth(CopyBothReceiver),
2426
}
2527

2628
pubstructRequest{
@@ -259,6 +261,24 @@ where
259261
.map_err(Error::io)?;
260262
self.pending_request =Some(RequestMessages::CopyIn(receiver));
261263
}
264+
RequestMessages::CopyBoth(mut receiver) =>{
265+
let message =match receiver.poll_next_unpin(cx){
266+
Poll::Ready(Some(message)) => message,
267+
Poll::Ready(None) =>{
268+
trace!("poll_write: finished copy_both request");
269+
continue;
270+
}
271+
Poll::Pending =>{
272+
trace!("poll_write: waiting on copy_both stream");
273+
self.pending_request =Some(RequestMessages::CopyBoth(receiver));
274+
returnOk(true);
275+
}
276+
};
277+
Pin::new(&mutself.stream)
278+
.start_send(message)
279+
.map_err(Error::io)?;
280+
self.pending_request =Some(RequestMessages::CopyBoth(receiver));
281+
}
262282
}
263283
}
264284
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp