@@ -303,7 +303,9 @@ aqo_stat_store(uint64 queryid, bool use_aqo,
303303entry -> exec_time [pos ]= exec_time ;
304304entry -> est_error [pos ]= est_error ;
305305}
306+
306307entry = memcpy (palloc (sizeof (StatEntry )),entry ,sizeof (StatEntry ));
308+ aqo_state -> stat_changed = true;
307309LWLockRelease (& aqo_state -> stat_lock );
308310return entry ;
309311}
@@ -425,14 +427,24 @@ aqo_stat_flush(void)
425427int ret ;
426428long entries ;
427429
428- LWLockAcquire (& aqo_state -> stat_lock ,LW_SHARED );
430+ /* Use exclusive lock to prevent concurrent flushing in different backends. */
431+ LWLockAcquire (& aqo_state -> stat_lock ,LW_EXCLUSIVE );
432+
433+ if (!aqo_state -> stat_changed )
434+ /* Hash table wasn't changed, meaningless to store it in permanent storage */
435+ gotoend ;
436+
429437entries = hash_get_num_entries (stat_htab );
430438hash_seq_init (& hash_seq ,stat_htab );
431439ret = data_store (PGAQO_STAT_FILE ,_form_stat_record_cb ,entries ,
432440 (void * )& hash_seq );
433441if (ret != 0 )
434442hash_seq_term (& hash_seq );
443+ else
444+ /* Hash table and disk storage are now consistent */
445+ aqo_state -> stat_changed = false;
435446
447+ end :
436448LWLockRelease (& aqo_state -> stat_lock );
437449}
438450
@@ -469,7 +481,7 @@ aqo_qtexts_flush(void)
469481long entries ;
470482
471483dsa_init ();
472- LWLockAcquire (& aqo_state -> qtexts_lock ,LW_SHARED );
484+ LWLockAcquire (& aqo_state -> qtexts_lock ,LW_EXCLUSIVE );
473485
474486if (!aqo_state -> qtexts_changed )
475487/* XXX: mull over forced mode. */
@@ -481,7 +493,9 @@ aqo_qtexts_flush(void)
481493 (void * )& hash_seq );
482494if (ret != 0 )
483495hash_seq_term (& hash_seq );
484- aqo_state -> qtexts_changed = false;
496+ else
497+ /* Hash table and disk storage are now consistent */
498+ aqo_state -> qtexts_changed = false;
485499
486500end :
487501LWLockRelease (& aqo_state -> qtexts_lock );
@@ -531,7 +545,7 @@ aqo_data_flush(void)
531545long entries ;
532546
533547dsa_init ();
534- LWLockAcquire (& aqo_state -> data_lock ,LW_SHARED );
548+ LWLockAcquire (& aqo_state -> data_lock ,LW_EXCLUSIVE );
535549
536550if (!aqo_state -> data_changed )
537551/* XXX: mull over forced mode. */
@@ -548,6 +562,7 @@ aqo_data_flush(void)
548562 */
549563hash_seq_term (& hash_seq );
550564else
565+ /* Hash table and disk storage are now consistent */
551566aqo_state -> data_changed = false;
552567end :
553568LWLockRelease (& aqo_state -> data_lock );
@@ -574,14 +589,22 @@ aqo_queries_flush(void)
574589int ret ;
575590long entries ;
576591
577- LWLockAcquire (& aqo_state -> queries_lock ,LW_SHARED );
592+ LWLockAcquire (& aqo_state -> queries_lock ,LW_EXCLUSIVE );
593+
594+ if (!aqo_state -> queries_changed )
595+ gotoend ;
596+
578597entries = hash_get_num_entries (queries_htab );
579598hash_seq_init (& hash_seq ,queries_htab );
580599ret = data_store (PGAQO_QUERIES_FILE ,_form_queries_record_cb ,entries ,
581600 (void * )& hash_seq );
582601if (ret != 0 )
583602hash_seq_term (& hash_seq );
603+ else
604+ /* Hash table and disk storage are now consistent */
605+ aqo_state -> queries_changed = false;
584606
607+ end :
585608LWLockRelease (& aqo_state -> queries_lock );
586609}
587610
@@ -621,7 +644,8 @@ data_store(const char *filename, form_record_t callback,
621644gotoerror ;
622645}
623646
624- (void )durable_rename (tmpfile ,filename ,LOG );
647+ /* Parallel (re)writing into a file haven't happen. */
648+ (void )durable_rename (tmpfile ,filename ,PANIC );
625649elog (LOG ,"[AQO] %d records stored in file %s." ,counter ,filename );
626650return 0 ;
627651
@@ -839,7 +863,7 @@ aqo_queries_load(void)
839863
840864LWLockAcquire (& aqo_state -> queries_lock ,LW_EXCLUSIVE );
841865
842- /* Load on postmastersturtup . So no any concurrent actions possible here. */
866+ /* Load on postmasterstartup . So no any concurrent actions possible here. */
843867Assert (hash_get_num_entries (queries_htab )== 0 );
844868
845869data_load (PGAQO_QUERIES_FILE ,_deform_queries_record_cb ,NULL );
@@ -926,6 +950,9 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
926950static void
927951on_shmem_shutdown (int code ,Datum arg )
928952{
953+ /*
954+ * XXX: It can be expensive to rewrite a file on each shutdown of a backend.
955+ */
929956aqo_qtexts_flush ();
930957aqo_data_flush ();
931958}
@@ -1201,6 +1228,7 @@ _aqo_data_remove(data_key *key)
12011228
12021229if (hash_search (data_htab ,key ,HASH_REMOVE ,NULL )== NULL )
12031230elog (PANIC ,"[AQO] Inconsistent data hash table" );
1231+
12041232aqo_state -> data_changed = true;
12051233}
12061234
@@ -1270,8 +1298,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
12701298char * ptr ;
12711299ListCell * lc ;
12721300size_t size ;
1273- bool tblOverflow ;
1274- HASHACTION action ;
1301+ bool tblOverflow ;
1302+ HASHACTION action ;
1303+ bool result ;
12751304
12761305Assert (!LWLockHeldByMe (& aqo_state -> data_lock ));
12771306
@@ -1387,8 +1416,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13871416
13881417aqo_state -> data_changed = true;
13891418end :
1419+ result = aqo_state -> data_changed ;
13901420LWLockRelease (& aqo_state -> data_lock );
1391- return aqo_state -> data_changed ;
1421+ return result ;
13921422}
13931423
13941424static void
@@ -1496,7 +1526,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
14961526
14971527dsa_init ();
14981528
1499- LWLockAcquire (& aqo_state -> data_lock ,LW_EXCLUSIVE );
1529+ LWLockAcquire (& aqo_state -> data_lock ,LW_SHARED );
15001530
15011531if (!wideSearch )
15021532{
@@ -1631,7 +1661,8 @@ aqo_data(PG_FUNCTION_ARGS)
16311661ptr += sizeof (data_key );
16321662
16331663if (entry -> cols > 0 )
1634- values [AD_FEATURES ]= PointerGetDatum (form_matrix ((double * )ptr ,entry -> rows ,entry -> cols ));
1664+ values [AD_FEATURES ]= PointerGetDatum (form_matrix ((double * )ptr ,
1665+ entry -> rows ,entry -> cols ));
16351666else
16361667nulls [AD_FEATURES ]= true;
16371668
@@ -1719,7 +1750,9 @@ aqo_data_reset(void)
17191750elog (ERROR ,"[AQO] hash table corrupted" );
17201751num_remove ++ ;
17211752}
1722- aqo_state -> data_changed = true;
1753+
1754+ if (num_remove > 0 )
1755+ aqo_state -> data_changed = true;
17231756LWLockRelease (& aqo_state -> data_lock );
17241757if (num_remove != num_entries )
17251758elog (ERROR ,"[AQO] Query ML memory storage is corrupted or parallel access without a lock has detected." );
@@ -1831,6 +1864,7 @@ aqo_queries_store(uint64 queryid,
18311864entry -> use_aqo = use_aqo ;
18321865entry -> auto_tuning = auto_tuning ;
18331866
1867+ aqo_state -> queries_changed = true;
18341868LWLockRelease (& aqo_state -> queries_lock );
18351869return true;
18361870}
@@ -1856,7 +1890,10 @@ aqo_queries_reset(void)
18561890elog (ERROR ,"[AQO] hash table corrupted" );
18571891num_remove ++ ;
18581892}
1859- aqo_state -> queries_changed = true;
1893+
1894+ if (num_remove > 0 )
1895+ aqo_state -> queries_changed = true;
1896+
18601897LWLockRelease (& aqo_state -> queries_lock );
18611898
18621899if (num_remove != num_entries - 1 )