Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9c543e3

Browse files
author
Alexandra Pervushina
committed
add flush and load functions for neighbours
1 parenta547fab commit9c543e3

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

‎aqo_shared.c‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ aqo_init_shmem(void)
263263
/* Doesn't use DSA, so can be loaded in postmaster */
264264
aqo_stat_load();
265265
aqo_queries_load();
266+
aqo_neighbours_load();
266267
}
267268
}
268269

@@ -281,6 +282,7 @@ on_shmem_shutdown(int code, Datum arg)
281282
*/
282283
aqo_stat_flush();
283284
aqo_queries_flush();
285+
aqo_neighbours_flush();
284286
return;
285287
}
286288

‎storage.c‎

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#definePGAQO_TEXT_FILEPGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_query_texts.stat"
3737
#definePGAQO_DATA_FILEPGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_data.stat"
3838
#definePGAQO_QUERIES_FILEPGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_queries.stat"
39+
#definePGAQO_NEIGHBOURS_FILEPGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_neighbours.stat"
3940

4041
#defineAQO_DATA_COLUMNS(7)
4142
#defineFormVectorSz(v_name)(form_vector((v_name), (v_name ## _size)))
@@ -1393,6 +1394,7 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13931394
if (!found) {
13941395
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
13951396

1397+
elog(NOTICE,"Entering neighbours %d",fss);
13961398
prev= (NeighboursEntry*)hash_search(fss_neighbours,&key.fss,HASH_ENTER,&found);
13971399
if (!found)
13981400
{
@@ -1406,6 +1408,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
14061408
entry->list.prev=prev->data;
14071409
}
14081410
prev->data=entry;
1411+
elog(NOTICE,"Entered neighbours %ld, found %s",prev->fss,found ?"true" :"false");
1412+
prev= (NeighboursEntry*)hash_search(fss_neighbours,&key.fss,HASH_FIND,&found);
1413+
elog(NOTICE,"Entered neighbours check 2 key %ld found %s number of entries %ld",prev->fss,found ?"true" :"false",hash_get_num_entries(fss_neighbours));
14091414

14101415
LWLockRelease(&aqo_state->neighbours_lock);
14111416
}
@@ -1585,8 +1590,10 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
15851590
NeighboursEntry*neighbour_entry;
15861591

15871592
found= false;
1593+
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
15881594
neighbour_entry= (NeighboursEntry*)hash_search(fss_neighbours,&fss,HASH_FIND,&found);
15891595
entry=found ?neighbour_entry->data :NULL;
1596+
elog(NOTICE,"load_aqo_data, find %d, found %d",fss,found ?1 :0);
15901597

15911598
while (entry!=NULL)
15921599
{
@@ -1620,6 +1627,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
16201627

16211628
entry=entry->list.prev;
16221629
}
1630+
LWLockRelease(&aqo_state->neighbours_lock);
16231631
}
16241632

16251633
Assert(!found|| (data->rows>0&&data->rows <=aqo_K));
@@ -1768,15 +1776,18 @@ _aqo_data_clean(uint64 fs)
17681776
/* Fix or remove neighbours htab entry*/
17691777
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
17701778
fss_htab_entry= (NeighboursEntry*)hash_search(fss_neighbours,&entry->key.fss,HASH_FIND,&found);
1779+
elog(NOTICE,"_aqo_data_clean %ld",fss_htab_entry->fss);
17711780
if (found&&fss_htab_entry->data->key.fs==fs)
17721781
{
17731782
if (has_prev)
17741783
{
17751784
fss_htab_entry->data=entry->list.prev;
1785+
elog(NOTICE,"_aqo_data_clean, change %ld",entry->key.fss);
17761786
}
17771787
else
17781788
{
17791789
hash_search(fss_neighbours,&entry->key.fss,HASH_REMOVE,NULL);
1790+
elog(NOTICE,"_aqo_data_clean, find %ld",entry->key.fss);
17801791
}
17811792
}
17821793
LWLockRelease(&aqo_state->neighbours_lock);
@@ -2082,6 +2093,89 @@ aqo_queries_update(PG_FUNCTION_ARGS)
20822093
PG_RETURN_BOOL(true);
20832094
}
20842095

2096+
staticbool
2097+
_deform_neighbours_record_cb(void*data,size_tsize)
2098+
{
2099+
boolfound;
2100+
NeighboursEntry*entry;
2101+
int64fss;
2102+
2103+
Assert(LWLockHeldByMeInMode(&aqo_state->neighbours_lock,LW_EXCLUSIVE));
2104+
Assert(size==sizeof(NeighboursEntry));
2105+
2106+
fss= ((NeighboursEntry*)data)->fss;
2107+
entry= (NeighboursEntry*)hash_search(fss_neighbours,&fss,HASH_ENTER,&found);
2108+
Assert(!found);
2109+
memcpy(entry,data,sizeof(NeighboursEntry));
2110+
return true;
2111+
}
2112+
2113+
void
2114+
aqo_neighbours_load(void)
2115+
{
2116+
elog(NOTICE,"Loading aqo_neighbours");
2117+
Assert(!LWLockHeldByMe(&aqo_state->neighbours_lock));
2118+
2119+
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
2120+
2121+
if (hash_get_num_entries(fss_neighbours)!=0)
2122+
{
2123+
/* Someone have done it concurrently. */
2124+
elog(LOG,"[AQO] Another backend have loaded neighbours data concurrently.");
2125+
LWLockRelease(&aqo_state->neighbours_lock);
2126+
return;
2127+
}
2128+
2129+
data_load(PGAQO_NEIGHBOURS_FILE,_deform_neighbours_record_cb,NULL);
2130+
2131+
aqo_state->neighbours_changed= false;/* mem data is consistent with disk */
2132+
LWLockRelease(&aqo_state->neighbours_lock);
2133+
}
2134+
2135+
staticvoid*
2136+
_form_neighbours_record_cb(void*ctx,size_t*size)
2137+
{
2138+
HASH_SEQ_STATUS*hash_seq= (HASH_SEQ_STATUS*)ctx;
2139+
NeighboursEntry*entry;
2140+
2141+
*size=sizeof(NeighboursEntry);
2142+
entry=hash_seq_search(hash_seq);
2143+
if (entry==NULL)
2144+
returnNULL;
2145+
2146+
returnmemcpy(palloc(*size),entry,*size);
2147+
}
2148+
2149+
2150+
void
2151+
aqo_neighbours_flush(void)
2152+
{
2153+
HASH_SEQ_STATUShash_seq;
2154+
intret;
2155+
longentries;
2156+
2157+
elog(NOTICE,"Flushing aqo_neighbours");
2158+
2159+
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
2160+
2161+
if (!aqo_state->neighbours_changed)
2162+
gotoend;
2163+
2164+
entries=hash_get_num_entries(fss_neighbours);
2165+
hash_seq_init(&hash_seq,fss_neighbours);
2166+
ret=data_store(PGAQO_NEIGHBOURS_FILE,_form_neighbours_record_cb,entries,
2167+
(void*)&hash_seq);
2168+
if (ret!=0)
2169+
hash_seq_term(&hash_seq);
2170+
else
2171+
/* Hash table and disk storage are now consistent */
2172+
aqo_state->neighbours_changed= false;
2173+
2174+
end:
2175+
LWLockRelease(&aqo_state->neighbours_lock);
2176+
}
2177+
2178+
20852179
staticlong
20862180
aqo_neighbours_reset(void)
20872181
{
@@ -2093,8 +2187,11 @@ aqo_neighbours_reset(void)
20932187
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
20942188
num_entries=hash_get_num_entries(fss_neighbours);
20952189
hash_seq_init(&hash_seq,fss_neighbours);
2190+
elog(NOTICE,"fss_neighbours num entries: %ld",num_entries);
20962191
while ((entry=hash_seq_search(&hash_seq))!=NULL)
20972192
{
2193+
elog(NOTICE,"delete %ld",entry->fss);
2194+
20982195
if (hash_search(fss_neighbours,&entry->fss,HASH_REMOVE,NULL)==NULL)
20992196
elog(ERROR,"[AQO] hash table corrupted");
21002197
num_remove++;
@@ -2105,8 +2202,10 @@ aqo_neighbours_reset(void)
21052202

21062203
LWLockRelease(&aqo_state->neighbours_lock);
21072204

2108-
if (num_remove!=num_entries)
2109-
elog(ERROR,"[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2205+
// if (num_remove != num_entries)
2206+
// elog(ERROR, "[AQO] Neighbours memory storage is corrupted or parallel access without a lock has detected.");
2207+
2208+
aqo_neighbours_flush();
21102209

21112210
returnnum_remove;
21122211
}
@@ -2251,15 +2350,18 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22512350
/* Fix or remove neighbours htab entry*/
22522351
LWLockAcquire(&aqo_state->neighbours_lock,LW_EXCLUSIVE);
22532352
fss_htab_entry= (NeighboursEntry*)hash_search(fss_neighbours,&key.fss,HASH_FIND,&found);
2353+
elog(NOTICE,"aqo_cleanup, find %ld",key.fss);
22542354
if (found&&fss_htab_entry->data->key.fs==key.fs)
22552355
{
22562356
if (has_prev)
22572357
{
22582358
fss_htab_entry->data=entry->list.prev;
2359+
elog(NOTICE,"aqo_cleanup, change %ld",key.fss);
22592360
}
22602361
else
22612362
{
22622363
hash_search(fss_neighbours,&key.fss,HASH_REMOVE,NULL);
2364+
elog(NOTICE,"aqo_cleanup, remove %ld",key.fss);
22632365
}
22642366
}
22652367
LWLockRelease(&aqo_state->neighbours_lock);
@@ -2294,6 +2396,7 @@ cleanup_aqo_database(bool gentle, int *fs_num, int *fss_num)
22942396
aqo_data_flush();
22952397
aqo_qtexts_flush();
22962398
aqo_queries_flush();
2399+
aqo_neighbours_flush();
22972400
}
22982401

22992402
Datum
@@ -2376,6 +2479,7 @@ aqo_drop_class(PG_FUNCTION_ARGS)
23762479
aqo_data_flush();
23772480
aqo_qtexts_flush();
23782481
aqo_queries_flush();
2482+
aqo_neighbours_flush();
23792483

23802484
PG_RETURN_INT32(cnt);
23812485
}

‎storage.h‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ extern bool aqo_queries_store(uint64 queryid, uint64 fs, bool learn_aqo,
128128
externvoidaqo_queries_flush(void);
129129
externvoidaqo_queries_load(void);
130130

131+
externvoidaqo_neighbours_flush(void);
132+
externvoidaqo_neighbours_load(void);
133+
131134
/*
132135
* Machinery for deactivated queries cache.
133136
* TODO: Should live in a custom memory context

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp