@@ -609,6 +609,7 @@ aqo_queries_flush(void)
609609LWLockRelease (& aqo_state -> queries_lock );
610610}
611611
612+
612613static int
613614data_store (const char * filename ,form_record_t callback ,
614615long nrecs ,void * ctx )
@@ -1578,7 +1579,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
15781579build_knn_matrix (data ,temp_data );
15791580}
15801581else
1581- /* Iterate acrossall elements of the table. XXX: Maybe slow . */
1582+ /* Iterate acrossfss neighbours . */
15821583{
15831584int noids = -1 ;
15841585NeighboursEntry * neighbour_entry ;
@@ -1741,12 +1742,46 @@ _aqo_data_clean(uint64 fs)
17411742hash_seq_init (& hash_seq ,data_htab );
17421743while ((entry = hash_seq_search (& hash_seq ))!= NULL )
17431744{
1745+ bool found ;
1746+ bool has_prev = false;
1747+ bool has_next = false;
1748+ NeighboursEntry * fss_htab_entry ;
1749+
17441750if (entry -> key .fs != fs )
17451751continue ;
17461752
17471753Assert (DsaPointerIsValid (entry -> data_dp ));
17481754dsa_free (data_dsa ,entry -> data_dp );
17491755entry -> data_dp = InvalidDsaPointer ;
1756+
1757+ /* fix fs list */
1758+ if (entry -> list .next )
1759+ has_next = true;
1760+ if (entry -> list .prev )
1761+ has_prev = true;
1762+
1763+ if (has_prev )
1764+ entry -> list .prev -> list .next = has_next ?entry -> list .next :NULL ;
1765+ if (has_next )
1766+ entry -> list .next -> list .prev = has_prev ?entry -> list .prev :NULL ;
1767+
1768+ /* Fix or remove neighbours htab entry*/
1769+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
1770+ fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& entry -> key .fss ,HASH_FIND ,& found );
1771+ if (found && fss_htab_entry -> data -> key .fs == fs )
1772+ {
1773+ if (has_prev )
1774+ {
1775+ fss_htab_entry -> data = entry -> list .prev ;
1776+ }
1777+ else
1778+ {
1779+ hash_search (fss_neighbours ,& entry -> key .fss ,HASH_REMOVE ,NULL );
1780+ }
1781+ }
1782+ LWLockRelease (& aqo_state -> neighbours_lock );
1783+
1784+
17501785if (hash_search (data_htab ,& entry -> key ,HASH_REMOVE ,NULL )== NULL )
17511786elog (ERROR ,"[AQO] hash table corrupted" );
17521787removed ++ ;
@@ -2048,6 +2083,32 @@ aqo_queries_update(PG_FUNCTION_ARGS)
20482083PG_RETURN_BOOL (true);
20492084}
20502085
2086+ static long
2087+ aqo_neighbours_reset (void )
2088+ {
2089+ HASH_SEQ_STATUS hash_seq ;
2090+ NeighboursEntry * entry ;
2091+ long num_remove = 0 ;
2092+ long num_entries ;
2093+
2094+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2095+ num_entries = hash_get_num_entries (fss_neighbours );
2096+ hash_seq_init (& hash_seq ,fss_neighbours );
2097+ while ((entry = hash_seq_search (& hash_seq ))!= NULL )
2098+ {
2099+ if (hash_search (fss_neighbours ,& entry -> fss ,HASH_REMOVE ,NULL )== NULL )
2100+ elog (ERROR ,"[AQO] hash table corrupted" );
2101+ num_remove ++ ;
2102+ }
2103+ aqo_state -> neighbours_changed = true;
2104+ LWLockRelease (& aqo_state -> neighbours_lock );
2105+
2106+ if (num_remove != num_entries )
2107+ elog (ERROR ,"[AQO] Neighbour memory storage is corrupted or parallel access without a lock was detected." );
2108+
2109+ return num_remove ;
2110+ }
2111+
20512112Datum
20522113aqo_reset (PG_FUNCTION_ARGS )
20532114{
@@ -2057,6 +2118,7 @@ aqo_reset(PG_FUNCTION_ARGS)
20572118counter += aqo_qtexts_reset ();
20582119counter += aqo_data_reset ();
20592120counter += aqo_queries_reset ();
2121+ counter += aqo_neighbours_reset ();
20602122PG_RETURN_INT64 (counter );
20612123}
20622124
@@ -2183,21 +2245,25 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
21832245entry -> list .prev -> list .next = has_next ?entry -> list .next :NULL ;
21842246if (has_next )
21852247entry -> list .next -> list .prev = has_prev ?entry -> list .prev :NULL ;
2186- }
21872248
2188- /* Fix or remove neighbours htab entry*/
2189- fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
2190- if (found && fss_htab_entry -> data -> key .fs == key .fs )
2191- {
2192- if (has_prev )
2249+ /* Fix or remove neighbours htab entry*/
2250+ LWLockAcquire (& aqo_state -> neighbours_lock ,LW_EXCLUSIVE );
2251+ fss_htab_entry = (NeighboursEntry * )hash_search (fss_neighbours ,& key .fss ,HASH_FIND ,& found );
2252+ if (found && fss_htab_entry -> data -> key .fs == key .fs )
21932253{
2194- fss_htab_entry -> data = entry -> list .prev ;
2195- }
2196- else
2197- {
2198- hash_search (fss_neighbours ,& key .fss ,HASH_REMOVE ,NULL );
2254+ if (has_prev )
2255+ {
2256+ fss_htab_entry -> data = entry -> list .prev ;
2257+ }
2258+ else
2259+ {
2260+ hash_search (fss_neighbours ,& key .fss ,HASH_REMOVE ,NULL );
2261+ }
21992262}
2263+ LWLockRelease (& aqo_state -> neighbours_lock );
22002264}
2265+
2266+
22012267(* fss_num )+= (int )_aqo_data_remove (& key );
22022268}
22032269