1+ /// Adapted from https://github.com/nothingisdead/pg-live-query/blob/master/watcher.js
12use async_trait:: async_trait;
23use derive_new:: new;
34use futures:: StreamExt ;
@@ -270,7 +271,21 @@ where
270271. collect :: < Vec < _ > > ( )
271272. join ( "," ) ;
272273
274+ let u_obj =self
275+ . result_table
276+ . fields
277+ . iter ( )
278+ . map ( |name|format ! ( "'{name}', u.{name}" ) )
279+ . collect :: < Vec < _ > > ( )
280+ . join ( "," ) ;
273281let cols =self . result_table . fields . join ( ", " ) ;
282+ let set_cols =self
283+ . result_table
284+ . fields
285+ . iter ( )
286+ . map ( |name|format ! ( "{} = q.{}" , name, name) )
287+ . collect :: < Vec < _ > > ( )
288+ . join ( ", " ) ;
274289let update_sql =format ! (
275290"WITH
276291 q AS (
@@ -293,7 +308,17 @@ where
293308 )
294309 RETURNING
295310 {i_table}.*
296- )
311+ ),
312+ u AS (
313+ UPDATE {i_table} SET
314+ {set_cols}
315+ FROM
316+ q
317+ WHERE
318+ {i_table}.id = q.id
319+ RETURNING
320+ {i_table}.*
321+ )
297322 SELECT
298323 jsonb_build_object(
299324 'id', i.id,
@@ -306,6 +331,17 @@ where
306331 i JOIN
307332 q ON
308333 i.id = q.id
334+ UNION ALL
335+ SELECT
336+ jsonb_build_object(
337+ 'id', u.id,
338+ 'op', 2,
339+ 'data', jsonb_build_object({u_obj})
340+ ) AS c
341+ FROM
342+ u JOIN
343+ q ON
344+ u.id = q.id
309345 " ,
310346self . query
311347) ;
@@ -342,6 +378,7 @@ where
342378let data = serde_json:: from_value ( event. data ) . unwrap ( ) ;
343379match event. op {
3443801 =>Event :: Insert ( data) ,
381+ 2 =>Event :: Update ( data) ,
345382 _ =>unimplemented ! ( ) ,
346383}
347384} )
@@ -371,10 +408,9 @@ pub async fn watch<T>(
371408where
372409T : Debug +Send +Sync +' static +DeserializeOwned ,
373410{
374- let ( client, connection) =
375- tokio_postgres:: connect ( "host=localhost user=postgres dbname=tamere" , NoTls )
376- . await
377- . unwrap ( ) ;
411+ let db_url = std:: env:: var ( "DATABASE_URL" ) . unwrap ( ) ;
412+
413+ let ( client, connection) = tokio_postgres:: connect ( & db_url, NoTls ) . await . unwrap ( ) ;
378414
379415let mut watcher =Watcher :: new ( Arc :: new ( RwLock :: new ( WatcherCtx :: new (
380416 handler,