Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Replace parts offutures-util with std APIs#1233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletionspostgres/src/connection.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
use crate::{Error, Notification};
use futures_util::{future, pin_mut,Stream};
use futures_util::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::future::{self,Future};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::pin::{pin,Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
Expand DownExpand Up@@ -52,7 +52,7 @@ impl Connection {
where
F: Future<Output = Result<T, Error>>,
{
pin_mut!(future);
let mut future = pin!(future);
self.poll_block_on(|cx, _, _| future.as_mut().poll(cx))
}

Expand Down
4 changes: 2 additions & 2 deletionspostgres/src/notifications.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,9 +3,9 @@
use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures_util::{ready,FutureExt};
use futures_util::FutureExt;
use std::pin::Pin;
use std::task::Poll;
use std::task::{ready,Poll};
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};

Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/binary_copy.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,7 +4,7 @@ use crate::types::{FromSql, IsNull, ToSql, Type, WrongType};
use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{ready,SinkExt, Stream};
use futures_util::{SinkExt, Stream};
use pin_project_lite::pin_project;
use postgres_types::BorrowToSql;
use std::convert::TryFrom;
Expand All@@ -13,7 +13,7 @@ use std::io::Cursor;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};

const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;
Expand Down
25 changes: 13 additions & 12 deletionstokio-postgres/src/client.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,18 +19,20 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready,StreamExt, TryStreamExt};
use futures_util::{StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
use std::collections::HashMap;
use std::fmt;
use std::future;
#[cfg(feature = "runtime")]
use std::net::IpAddr;
#[cfg(feature = "runtime")]
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};
#[cfg(feature = "runtime")]
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
Expand DownExpand Up@@ -300,8 +302,7 @@ impl Client {
where
T: ?Sized + ToStatement,
{
let stream = self.query_raw(statement, slice_iter(params)).await?;
pin_mut!(stream);
let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);

let mut first = None;

Expand DownExpand Up@@ -336,18 +337,18 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::TryStreamExt;
///
/// let params: Vec<String> = vec![
/// "first param".into(),
/// "second param".into(),
/// ];
/// let mut it = client.query_raw(
/// let mut it =pin!(client.query_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
Expand DownExpand Up@@ -402,19 +403,19 @@ impl Client {
///
/// ```no_run
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
/// use futures_util::{pin_mut, TryStreamExt};
/// use std::pin::pin;
/// use futures_util::{TryStreamExt};
/// use tokio_postgres::types::Type;
///
/// let params: Vec<(String, Type)> = vec![
/// ("first param".into(), Type::TEXT),
/// ("second param".into(), Type::TEXT),
/// ];
/// let mut it = client.query_typed_raw(
/// let mut it =pin!(client.query_typed_raw(
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
/// params,
/// ).await?;
/// ).await?);
///
/// pin_mut!(it);
/// while let Some(row) = it.try_next().await? {
/// let foo: i32 = row.get("foo");
/// println!("foo: {}", foo);
Expand Down
24 changes: 13 additions & 11 deletionstokio-postgres/src/connect.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,8 +4,10 @@ use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::MakeTlsConnect;
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
use futures_util::{future, pin_mut, Future,FutureExt, Stream};
use futures_util::{FutureExt, Stream};
use rand::seq::SliceRandom;
use std::future::{self, Future};
use std::pin::pin;
use std::task::Poll;
use std::{cmp, io};
use tokio::net;
Expand DownExpand Up@@ -161,18 +163,18 @@ where
let (mut client, mut connection) = connect_raw(socket, tls, has_hostname, config).await?;

if config.target_session_attrs != TargetSessionAttrs::Any {
let rows = client.simple_query_raw("SHOW transaction_read_only");
pin_mut!(rows);
let mut rows = pin!(client.simple_query_raw("SHOW transaction_read_only"));

let rows = future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}
let mut rows = pin!(
future::poll_fn(|cx| {
if connection.poll_unpin(cx)?.is_ready() {
return Poll::Ready(Err(Error::closed()));
}

rows.as_mut().poll(cx)
})
.await?;
pin_mut!(rows);
rows.as_mut().poll(cx)
})
.await?
);

loop {
let next = future::poll_fn(|cx| {
Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/connect_raw.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,7 +7,7 @@ use crate::{Client, Connection, Error};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready,Sink, SinkExt, Stream, TryStreamExt};
use futures_util::{Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol::authentication;
use postgres_protocol::authentication::sasl;
use postgres_protocol::authentication::sasl::ScramSha256;
Expand All@@ -17,7 +17,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/connection.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,14 +6,14 @@ use crate::{AsyncMessage, Error, Notification};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{ready,stream::FusedStream, Sink, Stream, StreamExt};
use futures_util::{stream::FusedStream, Sink, Stream, StreamExt};
use log::{info, trace};
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

Expand Down
5 changes: 3 additions & 2 deletionstokio-postgres/src/copy_in.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -5,15 +5,16 @@ use crate::query::extract_row_affected;
use crate::{query, slice_iter, Error, Statement};
use bytes::{Buf, BufMut, BytesMut};
use futures_channel::mpsc;
use futures_util::{future, ready,Sink, SinkExt, Stream, StreamExt};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use postgres_protocol::message::frontend::CopyData;
use std::future;
use std::marker::{PhantomData, PhantomPinned};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};

enum CopyInMessage {
Message(FrontendMessage),
Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/copy_out.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,13 +3,13 @@ use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::Bytes;
use futures_util::{ready,Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};

pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
debug!("executing copy out statement {}", statement.name());
Expand Down
7 changes: 3 additions & 4 deletionstokio-postgres/src/prepare.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,12 +7,12 @@ use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{pin_mut,TryStreamExt};
use futures_util::TryStreamExt;
use log::debug;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::pin::{pin,Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand DownExpand Up@@ -142,8 +142,7 @@ pub(crate) async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type

let stmt = typeinfo_statement(client).await?;

let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
pin_mut!(rows);
let mut rows = pin!(query::query(client, stmt, slice_iter(&[&oid])).await?);

let row = match rows.try_next().await? {
Some(row) => row,
Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/query.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,7 +6,7 @@ use crate::types::{BorrowToSql, IsNull};
use crate::{Column, Error, Portal, Row, Statement};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{ready,Stream};
use futures_util::Stream;
use log::{debug, log_enabled, Level};
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
Expand All@@ -16,7 +16,7 @@ use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};

struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);

Expand Down
4 changes: 2 additions & 2 deletionstokio-postgres/src/simple_query.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -5,15 +5,15 @@ use crate::query::extract_row_affected;
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready,Stream};
use futures_util::Stream;
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready,Context, Poll};

/// Information about a column of a single query row.
#[derive(Debug)]
Expand Down
12 changes: 5 additions & 7 deletionstokio-postgres/tests/test/binary_copy.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
use crate::connect;
use futures_util::{pin_mut, TryStreamExt};
use futures_util::TryStreamExt;
use std::pin::pin;
use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream};
use tokio_postgres::types::Type;

Expand All@@ -16,8 +17,7 @@ async fn write_basic() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));
writer.as_mut().write(&[&1i32, &"foobar"]).await.unwrap();
writer
.as_mut()
Expand DownExpand Up@@ -50,8 +50,7 @@ async fn write_many_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]));

for i in 0..10_000i32 {
writer
Expand DownExpand Up@@ -86,8 +85,7 @@ async fn write_big_rows() {
.copy_in("COPY foo (id, bar) FROM STDIN BINARY")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]);
pin_mut!(writer);
let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]));

for i in 0..2i32 {
writer
Expand Down
16 changes: 6 additions & 10 deletionstokio-postgres/tests/test/main.rs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2,12 +2,11 @@

use bytes::{Bytes, BytesMut};
use futures_channel::mpsc;
use futures_util::{
future, join, pin_mut, stream, try_join, Future, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use futures_util::{join, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use std::fmt::Write;
use std::pin::Pin;
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::net::TcpStream;
Expand DownExpand Up@@ -589,8 +588,7 @@ async fn copy_in() {
.into_iter()
.map(Ok::<_, Error>),
);
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 2);
Expand DownExpand Up@@ -636,8 +634,7 @@ async fn copy_in_large() {
.map(Ok::<_, Error>),
);

let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send_all(&mut stream).await.unwrap();
let rows = sink.finish().await.unwrap();
assert_eq!(rows, 10_000);
Expand All@@ -658,8 +655,7 @@ async fn copy_in_error() {
.unwrap();

{
let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap();
pin_mut!(sink);
let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap());
sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap();
}

Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp