6161. unwrap ( ) ;
6262}
6363}
64+
65+ eprintln ! ( "stream ended" ) ;
6466} ) ;
6567
6668let ctx =Arc :: clone ( & self . ctx ) ;
6971while rx. recv ( ) . await . is_some ( ) {
7072 ctx. write ( ) . await . handle_event ( ) . await ;
7173}
74+
75+ eprintln ! ( "channel closed" ) ;
7276} ) ;
7377
7478self . ctx . write ( ) . await . start ( ) . await ;
@@ -92,6 +96,9 @@ pub struct WatcherCtx<T> {
9296#[ new( default ) ]
9397source_tables : Vec < String > ,
9498
99+ #[ new( value ="true" ) ]
100+ first_run : bool ,
101+
95102phantom : std:: marker:: PhantomData < T > ,
96103}
97104
@@ -105,26 +112,70 @@ where
105112. await
106113. unwrap ( ) ;
107114
108- self . setup_query_result_table ( ) . await ;
109- self . get_query_result_columns ( ) . await ;
115+ self . init ( ) . await
116+ }
117+
118+ pub async fn init ( & mut self ) {
110119self . collect_source_tables ( ) ;
111120self . create_triggers ( ) . await ;
121+
122+ // If no initial record, we can't infer the table schema. Skipping until the first event
123+ if !self . setup_query_result_table ( ) . await {
124+ return ;
125+ }
126+
127+ self . update_result_table ( ) . await ;
112128}
113129
114- pub async fn get_query_result_columns ( & self ) ->Vec < ( String , String ) > {
115- let query =format ! ( "SELECT * FROM ({}) AS tamere LIMIT 1" , self . query ) ;
130+ pub async fn setup_query_result_table ( & mut self ) ->bool {
131+ let tmp_table_name ="query_result" ;
116132
117- let columns =self
118- . client
119- . query ( & query, & [ ] )
120- . await
121- . unwrap ( )
122- . get ( 0 )
123- . unwrap ( )
124- . columns ( )
133+ let columns =self . get_query_result_columns ( ) . await ;
134+
135+ if columns. is_empty ( ) {
136+ // Delay the setup until there is at least one record
137+ return false ;
138+ }
139+
140+ let fields = columns. iter ( ) . map ( |( name, _) | name. clone ( ) ) . collect ( ) ;
141+
142+ let columns_def = columns
125143. iter ( )
126- . map ( |c|( c. name ( ) . to_string ( ) , c. type_ ( ) . name ( ) . to_string ( ) ) )
127- . collect ( ) ;
144+ . map ( |( name, t) |format ! ( "{} {} NOT NULL" , name, t) )
145+ . collect :: < Vec < _ > > ( )
146+ . join ( ",\n " ) ;
147+
148+ let query =format ! (
149+ r#"
150+ CREATE TEMP TABLE {} (
151+ {}
152+ )
153+ "# ,
154+ tmp_table_name, columns_def,
155+ ) ;
156+
157+ self . client . execute ( & query, & [ ] ) . await . unwrap ( ) ;
158+
159+ self . result_table =TmpTable {
160+ name : tmp_table_name. to_string ( ) ,
161+ fields,
162+ } ;
163+
164+ true
165+ }
166+
167+ pub async fn get_query_result_columns ( & self ) ->Vec < ( String , String ) > {
168+ let query =format ! ( "SELECT * FROM ({}) q LIMIT 1" , self . query) ;
169+
170+ let columns =if let Some ( first) =self . client . query ( & query, & [ ] ) . await . unwrap ( ) . get ( 0 ) {
171+ first
172+ . columns ( )
173+ . iter ( )
174+ . map ( |c|( c. name ( ) . to_string ( ) , c. type_ ( ) . name ( ) . to_string ( ) ) )
175+ . collect ( )
176+ } else {
177+ vec ! [ ]
178+ } ;
128179
129180 columns
130181}
@@ -161,36 +212,6 @@ where
161212self . source_tables = names;
162213}
163214
164- pub async fn setup_query_result_table ( & mut self ) {
165- let tmp_table_name ="query_result" ;
166-
167- let columns =self . get_query_result_columns ( ) . await ;
168-
169- let fields = columns. iter ( ) . map ( |( name, _) | name. clone ( ) ) . collect ( ) ;
170-
171- let columns_def = columns
172- . iter ( )
173- . map ( |( name, t) |format ! ( "{} {} NOT NULL" , name, t) )
174- . collect :: < Vec < _ > > ( )
175- . join ( ",\n " ) ;
176-
177- let query =format ! (
178- r#"
179- CREATE TEMP TABLE {} (
180- {}
181- )
182- "# ,
183- tmp_table_name, columns_def,
184- ) ;
185-
186- self . client . execute ( & query, & [ ] ) . await . unwrap ( ) ;
187-
188- self . result_table =TmpTable {
189- name : tmp_table_name. to_string ( ) ,
190- fields,
191- } ;
192- }
193-
194215pub async fn create_triggers ( & mut self ) {
195216for ( i, table_name) in self . source_tables . iter ( ) . enumerate ( ) {
196217if !self . triggers . contains ( & table_name) {
@@ -199,39 +220,39 @@ where
199220
200221let drop_sql =format ! (
201222r#"
202- DROP TRIGGER IF EXISTS
203- {}
204- ON
205- {}
206- "#,
223+ DROP TRIGGER IF EXISTS
224+ {}
225+ ON
226+ {}
227+ "#,
207228 trigger_name, table_name
208229) ;
209230
210231self . client . execute ( & drop_sql, & [ ] ) . await . unwrap ( ) ;
211232
212233let func_sql =format ! (
213234r#"
214- CREATE OR REPLACE FUNCTION pg_temp.{}()
215- RETURNS TRIGGER AS $$
216- BEGIN
217- EXECUTE pg_notify('__live_update', '{}');
218- RETURN NULL;
219- END;
220- $$ LANGUAGE plpgsql
221- "#,
235+ CREATE OR REPLACE FUNCTION pg_temp.{}()
236+ RETURNS TRIGGER AS $$
237+ BEGIN
238+ EXECUTE pg_notify('__live_update', '{}');
239+ RETURN NULL;
240+ END;
241+ $$ LANGUAGE plpgsql
242+ "#,
222243 trigger_name, l_key
223244) ;
224245
225246self . client . execute ( & func_sql, & [ ] ) . await . unwrap ( ) ;
226247
227248let create_sql =format ! (
228249"
229- CREATE TRIGGER
230- {}
231- AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON
232- {}
233- EXECUTE PROCEDURE pg_temp.{}()
234- ",
250+ CREATE TRIGGER
251+ {}
252+ AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON
253+ {}
254+ EXECUTE PROCEDURE pg_temp.{}()
255+ ",
235256 trigger_name, table_name, trigger_name
236257) ;
237258
@@ -256,40 +277,40 @@ where
256277let cols =self . result_table . fields . join ( ", " ) ;
257278let update_sql =format ! (
258279"WITH
259- q AS (
260- SELECT
261- *,
262- ROW_NUMBER() OVER() AS lol
263- FROM
264- ({}) t
265- ),
266- i AS (
267- INSERT INTO {i_table} (
280+ q AS (
281+ SELECT
282+ *,
283+ ROW_NUMBER() OVER() AS lol
284+ FROM
285+ ({}) t
286+ ),
287+ i AS (
288+ INSERT INTO {i_table} (
289+ {cols}
290+ )
291+ SELECT
268292 {cols}
269- )
270- SELECT
271- {cols}
272- FROM
273- q
274- WHERE q.id NOT IN (
293+ FROM
294+ q
295+ WHERE q.id NOT IN (
275296 SELECT id FROM {i_table}
276297 )
277- RETURNING
278- {i_table}.*
279- )
280- SELECT
281- jsonb_build_object(
282- 'id', i.id,
283- 'op', 1,
284- 'data', jsonb_build_object(
298+ RETURNING
299+ {i_table}.*
300+ )
301+ SELECT
302+ jsonb_build_object(
303+ 'id', i.id,
304+ 'op', 1,
305+ 'data', jsonb_build_object(
285306 {q_obj}
286307 )
287- ) AS c
288- FROM
289- i JOIN
290- q ON
291- i.id = q.id
292- ",
308+ ) AS c
309+ FROM
310+ i JOIN
311+ q ON
312+ i.id = q.id
313+ ",
293314self . query
294315) ;
295316
@@ -301,6 +322,13 @@ where
301322panic ! ( "error {}" , err) ;
302323} ) ;
303324
325+ // Don't send the first result, as it will contains the whole query.
326+ if self . first_run {
327+ self . first_run =false ;
328+
329+ return ;
330+ }
331+
304332let res = res
305333. into_iter ( )
306334. map ( |row| row. get ( "c" ) )
@@ -327,6 +355,15 @@ where
327355}
328356
329357pub async fn handle_event ( & mut self ) {
358+ // The table was previously empty, sending it all after setting up the triggers.
359+ if self . result_table . fields . is_empty ( ) {
360+ self . first_run =false ;
361+
362+ if !self . setup_query_result_table ( ) . await {
363+ return ;
364+ }
365+ }
366+
330367self . update_result_table ( ) . await ;
331368}
332369}
@@ -338,9 +375,10 @@ pub async fn watch<T>(
338375where
339376T : Debug +Send +Sync +' static +DeserializeOwned ,
340377{
341- let ( client, connection) = tokio_postgres:: connect ( "host=localhost user=postgres" , NoTls )
342- . await
343- . unwrap ( ) ;
378+ let ( client, connection) =
379+ tokio_postgres:: connect ( "host=localhost user=postgres dbname=tamere" , NoTls )
380+ . await
381+ . unwrap ( ) ;
344382
345383let mut watcher =Watcher :: new ( Arc :: new ( RwLock :: new ( WatcherCtx :: new (
346384 handler,