@@ -609,6 +609,7 @@ aqo_queries_flush(void)
609609LWLockRelease (& aqo_state -> queries_lock );
610610}
611611
612+
612613static int
613614data_store (const char * filename ,form_record_t callback ,
614615long nrecs ,void * ctx )
@@ -1578,7 +1579,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
15781579build_knn_matrix (data ,temp_data );
15791580}
15801581else
1581- /* Iterate acrossall elements of the table. XXX: Maybe slow . */
1582+ /* Iterate acrossfss neighbours . */
15821583{
15831584int noids = -1 ;
15841585NeighboursEntry * neighbour_entry ;
@@ -1741,14 +1742,47 @@ _aqo_data_clean(uint64 fs)
17411742hash_seq_init (& hash_seq ,data_htab );
17421743while ((entry = hash_seq_search (& hash_seq ))!= NULL )
17431744{
1745+ bool found ;
1746+ bool has_prev = false;
1747+ bool has_next = false;
1748+ NeighboursEntry * fss_htab_entry ;
1749+
17441750if (entry -> key .fs != fs )
17451751continue ;
17461752
17471753Assert (DsaPointerIsValid (entry -> data_dp ));
17481754dsa_free (data_dsa ,entry -> data_dp );
17491755entry -> data_dp = InvalidDsaPointer ;
1756+
1757+ /* fix fs list */
1758+ if (entry -> list .next )
1759+ has_next = true;
1760+ if (entry -> list .prev )
1761+ has_prev = true;
1762+
1763+ if (has_prev )
1764+ entry -> list .prev -> list .next = has_next ?entry -> list .next :NULL ;
1765+ if (has_next )
1766+ entry -> list .next -> list .prev = has_prev ?entry -> list .prev :NULL ;
1767+
1768+ /* Fix or remove neighbours htab entry*/
1769+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
1770+ fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& entry -> key .fss ,HASH_FIND ,& found );
1771+ if (found && fss_htab_entry -> data -> key .fs == fs )
1772+ {
1773+ if (has_prev )
1774+ {
1775+ fss_htab_entry -> data = entry -> list .prev ;
1776+ }
1777+ else
1778+ {
1779+ hash_search (fss_neighbours ,& entry -> key .fss ,HASH_REMOVE ,NULL );
1780+ }
1781+ }
1782+ LWLockRelease (& aqo_state -> neighbours_lock );
1783+
17501784if (!hash_search (data_htab ,& entry -> key ,HASH_REMOVE ,NULL ))
1751- elog (PANIC ,"[AQO] hash table corrupted" );
1785+ elog (ERROR ,"[AQO] hash table corrupted" );
17521786removed ++ ;
17531787}
17541788
@@ -2048,6 +2082,32 @@ aqo_queries_update(PG_FUNCTION_ARGS)
20482082PG_RETURN_BOOL (true);
20492083}
20502084
2085+ static long
2086+ aqo_neighbours_reset (void )
2087+ {
2088+ HASH_SEQ_STATUS hash_seq ;
2089+ NeighboursEntry * entry ;
2090+ long num_remove = 0 ;
2091+ long num_entries ;
2092+
2093+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2094+ num_entries = hash_get_num_entries (fss_neighbours );
2095+ hash_seq_init (& hash_seq ,fss_neighbours );
2096+ while ((entry = hash_seq_search (& hash_seq ))!= NULL )
2097+ {
2098+ if (hash_search (fss_neighbours ,& entry -> fss ,HASH_REMOVE ,NULL )== NULL )
2099+ elog (ERROR ,"[AQO] hash table corrupted" );
2100+ num_remove ++ ;
2101+ }
2102+ aqo_state -> neighbours_changed = true;
2103+ LWLockRelease (& aqo_state -> neighbours_lock );
2104+
2105+ if (num_remove != num_entries )
2106+ elog (ERROR ,"[AQO] Neighbour memory storage is corrupted or parallel access without a lock was detected." );
2107+
2108+ return num_remove ;
2109+ }
2110+
20512111Datum
20522112aqo_reset (PG_FUNCTION_ARGS )
20532113{
@@ -2057,6 +2117,7 @@ aqo_reset(PG_FUNCTION_ARGS)
20572117counter += aqo_qtexts_reset ();
20582118counter += aqo_data_reset ();
20592119counter += aqo_queries_reset ();
2120+ counter += aqo_neighbours_reset ();
20602121PG_RETURN_INT64 (counter );
20612122}
20622123
@@ -2183,21 +2244,25 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
21832244entry -> list .prev -> list .next = has_next ?entry -> list .next :NULL ;
21842245if (has_next )
21852246entry -> list .next -> list .prev = has_prev ?entry -> list .prev :NULL ;
2186- }
21872247
2188- /* Fix or remove neighbours htab entry*/
2189- fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
2190- if (found && fss_htab_entry -> data -> key .fs == key .fs )
2191- {
2192- if (has_prev )
2248+ /* Fix or remove neighbours htab entry*/
2249+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2250+ fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
2251+ if (found && fss_htab_entry -> data -> key .fs == key .fs )
21932252{
2194- fss_htab_entry -> data = entry -> list .prev ;
2195- }
2196- else
2197- {
2198- hash_search (fss_neighbours ,& key .fss ,HASH_REMOVE ,NULL );
2253+ if (has_prev )
2254+ {
2255+ fss_htab_entry -> data = entry -> list .prev ;
2256+ }
2257+ else
2258+ {
2259+ hash_search (fss_neighbours ,& key .fss ,HASH_REMOVE ,NULL );
2260+ }
21992261}
2262+ LWLockRelease (& aqo_state -> neighbours_lock );
22002263}
2264+
2265+
22012266(* fss_num )+= (int )_aqo_data_remove (& key );
22022267}
22032268