Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitad4dd06

Browse files
committed
Add general limit on DSM memory which can be allocated by the AQO extension to
store learning data. Also, use common DSA area to place data and query texts.Default limit on DSM memory is 100 MB.TODO: remove meaningless dsa variables.
1 parent3bd2bb3 commitad4dd06

File tree

5 files changed

+132
-31
lines changed

5 files changed

+132
-31
lines changed

‎aqo.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ _PG_init(void)
231231
"Max number of feature spaces that AQO can operate with.",
232232
NULL,
233233
&fs_max_items,
234-
1000,
234+
10000,
235235
1,INT_MAX,
236236
PGC_SUSET,
237237
0,
@@ -244,7 +244,7 @@ _PG_init(void)
244244
"Max number of feature subspaces that AQO can operate with.",
245245
NULL,
246246
&fss_max_items,
247-
1000,
247+
100000,
248248
0,INT_MAX,
249249
PGC_SUSET,
250250
0,
@@ -266,6 +266,19 @@ _PG_init(void)
266266
NULL
267267
);
268268

269+
DefineCustomIntVariable("aqo.dsm_size_max",
270+
"Maximum size of dynamic shared memory which AQO could allocate to store learning data.",
271+
NULL,
272+
&dsm_size_max,
273+
100,
274+
0,INT_MAX,
275+
PGC_SUSET,
276+
0,
277+
NULL,
278+
NULL,
279+
NULL
280+
);
281+
269282
prev_shmem_startup_hook=shmem_startup_hook;
270283
shmem_startup_hook=aqo_init_shmem;
271284
prev_planner_hook=planner_hook;

‎aqo_shared.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ aqo_init_shmem(void)
198198
aqo_state->qtext_trancheid=LWLockNewTrancheId();
199199
aqo_state->qtexts_changed= false;
200200
aqo_state->data_dsa_handler=DSM_HANDLE_INVALID;
201-
aqo_state->data_trancheid=LWLockNewTrancheId();
202201
aqo_state->data_changed= false;
203202
aqo_state->queries_changed= false;
204203

@@ -244,7 +243,6 @@ aqo_init_shmem(void)
244243
LWLockRegisterTranche(aqo_state->qtexts_lock.tranche,"AQO QTexts Lock Tranche");
245244
LWLockRegisterTranche(aqo_state->qtext_trancheid,"AQO Query Texts Tranche");
246245
LWLockRegisterTranche(aqo_state->data_lock.tranche,"AQO Data Lock Tranche");
247-
LWLockRegisterTranche(aqo_state->data_trancheid,"AQO Data Tranche");
248246
LWLockRegisterTranche(aqo_state->queries_lock.tranche,"AQO Queries Lock Tranche");
249247

250248
if (!IsUnderPostmaster)

‎aqo_shared.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ typedef struct AQOSharedState
3838

3939
LWLockdata_lock;/* Lock for shared fields below */
4040
dsa_handledata_dsa_handler;
41-
intdata_trancheid;
4241
booldata_changed;
4342

4443
LWLockqueries_lock;/* lock for access to queries storage */
@@ -52,7 +51,6 @@ extern HTAB *fss_htab;
5251

5352
externintfs_max_items;/* Max number of feature spaces that AQO can operate */
5453
externintfss_max_items;
55-
externintquerytext_max_size;
5654

5755
externSizeaqo_memsize(void);
5856
externvoidreset_dsm_cache(void);

‎storage.c

Lines changed: 116 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ typedef enum {
6161
}aqo_queries_cols;
6262

6363
typedefvoid* (*form_record_t) (void*ctx,size_t*size);
64-
typedefvoid (*deform_record_t) (void*data,size_tsize);
64+
typedefbool (*deform_record_t) (void*data,size_tsize);
6565

6666

6767
intquerytext_max_size=1000;
68+
intdsm_size_max=100;/* in MB */
6869

6970
HTAB*stat_htab=NULL;
7071
HTAB*queries_htab=NULL;
@@ -642,7 +643,7 @@ data_store(const char *filename, form_record_t callback,
642643
return-1;
643644
}
644645

645-
staticvoid
646+
staticbool
646647
_deform_stat_record_cb(void*data,size_tsize)
647648
{
648649
boolfound;
@@ -656,24 +657,35 @@ _deform_stat_record_cb(void *data, size_t size)
656657
entry= (StatEntry*)hash_search(stat_htab,&queryid,HASH_ENTER,&found);
657658
Assert(!found);
658659
memcpy(entry,data,sizeof(StatEntry));
660+
return true;
659661
}
660662

661663
void
662664
aqo_stat_load(void)
663665
{
664-
longentries;
665-
666666
Assert(!LWLockHeldByMe(&aqo_state->stat_lock));
667667

668668
LWLockAcquire(&aqo_state->stat_lock,LW_EXCLUSIVE);
669-
entries=hash_get_num_entries(stat_htab);
670-
Assert(entries==0);
669+
670+
/* Load on postmaster sturtup. So no any concurrent actions possible here. */
671+
Assert(hash_get_num_entries(stat_htab)==0);
672+
671673
data_load(PGAQO_STAT_FILE,_deform_stat_record_cb,NULL);
672674

673675
LWLockRelease(&aqo_state->stat_lock);
674676
}
675677

676-
staticvoid
678+
staticbool
679+
_check_dsa_validity(dsa_pointerptr)
680+
{
681+
if (DsaPointerIsValid(ptr))
682+
return true;
683+
684+
elog(LOG,"[AQO] DSA Pointer isn't valid. Is the memory limit exceeded?");
685+
return false;
686+
}
687+
688+
staticbool
677689
_deform_qtexts_record_cb(void*data,size_tsize)
678690
{
679691
boolfound;
@@ -690,9 +702,19 @@ _deform_qtexts_record_cb(void *data, size_t size)
690702
Assert(!found);
691703

692704
entry->qtext_dp=dsa_allocate(qtext_dsa,len);
693-
Assert(DsaPointerIsValid(entry->qtext_dp));
705+
if (!_check_dsa_validity(entry->qtext_dp))
706+
{
707+
/*
708+
* DSA stuck into problems. Rollback changes. Return false in belief
709+
* that caller recognize it and don't try to call us more.
710+
*/
711+
(void)hash_search(qtexts_htab,&queryid,HASH_REMOVE,NULL);
712+
return false;
713+
}
714+
694715
strptr= (char*)dsa_get_address(qtext_dsa,entry->qtext_dp);
695716
strlcpy(strptr,query_string,len);
717+
return true;
696718
}
697719

698720
void
@@ -705,7 +727,15 @@ aqo_qtexts_load(void)
705727
Assert(qtext_dsa!=NULL);
706728

707729
LWLockAcquire(&aqo_state->qtexts_lock,LW_EXCLUSIVE);
708-
Assert(hash_get_num_entries(qtexts_htab)==0);
730+
731+
if (hash_get_num_entries(qtexts_htab)!=0)
732+
{
733+
/* Someone have done it concurrently. */
734+
elog(LOG,"[AQO] Another backend have loaded query texts concurrently.");
735+
LWLockRelease(&aqo_state->qtexts_lock);
736+
return;
737+
}
738+
709739
data_load(PGAQO_TEXT_FILE,_deform_qtexts_record_cb,NULL);
710740

711741
/* Check existence of default feature space */
@@ -725,7 +755,7 @@ aqo_qtexts_load(void)
725755
* Getting a data chunk from a caller, add a record into the 'ML data'
726756
* shmem hash table. Allocate and fill DSA chunk for variadic part of the data.
727757
*/
728-
staticvoid
758+
staticbool
729759
_deform_data_record_cb(void*data,size_tsize)
730760
{
731761
boolfound;
@@ -737,7 +767,7 @@ _deform_data_record_cb(void *data, size_t size)
737767

738768
Assert(LWLockHeldByMeInMode(&aqo_state->data_lock,LW_EXCLUSIVE));
739769
entry= (DataEntry*)hash_search(data_htab,&fentry->key,
740-
HASH_ENTER,&found);
770+
HASH_ENTER,&found);
741771
Assert(!found);
742772

743773
/* Copy fixed-size part of entry byte-by-byte even with caves */
@@ -747,9 +777,20 @@ _deform_data_record_cb(void *data, size_t size)
747777
sz=_compute_data_dsa(entry);
748778
Assert(sz+ offsetof(DataEntry,data_dp)==size);
749779
entry->data_dp=dsa_allocate(data_dsa,sz);
750-
Assert(DsaPointerIsValid(entry->data_dp));
780+
781+
if (!_check_dsa_validity(entry->data_dp))
782+
{
783+
/*
784+
* DSA stuck into problems. Rollback changes. Return false in belief
785+
* that caller recognize it and don't try to call us more.
786+
*/
787+
(void)hash_search(data_htab,&fentry->key,HASH_REMOVE,NULL);
788+
return false;
789+
}
790+
751791
dsa_ptr= (char*)dsa_get_address(data_dsa,entry->data_dp);
752792
memcpy(dsa_ptr,ptr,sz);
793+
return true;
753794
}
754795

755796
void
@@ -759,14 +800,22 @@ aqo_data_load(void)
759800
Assert(data_dsa!=NULL);
760801

761802
LWLockAcquire(&aqo_state->data_lock,LW_EXCLUSIVE);
762-
Assert(hash_get_num_entries(data_htab)==0);
803+
804+
if (hash_get_num_entries(data_htab)!=0)
805+
{
806+
/* Someone have done it concurrently. */
807+
elog(LOG,"[AQO] Another backend have loaded query data concurrently.");
808+
LWLockRelease(&aqo_state->data_lock);
809+
return;
810+
}
811+
763812
data_load(PGAQO_DATA_FILE,_deform_data_record_cb,NULL);
764813

765814
aqo_state->data_changed= false;/* mem data is consistent with disk */
766815
LWLockRelease(&aqo_state->data_lock);
767816
}
768817

769-
staticvoid
818+
staticbool
770819
_deform_queries_record_cb(void*data,size_tsize)
771820
{
772821
boolfound;
@@ -780,20 +829,22 @@ _deform_queries_record_cb(void *data, size_t size)
780829
entry= (QueriesEntry*)hash_search(queries_htab,&queryid,HASH_ENTER,&found);
781830
Assert(!found);
782831
memcpy(entry,data,sizeof(QueriesEntry));
832+
return true;
783833
}
784834

785835
void
786836
aqo_queries_load(void)
787837
{
788-
longentries;
789838
boolfound;
790839
uint64queryid=0;
791840

792841
Assert(!LWLockHeldByMe(&aqo_state->queries_lock));
793842

794843
LWLockAcquire(&aqo_state->queries_lock,LW_EXCLUSIVE);
795-
entries=hash_get_num_entries(queries_htab);
796-
Assert(entries==0);
844+
845+
/* Load on postmaster sturtup. So no any concurrent actions possible here. */
846+
Assert(hash_get_num_entries(queries_htab)==0);
847+
797848
data_load(PGAQO_QUERIES_FILE,_deform_queries_record_cb,NULL);
798849

799850
/* Check existence of default feature space */
@@ -836,14 +887,23 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
836887
{
837888
void*data;
838889
size_tsize;
890+
boolres;
839891

840892
if (fread(&size,sizeof(size),1,file)!=1)
841893
gotoread_error;
842894
data=palloc(size);
843895
if (fread(data,size,1,file)!=1)
844896
gotoread_error;
845-
callback(data,size);
897+
res=callback(data,size);
846898
pfree(data);
899+
900+
if (!res)
901+
{
902+
/* Error detected. Do not try to read tails of the storage. */
903+
elog(LOG,"[AQO] Because of an error skip %ld storage records.",
904+
num-i);
905+
break;
906+
}
847907
}
848908

849909
FreeFile(file);
@@ -896,11 +956,15 @@ dsa_init()
896956
Assert(aqo_state->data_dsa_handler==DSM_HANDLE_INVALID);
897957

898958
qtext_dsa=dsa_create(aqo_state->qtext_trancheid);
959+
Assert(qtext_dsa!=NULL);
960+
961+
if (dsm_size_max>0)
962+
dsa_set_size_limit(qtext_dsa,dsm_size_max*1024*1024);
963+
899964
dsa_pin(qtext_dsa);
900965
aqo_state->qtexts_dsa_handler=dsa_get_handle(qtext_dsa);
901966

902-
data_dsa=dsa_create(aqo_state->data_trancheid);
903-
dsa_pin(data_dsa);
967+
data_dsa=qtext_dsa;
904968
aqo_state->data_dsa_handler=dsa_get_handle(data_dsa);
905969

906970
/* Load and initialize query texts hash table */
@@ -910,11 +974,10 @@ dsa_init()
910974
else
911975
{
912976
qtext_dsa=dsa_attach(aqo_state->qtexts_dsa_handler);
913-
data_dsa=dsa_attach(aqo_state->data_dsa_handler);
977+
data_dsa=qtext_dsa;
914978
}
915979

916980
dsa_pin_mapping(qtext_dsa);
917-
dsa_pin_mapping(data_dsa);
918981
MemoryContextSwitchTo(old_context);
919982
LWLockRelease(&aqo_state->lock);
920983

@@ -973,7 +1036,17 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
9731036
entry->queryid=queryid;
9741037
size=size>querytext_max_size ?querytext_max_size :size;
9751038
entry->qtext_dp=dsa_allocate(qtext_dsa,size);
976-
Assert(DsaPointerIsValid(entry->qtext_dp));
1039+
1040+
if (!_check_dsa_validity(entry->qtext_dp))
1041+
{
1042+
/*
1043+
* DSA stuck into problems. Rollback changes. Return false in belief
1044+
* that caller recognize it and don't try to call us more.
1045+
*/
1046+
(void)hash_search(qtexts_htab,&queryid,HASH_REMOVE,NULL);
1047+
return false;
1048+
}
1049+
9771050
strptr= (char*)dsa_get_address(qtext_dsa,entry->qtext_dp);
9781051
strlcpy(strptr,query_string,size);
9791052
aqo_state->qtexts_changed= true;
@@ -1173,7 +1246,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11731246

11741247
size=_compute_data_dsa(entry);
11751248
entry->data_dp=dsa_allocate0(data_dsa,size);
1176-
Assert(DsaPointerIsValid(entry->data_dp));
1249+
1250+
if (!_check_dsa_validity(entry->data_dp))
1251+
{
1252+
/*
1253+
* DSA stuck into problems. Rollback changes. Return false in belief
1254+
* that caller recognize it and don't try to call us more.
1255+
*/
1256+
(void)hash_search(data_htab,&key,HASH_REMOVE,NULL);
1257+
return false;
1258+
}
11771259
}
11781260

11791261
Assert(DsaPointerIsValid(entry->data_dp));
@@ -1195,7 +1277,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11951277
/* Need to re-allocate DSA chunk */
11961278
dsa_free(data_dsa,entry->data_dp);
11971279
entry->data_dp=dsa_allocate0(data_dsa,size);
1198-
Assert(DsaPointerIsValid(entry->data_dp));
1280+
1281+
if (!_check_dsa_validity(entry->data_dp))
1282+
{
1283+
/*
1284+
* DSA stuck into problems. Rollback changes. Return false in belief
1285+
* that caller recognize it and don't try to call us more.
1286+
*/
1287+
(void)hash_search(data_htab,&key,HASH_REMOVE,NULL);
1288+
return false;
1289+
}
11991290
}
12001291
ptr= (char*)dsa_get_address(data_dsa,entry->data_dp);
12011292

‎storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef struct QueriesEntry
8383
}QueriesEntry;
8484

8585
externintquerytext_max_size;
86+
externintdsm_size_max;
8687

8788
externHTAB*stat_htab;
8889
externHTAB*qtexts_htab;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp