Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit73f8b0d

Browse files
committed
Fixed first run with empty result set
1 parent14ca0ae commit73f8b0d

File tree

1 file changed

+133
-95
lines changed

1 file changed

+133
-95
lines changed

‎src/watcher.rs‎

Lines changed: 133 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
.unwrap();
6262
}
6363
}
64+
65+
eprintln!("stream ended");
6466
});
6567

6668
let ctx =Arc::clone(&self.ctx);
@@ -69,6 +71,8 @@ where
6971
while rx.recv().await.is_some(){
7072
ctx.write().await.handle_event().await;
7173
}
74+
75+
eprintln!("channel closed");
7276
});
7377

7478
self.ctx.write().await.start().await;
@@ -92,6 +96,9 @@ pub struct WatcherCtx<T> {
9296
#[new(default)]
9397
source_tables:Vec<String>,
9498

99+
#[new(value ="true")]
100+
first_run:bool,
101+
95102
phantom: std::marker::PhantomData<T>,
96103
}
97104

@@ -105,26 +112,70 @@ where
105112
.await
106113
.unwrap();
107114

108-
self.setup_query_result_table().await;
109-
self.get_query_result_columns().await;
115+
self.init().await
116+
}
117+
118+
pubasyncfninit(&mutself){
110119
self.collect_source_tables();
111120
self.create_triggers().await;
121+
122+
// If no initial record, we can't infer the table schema. Skipping until the first event
123+
if !self.setup_query_result_table().await{
124+
return;
125+
}
126+
127+
self.update_result_table().await;
112128
}
113129

114-
pubasyncfnget_query_result_columns(&self) ->Vec<(String,String)>{
115-
letquery =format!("SELECT * FROM ({}) AS tamere LIMIT 1",self.query);
130+
pubasyncfnsetup_query_result_table(&mutself) ->bool{
131+
lettmp_table_name ="query_result";
116132

117-
let columns =self
118-
.client
119-
.query(&query,&[])
120-
.await
121-
.unwrap()
122-
.get(0)
123-
.unwrap()
124-
.columns()
133+
let columns =self.get_query_result_columns().await;
134+
135+
if columns.is_empty(){
136+
// Delay the setup until there is at least one record
137+
returnfalse;
138+
}
139+
140+
let fields = columns.iter().map(|(name, _)| name.clone()).collect();
141+
142+
let columns_def = columns
125143
.iter()
126-
.map(|c|(c.name().to_string(), c.type_().name().to_string()))
127-
.collect();
144+
.map(|(name, t)|format!("{} {} NOT NULL", name, t))
145+
.collect::<Vec<_>>()
146+
.join(",\n");
147+
148+
let query =format!(
149+
r#"
150+
CREATE TEMP TABLE {} (
151+
{}
152+
)
153+
"#,
154+
tmp_table_name, columns_def,
155+
);
156+
157+
self.client.execute(&query,&[]).await.unwrap();
158+
159+
self.result_table =TmpTable{
160+
name: tmp_table_name.to_string(),
161+
fields,
162+
};
163+
164+
true
165+
}
166+
167+
pubasyncfnget_query_result_columns(&self) ->Vec<(String,String)>{
168+
let query =format!("SELECT * FROM ({}) q LIMIT 1",self.query);
169+
170+
let columns =ifletSome(first) =self.client.query(&query,&[]).await.unwrap().get(0){
171+
first
172+
.columns()
173+
.iter()
174+
.map(|c|(c.name().to_string(), c.type_().name().to_string()))
175+
.collect()
176+
}else{
177+
vec![]
178+
};
128179

129180
columns
130181
}
@@ -161,36 +212,6 @@ where
161212
self.source_tables = names;
162213
}
163214

164-
pubasyncfnsetup_query_result_table(&mutself){
165-
let tmp_table_name ="query_result";
166-
167-
let columns =self.get_query_result_columns().await;
168-
169-
let fields = columns.iter().map(|(name, _)| name.clone()).collect();
170-
171-
let columns_def = columns
172-
.iter()
173-
.map(|(name, t)|format!("{} {} NOT NULL", name, t))
174-
.collect::<Vec<_>>()
175-
.join(",\n");
176-
177-
let query =format!(
178-
r#"
179-
CREATE TEMP TABLE {} (
180-
{}
181-
)
182-
"#,
183-
tmp_table_name, columns_def,
184-
);
185-
186-
self.client.execute(&query,&[]).await.unwrap();
187-
188-
self.result_table =TmpTable{
189-
name: tmp_table_name.to_string(),
190-
fields,
191-
};
192-
}
193-
194215
pubasyncfncreate_triggers(&mutself){
195216
for(i, table_name)inself.source_tables.iter().enumerate(){
196217
if !self.triggers.contains(&table_name){
@@ -199,39 +220,39 @@ where
199220

200221
let drop_sql =format!(
201222
r#"
202-
DROP TRIGGER IF EXISTS
203-
{}
204-
ON
205-
{}
206-
"#,
223+
DROP TRIGGER IF EXISTS
224+
{}
225+
ON
226+
{}
227+
"#,
207228
trigger_name, table_name
208229
);
209230

210231
self.client.execute(&drop_sql,&[]).await.unwrap();
211232

212233
let func_sql =format!(
213234
r#"
214-
CREATE OR REPLACE FUNCTION pg_temp.{}()
215-
RETURNS TRIGGER AS $$
216-
BEGIN
217-
EXECUTE pg_notify('__live_update', '{}');
218-
RETURN NULL;
219-
END;
220-
$$ LANGUAGE plpgsql
221-
"#,
235+
CREATE OR REPLACE FUNCTION pg_temp.{}()
236+
RETURNS TRIGGER AS $$
237+
BEGIN
238+
EXECUTE pg_notify('__live_update', '{}');
239+
RETURN NULL;
240+
END;
241+
$$ LANGUAGE plpgsql
242+
"#,
222243
trigger_name, l_key
223244
);
224245

225246
self.client.execute(&func_sql,&[]).await.unwrap();
226247

227248
let create_sql =format!(
228249
"
229-
CREATE TRIGGER
230-
{}
231-
AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON
232-
{}
233-
EXECUTE PROCEDURE pg_temp.{}()
234-
",
250+
CREATE TRIGGER
251+
{}
252+
AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON
253+
{}
254+
EXECUTE PROCEDURE pg_temp.{}()
255+
",
235256
trigger_name, table_name, trigger_name
236257
);
237258

@@ -256,40 +277,40 @@ where
256277
let cols =self.result_table.fields.join(", ");
257278
let update_sql =format!(
258279
"WITH
259-
q AS (
260-
SELECT
261-
*,
262-
ROW_NUMBER() OVER() AS lol
263-
FROM
264-
({}) t
265-
),
266-
i AS (
267-
INSERT INTO {i_table} (
280+
q AS (
281+
SELECT
282+
*,
283+
ROW_NUMBER() OVER() AS lol
284+
FROM
285+
({}) t
286+
),
287+
i AS (
288+
INSERT INTO {i_table} (
289+
{cols}
290+
)
291+
SELECT
268292
{cols}
269-
)
270-
SELECT
271-
{cols}
272-
FROM
273-
q
274-
WHERE q.id NOT IN (
293+
FROM
294+
q
295+
WHERE q.id NOT IN (
275296
SELECT id FROM {i_table}
276297
)
277-
RETURNING
278-
{i_table}.*
279-
)
280-
SELECT
281-
jsonb_build_object(
282-
'id', i.id,
283-
'op', 1,
284-
'data', jsonb_build_object(
298+
RETURNING
299+
{i_table}.*
300+
)
301+
SELECT
302+
jsonb_build_object(
303+
'id', i.id,
304+
'op', 1,
305+
'data', jsonb_build_object(
285306
{q_obj}
286307
)
287-
) AS c
288-
FROM
289-
i JOIN
290-
q ON
291-
i.id = q.id
292-
",
308+
) AS c
309+
FROM
310+
i JOIN
311+
q ON
312+
i.id = q.id
313+
",
293314
self.query
294315
);
295316

@@ -301,6 +322,13 @@ where
301322
panic!("error {}", err);
302323
});
303324

325+
// Don't send the first result, as it will contains the whole query.
326+
ifself.first_run{
327+
self.first_run =false;
328+
329+
return;
330+
}
331+
304332
let res = res
305333
.into_iter()
306334
.map(|row| row.get("c"))
@@ -327,6 +355,15 @@ where
327355
}
328356

329357
pubasyncfnhandle_event(&mutself){
358+
// The table was previously empty, sending it all after setting up the triggers.
359+
ifself.result_table.fields.is_empty(){
360+
self.first_run =false;
361+
362+
if !self.setup_query_result_table().await{
363+
return;
364+
}
365+
}
366+
330367
self.update_result_table().await;
331368
}
332369
}
@@ -338,9 +375,10 @@ pub async fn watch<T>(
338375
where
339376
T:Debug +Send +Sync +'static +DeserializeOwned,
340377
{
341-
let(client, connection) = tokio_postgres::connect("host=localhost user=postgres",NoTls)
342-
.await
343-
.unwrap();
378+
let(client, connection) =
379+
tokio_postgres::connect("host=localhost user=postgres dbname=tamere",NoTls)
380+
.await
381+
.unwrap();
344382

345383
letmut watcher =Watcher::new(Arc::new(RwLock::new(WatcherCtx::new(
346384
handler,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp