3636#define PGAQO_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_query_texts.stat"
3737#define PGAQO_DATA_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_data.stat"
3838#define PGAQO_QUERIES_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_queries.stat"
39+ #define PGAQO_NEIGHBOURS_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_neighbours.stat"
3940
4041#define AQO_DATA_COLUMNS (7)
4142#define FormVectorSz (v_name )(form_vector((v_name), (v_name ## _size)))
@@ -1393,6 +1394,7 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13931394if (!found ) {
13941395LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
13951396
1397+ elog (NOTICE ,"Entering neighbours %d" ,fss );
13961398prev = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_ENTER ,& found );
13971399if (!found )
13981400{
@@ -1406,6 +1408,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
14061408entry -> list .prev = prev -> data ;
14071409}
14081410prev -> data = entry ;
1411+ elog (NOTICE ,"Entered neighbours %ld, found %s" ,prev -> fss ,found ?"true" :"false" );
1412+ prev = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
1413+ elog (NOTICE ,"Entered neighbours check 2 key %ld found %s number of entries %ld" ,prev -> fss ,found ?"true" :"false" ,hash_get_num_entries (fss_neighbours ));
14091414
14101415LWLockRelease (& aqo_state -> neighbours_lock );
14111416}
@@ -1585,8 +1590,10 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
15851590NeighboursEntry * neighbour_entry ;
15861591
15871592found = false;
1593+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
15881594neighbour_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& fss ,HASH_FIND ,& found );
15891595entry = found ?neighbour_entry -> data :NULL ;
1596+ elog (NOTICE ,"load_aqo_data, find %d, found %d" ,fss ,found ?1 :0 );
15901597
15911598while (entry != NULL )
15921599{
@@ -1620,6 +1627,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
16201627
16211628entry = entry -> list .prev ;
16221629}
1630+ LWLockRelease (& aqo_state -> neighbours_lock );
16231631}
16241632
16251633Assert (!found || (data -> rows > 0 && data -> rows <=aqo_K ));
@@ -1768,15 +1776,18 @@ _aqo_data_clean(uint64 fs)
17681776/* Fix or remove neighbours htab entry*/
17691777LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
17701778fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& entry -> key .fss ,HASH_FIND ,& found );
1779+ elog (NOTICE ,"_aqo_data_clean %ld" ,fss_htab_entry -> fss );
17711780if (found && fss_htab_entry -> data -> key .fs == fs )
17721781{
17731782if (has_prev )
17741783{
17751784fss_htab_entry -> data = entry -> list .prev ;
1785+ elog (NOTICE ,"_aqo_data_clean, change %ld" ,entry -> key .fss );
17761786}
17771787else
17781788{
17791789hash_search (fss_neighbours ,& entry -> key .fss ,HASH_REMOVE ,NULL );
1790+ elog (NOTICE ,"_aqo_data_clean, find %ld" ,entry -> key .fss );
17801791}
17811792}
17821793LWLockRelease (& aqo_state -> neighbours_lock );
@@ -2082,6 +2093,89 @@ aqo_queries_update(PG_FUNCTION_ARGS)
20822093PG_RETURN_BOOL (true);
20832094}
20842095
2096+ static bool
2097+ _deform_neighbours_record_cb (void * data ,size_t size )
2098+ {
2099+ bool found ;
2100+ NeighboursEntry * entry ;
2101+ int64 fss ;
2102+
2103+ Assert (LWLockHeldByMeInMode (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE ));
2104+ Assert (size == sizeof (NeighboursEntry ));
2105+
2106+ fss = ((NeighboursEntry * )data )-> fss ;
2107+ entry = (NeighboursEntry * )hash_search (fss_neighbours ,& fss ,HASH_ENTER ,& found );
2108+ Assert (!found );
2109+ memcpy (entry ,data ,sizeof (NeighboursEntry ));
2110+ return true;
2111+ }
2112+
2113+ void
2114+ aqo_neighbours_load (void )
2115+ {
2116+ elog (NOTICE ,"Loading aqo_neighbours" );
2117+ Assert (!LWLockHeldByMe (& aqo_state -> neighbours_lock ));
2118+
2119+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2120+
2121+ if (hash_get_num_entries (fss_neighbours )!= 0 )
2122+ {
2123+ /* Someone have done it concurrently. */
2124+ elog (LOG ,"[AQO] Another backend have loaded neighbours data concurrently." );
2125+ LWLockRelease (& aqo_state -> neighbours_lock );
2126+ return ;
2127+ }
2128+
2129+ data_load (PGAQO_NEIGHBOURS_FILE ,_deform_neighbours_record_cb ,NULL );
2130+
2131+ aqo_state -> neighbours_changed = false;/* mem data is consistent with disk */
2132+ LWLockRelease (& aqo_state -> neighbours_lock );
2133+ }
2134+
2135+ static void *
2136+ _form_neighbours_record_cb (void * ctx ,size_t * size )
2137+ {
2138+ HASH_SEQ_STATUS * hash_seq = (HASH_SEQ_STATUS * )ctx ;
2139+ NeighboursEntry * entry ;
2140+
2141+ * size = sizeof (NeighboursEntry );
2142+ entry = hash_seq_search (hash_seq );
2143+ if (entry == NULL )
2144+ return NULL ;
2145+
2146+ return memcpy (palloc (* size ),entry ,* size );
2147+ }
2148+
2149+
2150+ void
2151+ aqo_neighbours_flush (void )
2152+ {
2153+ HASH_SEQ_STATUS hash_seq ;
2154+ int ret ;
2155+ long entries ;
2156+
2157+ elog (NOTICE ,"Flushing aqo_neighbours" );
2158+
2159+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2160+
2161+ if (!aqo_state -> neighbours_changed )
2162+ gotoend ;
2163+
2164+ entries = hash_get_num_entries (fss_neighbours );
2165+ hash_seq_init (& hash_seq ,fss_neighbours );
2166+ ret = data_store (PGAQO_NEIGHBOURS_FILE ,_form_neighbours_record_cb ,entries ,
2167+ (void * )& hash_seq );
2168+ if (ret != 0 )
2169+ hash_seq_term (& hash_seq );
2170+ else
2171+ /* Hash table and disk storage are now consistent */
2172+ aqo_state -> neighbours_changed = false;
2173+
2174+ end :
2175+ LWLockRelease (& aqo_state -> neighbours_lock );
2176+ }
2177+
2178+
20852179static long
20862180aqo_neighbours_reset (void )
20872181{
@@ -2093,8 +2187,11 @@ aqo_neighbours_reset(void)
20932187LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
20942188num_entries = hash_get_num_entries (fss_neighbours );
20952189hash_seq_init (& hash_seq ,fss_neighbours );
2190+ elog (NOTICE ,"fss_neighbours num entries: %ld" ,num_entries );
20962191while ((entry = hash_seq_search (& hash_seq ))!= NULL )
20972192{
2193+ elog (NOTICE ,"delete %ld" ,entry -> fss );
2194+
20982195if (hash_search (fss_neighbours ,& entry -> fss ,HASH_REMOVE ,NULL )== NULL )
20992196elog (ERROR ,"[AQO] hash table corrupted" );
21002197num_remove ++ ;
@@ -2105,8 +2202,10 @@ aqo_neighbours_reset(void)
21052202
21062203LWLockRelease (& aqo_state -> neighbours_lock );
21072204
2108- if (num_remove != num_entries )
2109- elog (ERROR ,"[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected." );
2205+ // if (num_remove != num_entries)
2206+ // elog(ERROR, "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2207+
2208+ aqo_neighbours_flush ();
21102209
21112210return num_remove ;
21122211}
@@ -2251,15 +2350,18 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22512350/* Fix or remove neighbours htab entry*/
22522351LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
22532352fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
2353+ elog (NOTICE ,"aqo_cleanup, find %ld" ,key .fss );
22542354if (found && fss_htab_entry -> data -> key .fs == key .fs )
22552355{
22562356if (has_prev )
22572357{
22582358fss_htab_entry -> data = entry -> list .prev ;
2359+ elog (NOTICE ,"aqo_cleanup, change %ld" ,key .fss );
22592360}
22602361else
22612362{
22622363hash_search (fss_neighbours ,& key .fss ,HASH_REMOVE ,NULL );
2364+ elog (NOTICE ,"aqo_cleanup, remove %ld" ,key .fss );
22632365}
22642366}
22652367LWLockRelease (& aqo_state -> neighbours_lock );
@@ -2294,6 +2396,7 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22942396aqo_data_flush ();
22952397aqo_qtexts_flush ();
22962398aqo_queries_flush ();
2399+ aqo_neighbours_flush ();
22972400}
22982401
22992402Datum
@@ -2376,6 +2479,7 @@ aqo_drop_class(PG_FUNCTION_ARGS)
23762479aqo_data_flush ();
23772480aqo_qtexts_flush ();
23782481aqo_queries_flush ();
2482+ aqo_neighbours_flush ();
23792483
23802484PG_RETURN_INT32 (cnt );
23812485}