@@ -303,7 +303,9 @@ aqo_stat_store(uint64 queryid, bool use_aqo,
303303entry -> exec_time [pos ]= exec_time ;
304304entry -> est_error [pos ]= est_error ;
305305}
306+
306307entry = memcpy (palloc (sizeof (StatEntry )),entry ,sizeof (StatEntry ));
308+ aqo_state -> stat_changed = true;
307309LWLockRelease (& aqo_state -> stat_lock );
308310return entry ;
309311}
@@ -425,14 +427,24 @@ aqo_stat_flush(void)
425427int ret ;
426428long entries ;
427429
428- LWLockAcquire (& aqo_state -> stat_lock ,LW_SHARED );
430+ /* Use exclusive lock to prevent concurrent flushing in different backends. */
431+ LWLockAcquire (& aqo_state -> stat_lock ,LW_EXCLUSIVE );
432+
433+ if (!aqo_state -> stat_changed )
434+ /* Hash table wasn't changed, meaningless to store it in permanent storage */
435+ gotoend ;
436+
429437entries = hash_get_num_entries (stat_htab );
430438hash_seq_init (& hash_seq ,stat_htab );
431439ret = data_store (PGAQO_STAT_FILE ,_form_stat_record_cb ,entries ,
432440 (void * )& hash_seq );
433441if (ret != 0 )
434442hash_seq_term (& hash_seq );
443+ else
444+ /* Hash table and disk storage are now consistent */
445+ aqo_state -> stat_changed = false;
435446
447+ end :
436448LWLockRelease (& aqo_state -> stat_lock );
437449}
438450
@@ -469,7 +481,7 @@ aqo_qtexts_flush(void)
469481long entries ;
470482
471483dsa_init ();
472- LWLockAcquire (& aqo_state -> qtexts_lock ,LW_SHARED );
484+ LWLockAcquire (& aqo_state -> qtexts_lock ,LW_EXCLUSIVE );
473485
474486if (!aqo_state -> qtexts_changed )
475487/* XXX: mull over forced mode. */
@@ -481,7 +493,9 @@ aqo_qtexts_flush(void)
481493 (void * )& hash_seq );
482494if (ret != 0 )
483495hash_seq_term (& hash_seq );
484- aqo_state -> qtexts_changed = false;
496+ else
497+ /* Hash table and disk storage are now consistent */
498+ aqo_state -> qtexts_changed = false;
485499
486500end :
487501LWLockRelease (& aqo_state -> qtexts_lock );
@@ -531,7 +545,7 @@ aqo_data_flush(void)
531545long entries ;
532546
533547dsa_init ();
534- LWLockAcquire (& aqo_state -> data_lock ,LW_SHARED );
548+ LWLockAcquire (& aqo_state -> data_lock ,LW_EXCLUSIVE );
535549
536550if (!aqo_state -> data_changed )
537551/* XXX: mull over forced mode. */
@@ -548,6 +562,7 @@ aqo_data_flush(void)
548562 */
549563hash_seq_term (& hash_seq );
550564else
565+ /* Hash table and disk storage are now consistent */
551566aqo_state -> data_changed = false;
552567end :
553568LWLockRelease (& aqo_state -> data_lock );
@@ -574,14 +589,22 @@ aqo_queries_flush(void)
574589int ret ;
575590long entries ;
576591
577- LWLockAcquire (& aqo_state -> queries_lock ,LW_SHARED );
592+ LWLockAcquire (& aqo_state -> queries_lock ,LW_EXCLUSIVE );
593+
594+ if (!aqo_state -> queries_changed )
595+ gotoend ;
596+
578597entries = hash_get_num_entries (queries_htab );
579598hash_seq_init (& hash_seq ,queries_htab );
580599ret = data_store (PGAQO_QUERIES_FILE ,_form_queries_record_cb ,entries ,
581600 (void * )& hash_seq );
582601if (ret != 0 )
583602hash_seq_term (& hash_seq );
603+ else
604+ /* Hash table and disk storage are now consistent */
605+ aqo_state -> queries_changed = false;
584606
607+ end :
585608LWLockRelease (& aqo_state -> queries_lock );
586609}
587610
@@ -621,7 +644,8 @@ data_store(const char *filename, form_record_t callback,
621644gotoerror ;
622645}
623646
624- (void )durable_rename (tmpfile ,filename ,LOG );
647+ /* Parallel (re)writing into a file haven't happen. */
648+ (void )durable_rename (tmpfile ,filename ,PANIC );
625649elog (LOG ,"[AQO] %d records stored in file %s." ,counter ,filename );
626650return 0 ;
627651
@@ -839,7 +863,7 @@ aqo_queries_load(void)
839863
840864LWLockAcquire (& aqo_state -> queries_lock ,LW_EXCLUSIVE );
841865
842- /* Load on postmastersturtup . So no any concurrent actions possible here. */
866+ /* Load on postmasterstartup . So no any concurrent actions possible here. */
843867Assert (hash_get_num_entries (queries_htab )== 0 );
844868
845869data_load (PGAQO_QUERIES_FILE ,_deform_queries_record_cb ,NULL );
@@ -926,6 +950,9 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
926950static void
927951on_shmem_shutdown (int code ,Datum arg )
928952{
953+ /*
954+ * XXX: It can be expensive to rewrite a file on each shutdown of a backend.
955+ */
929956aqo_qtexts_flush ();
930957aqo_data_flush ();
931958}
@@ -1201,6 +1228,7 @@ _aqo_data_remove(data_key *key)
12011228
12021229if (hash_search (data_htab ,key ,HASH_REMOVE ,NULL )== NULL )
12031230elog (PANIC ,"[AQO] Inconsistent data hash table" );
1231+
12041232aqo_state -> data_changed = true;
12051233}
12061234
@@ -1270,8 +1298,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
12701298char * ptr ;
12711299ListCell * lc ;
12721300size_t size ;
1273- bool tblOverflow ;
1274- HASHACTION action ;
1301+ bool tblOverflow ;
1302+ HASHACTION action ;
1303+ bool result ;
12751304
12761305Assert (!LWLockHeldByMe (& aqo_state -> data_lock ));
12771306
@@ -1322,7 +1351,6 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13221351}
13231352
13241353Assert (DsaPointerIsValid (entry -> data_dp ));
1325- Assert (entry -> rows <=data -> rows );/* Reserved for the future features */
13261354
13271355if (entry -> cols != data -> cols || entry -> nrels != list_length (reloids ))
13281356{
@@ -1388,8 +1416,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13881416
13891417aqo_state -> data_changed = true;
13901418end :
1419+ result = aqo_state -> data_changed ;
13911420LWLockRelease (& aqo_state -> data_lock );
1392- return aqo_state -> data_changed ;
1421+ return result ;
13931422}
13941423
13951424static void
@@ -1497,7 +1526,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
14971526
14981527dsa_init ();
14991528
1500- LWLockAcquire (& aqo_state -> data_lock ,LW_EXCLUSIVE );
1529+ LWLockAcquire (& aqo_state -> data_lock ,LW_SHARED );
15011530
15021531if (!wideSearch )
15031532{
@@ -1632,7 +1661,8 @@ aqo_data(PG_FUNCTION_ARGS)
16321661ptr += sizeof (data_key );
16331662
16341663if (entry -> cols > 0 )
1635- values [AD_FEATURES ]= PointerGetDatum (form_matrix ((double * )ptr ,entry -> rows ,entry -> cols ));
1664+ values [AD_FEATURES ]= PointerGetDatum (form_matrix ((double * )ptr ,
1665+ entry -> rows ,entry -> cols ));
16361666else
16371667nulls [AD_FEATURES ]= true;
16381668
@@ -1720,7 +1750,9 @@ aqo_data_reset(void)
17201750elog (ERROR ,"[AQO] hash table corrupted" );
17211751num_remove ++ ;
17221752}
1723- aqo_state -> data_changed = true;
1753+
1754+ if (num_remove > 0 )
1755+ aqo_state -> data_changed = true;
17241756LWLockRelease (& aqo_state -> data_lock );
17251757if (num_remove != num_entries )
17261758elog (ERROR ,"[AQO] Query ML memory storage is corrupted or parallel access without a lock has detected." );
@@ -1832,6 +1864,7 @@ aqo_queries_store(uint64 queryid,
18321864entry -> use_aqo = use_aqo ;
18331865entry -> auto_tuning = auto_tuning ;
18341866
1867+ aqo_state -> queries_changed = true;
18351868LWLockRelease (& aqo_state -> queries_lock );
18361869return true;
18371870}
@@ -1857,7 +1890,10 @@ aqo_queries_reset(void)
18571890elog (ERROR ,"[AQO] hash table corrupted" );
18581891num_remove ++ ;
18591892}
1860- aqo_state -> queries_changed = true;
1893+
1894+ if (num_remove > 0 )
1895+ aqo_state -> queries_changed = true;
1896+
18611897LWLockRelease (& aqo_state -> queries_lock );
18621898
18631899if (num_remove != num_entries - 1 )