1
1
use crate :: client:: { InnerClient , Responses } ;
2
2
use crate :: codec:: FrontendMessage ;
3
3
use crate :: connection:: RequestMessages ;
4
+ use crate :: prepare:: get_type;
4
5
use crate :: types:: { BorrowToSql , IsNull } ;
5
- use crate :: { Error , Portal , Row , Statement } ;
6
+ use crate :: { Column , Error , Portal , Row , Statement } ;
6
7
use bytes:: { Bytes , BytesMut } ;
8
+ use fallible_iterator:: FallibleIterator ;
7
9
use futures_util:: { ready, Stream } ;
8
10
use log:: { debug, log_enabled, Level } ;
9
11
use pin_project_lite:: pin_project;
10
12
use postgres_protocol:: message:: backend:: { CommandCompleteBody , Message } ;
11
13
use postgres_protocol:: message:: frontend;
14
+ use postgres_types:: Type ;
12
15
use std:: fmt;
13
16
use std:: marker:: PhantomPinned ;
14
17
use std:: pin:: Pin ;
18
+ use std:: sync:: Arc ;
15
19
use std:: task:: { Context , Poll } ;
16
20
17
21
struct BorrowToSqlParamsDebug < ' a , T > ( & ' a [ T ] ) ;
57
61
} )
58
62
}
59
63
64
+ pub async fn query_typed < ' a , P , I > (
65
+ client : & Arc < InnerClient > ,
66
+ query : & str ,
67
+ params : I ,
68
+ ) ->Result < RowStream , Error >
69
+ where
70
+ P : BorrowToSql ,
71
+ I : IntoIterator < Item =( P , Type ) > ,
72
+ I :: IntoIter : ExactSizeIterator ,
73
+ {
74
+ let ( params, param_types) : ( Vec < _ > , Vec < _ > ) = params. into_iter ( ) . unzip ( ) ;
75
+
76
+ let params = params. into_iter ( ) ;
77
+
78
+ let param_oids = param_types. iter ( ) . map ( |t| t. oid ( ) ) . collect :: < Vec < _ > > ( ) ;
79
+
80
+ let params = params. into_iter ( ) ;
81
+
82
+ let buf = client. with_buf ( |buf|{
83
+ frontend:: parse ( "" , query, param_oids. into_iter ( ) , buf) . map_err ( Error :: parse) ?;
84
+
85
+ encode_bind_with_statement_name_and_param_types ( "" , & param_types, params, "" , buf) ?;
86
+
87
+ frontend:: describe ( b'S' , "" , buf) . map_err ( Error :: encode) ?;
88
+
89
+ frontend:: execute ( "" , 0 , buf) . map_err ( Error :: encode) ?;
90
+
91
+ frontend:: sync ( buf) ;
92
+
93
+ Ok ( buf. split ( ) . freeze ( ) )
94
+ } ) ?;
95
+
96
+ let mut responses = client. send ( RequestMessages :: Single ( FrontendMessage :: Raw ( buf) ) ) ?;
97
+
98
+ loop {
99
+ match responses. next ( ) . await ?{
100
+ Message :: ParseComplete
101
+ |Message :: BindComplete
102
+ |Message :: ParameterDescription ( _)
103
+ |Message :: NoData =>{ }
104
+ Message :: RowDescription ( row_description) =>{
105
+ let mut columns: Vec < Column > =vec ! [ ] ;
106
+ let mut it = row_description. fields ( ) ;
107
+ while let Some ( field) = it. next ( ) . map_err ( Error :: parse) ?{
108
+ let type_ =get_type ( client, field. type_oid ( ) ) . await ?;
109
+ let column =Column {
110
+ name : field. name ( ) . to_string ( ) ,
111
+ table_oid : Some ( field. table_oid ( ) ) . filter ( |n|* n !=0 ) ,
112
+ column_id : Some ( field. column_id ( ) ) . filter ( |n|* n !=0 ) ,
113
+ r#type : type_,
114
+ } ;
115
+ columns. push ( column) ;
116
+ }
117
+ return Ok ( RowStream {
118
+ statement : Statement :: unnamed ( vec ! [ ] , columns) ,
119
+ responses,
120
+ rows_affected : None ,
121
+ _p : PhantomPinned ,
122
+ } ) ;
123
+ }
124
+ _ =>return Err ( Error :: unexpected_message ( ) ) ,
125
+ }
126
+ }
127
+ }
128
+
60
129
pub async fn query_portal (
61
130
client : & InnerClient ,
62
131
portal : & Portal ,
@@ -164,7 +233,27 @@ where
164
233
I : IntoIterator < Item =P > ,
165
234
I :: IntoIter : ExactSizeIterator ,
166
235
{
167
- let param_types = statement. params ( ) ;
236
+ encode_bind_with_statement_name_and_param_types (
237
+ statement. name ( ) ,
238
+ statement. params ( ) ,
239
+ params,
240
+ portal,
241
+ buf,
242
+ )
243
+ }
244
+
245
+ fn encode_bind_with_statement_name_and_param_types < P , I > (
246
+ statement_name : & str ,
247
+ param_types : & [ Type ] ,
248
+ params : I ,
249
+ portal : & str ,
250
+ buf : & mut BytesMut ,
251
+ ) ->Result < ( ) , Error >
252
+ where
253
+ P : BorrowToSql ,
254
+ I : IntoIterator < Item =P > ,
255
+ I :: IntoIter : ExactSizeIterator ,
256
+ {
168
257
let params = params. into_iter ( ) ;
169
258
170
259
if param_types. len ( ) != params. len ( ) {
@@ -181,7 +270,7 @@ where
181
270
let mut error_idx =0 ;
182
271
let r = frontend:: bind (
183
272
portal,
184
- statement . name ( ) ,
273
+ statement_name ,
185
274
param_formats,
186
275
params. zip ( param_types) . enumerate ( ) ,
187
276
|( idx, ( param, ty) ) , buf|match param. borrow_to_sql ( ) . to_sql_checked ( ty, buf) {