@@ -61,10 +61,11 @@ typedef enum {
6161}aqo_queries_cols ;
6262
6363typedef void * (* form_record_t ) (void * ctx ,size_t * size );
64- typedef void (* deform_record_t ) (void * data ,size_t size );
64+ typedef bool (* deform_record_t ) (void * data ,size_t size );
6565
6666
6767int querytext_max_size = 1000 ;
68+ int dsm_size_max = 100 ;/* in MB */
6869
6970HTAB * stat_htab = NULL ;
7071HTAB * queries_htab = NULL ;
@@ -642,7 +643,7 @@ data_store(const char *filename, form_record_t callback,
642643return -1 ;
643644}
644645
645- static void
646+ static bool
646647_deform_stat_record_cb (void * data ,size_t size )
647648{
648649bool found ;
@@ -656,24 +657,35 @@ _deform_stat_record_cb(void *data, size_t size)
656657entry = (StatEntry * )hash_search (stat_htab ,& queryid ,HASH_ENTER ,& found );
657658Assert (!found );
658659memcpy (entry ,data ,sizeof (StatEntry ));
660+ return true;
659661}
660662
661663void
662664aqo_stat_load (void )
663665{
664- long entries ;
665-
666666Assert (!LWLockHeldByMe (& aqo_state -> stat_lock ));
667667
668668LWLockAcquire (& aqo_state -> stat_lock ,LW_EXCLUSIVE );
669- entries = hash_get_num_entries (stat_htab );
670- Assert (entries == 0 );
669+
670+ /* Load on postmaster sturtup. So no any concurrent actions possible here. */
671+ Assert (hash_get_num_entries (stat_htab )== 0 );
672+
671673data_load (PGAQO_STAT_FILE ,_deform_stat_record_cb ,NULL );
672674
673675LWLockRelease (& aqo_state -> stat_lock );
674676}
675677
676- static void
678+ static bool
679+ _check_dsa_validity (dsa_pointer ptr )
680+ {
681+ if (DsaPointerIsValid (ptr ))
682+ return true;
683+
684+ elog (LOG ,"[AQO] DSA Pointer isn't valid. Is the memory limit exceeded?" );
685+ return false;
686+ }
687+
688+ static bool
677689_deform_qtexts_record_cb (void * data ,size_t size )
678690{
679691bool found ;
@@ -690,9 +702,19 @@ _deform_qtexts_record_cb(void *data, size_t size)
690702Assert (!found );
691703
692704entry -> qtext_dp = dsa_allocate (qtext_dsa ,len );
693- Assert (DsaPointerIsValid (entry -> qtext_dp ));
705+ if (!_check_dsa_validity (entry -> qtext_dp ))
706+ {
707+ /*
708+ * DSA stuck into problems. Rollback changes. Return false in belief
709+ * that caller recognize it and don't try to call us more.
710+ */
711+ (void )hash_search (qtexts_htab ,& queryid ,HASH_REMOVE ,NULL );
712+ return false;
713+ }
714+
694715strptr = (char * )dsa_get_address (qtext_dsa ,entry -> qtext_dp );
695716strlcpy (strptr ,query_string ,len );
717+ return true;
696718}
697719
698720void
@@ -705,7 +727,15 @@ aqo_qtexts_load(void)
705727Assert (qtext_dsa != NULL );
706728
707729LWLockAcquire (& aqo_state -> qtexts_lock ,LW_EXCLUSIVE );
708- Assert (hash_get_num_entries (qtexts_htab )== 0 );
730+
731+ if (hash_get_num_entries (qtexts_htab )!= 0 )
732+ {
733+ /* Someone have done it concurrently. */
734+ elog (LOG ,"[AQO] Another backend have loaded query texts concurrently." );
735+ LWLockRelease (& aqo_state -> qtexts_lock );
736+ return ;
737+ }
738+
709739data_load (PGAQO_TEXT_FILE ,_deform_qtexts_record_cb ,NULL );
710740
711741/* Check existence of default feature space */
@@ -725,7 +755,7 @@ aqo_qtexts_load(void)
725755 * Getting a data chunk from a caller, add a record into the 'ML data'
726756 * shmem hash table. Allocate and fill DSA chunk for variadic part of the data.
727757 */
728- static void
758+ static bool
729759_deform_data_record_cb (void * data ,size_t size )
730760{
731761bool found ;
@@ -737,7 +767,7 @@ _deform_data_record_cb(void *data, size_t size)
737767
738768Assert (LWLockHeldByMeInMode (& aqo_state -> data_lock ,LW_EXCLUSIVE ));
739769entry = (DataEntry * )hash_search (data_htab ,& fentry -> key ,
740- HASH_ENTER ,& found );
770+ HASH_ENTER ,& found );
741771Assert (!found );
742772
743773/* Copy fixed-size part of entry byte-by-byte even with caves */
@@ -747,9 +777,20 @@ _deform_data_record_cb(void *data, size_t size)
747777sz = _compute_data_dsa (entry );
748778Assert (sz + offsetof(DataEntry ,data_dp )== size );
749779entry -> data_dp = dsa_allocate (data_dsa ,sz );
750- Assert (DsaPointerIsValid (entry -> data_dp ));
780+
781+ if (!_check_dsa_validity (entry -> data_dp ))
782+ {
783+ /*
784+ * DSA stuck into problems. Rollback changes. Return false in belief
785+ * that caller recognize it and don't try to call us more.
786+ */
787+ (void )hash_search (data_htab ,& fentry -> key ,HASH_REMOVE ,NULL );
788+ return false;
789+ }
790+
751791dsa_ptr = (char * )dsa_get_address (data_dsa ,entry -> data_dp );
752792memcpy (dsa_ptr ,ptr ,sz );
793+ return true;
753794}
754795
755796void
@@ -759,14 +800,22 @@ aqo_data_load(void)
759800Assert (data_dsa != NULL );
760801
761802LWLockAcquire (& aqo_state -> data_lock ,LW_EXCLUSIVE );
762- Assert (hash_get_num_entries (data_htab )== 0 );
803+
804+ if (hash_get_num_entries (data_htab )!= 0 )
805+ {
806+ /* Someone have done it concurrently. */
807+ elog (LOG ,"[AQO] Another backend have loaded query data concurrently." );
808+ LWLockRelease (& aqo_state -> data_lock );
809+ return ;
810+ }
811+
763812data_load (PGAQO_DATA_FILE ,_deform_data_record_cb ,NULL );
764813
765814aqo_state -> data_changed = false;/* mem data is consistent with disk */
766815LWLockRelease (& aqo_state -> data_lock );
767816}
768817
769- static void
818+ static bool
770819_deform_queries_record_cb (void * data ,size_t size )
771820{
772821bool found ;
@@ -780,20 +829,22 @@ _deform_queries_record_cb(void *data, size_t size)
780829entry = (QueriesEntry * )hash_search (queries_htab ,& queryid ,HASH_ENTER ,& found );
781830Assert (!found );
782831memcpy (entry ,data ,sizeof (QueriesEntry ));
832+ return true;
783833}
784834
785835void
786836aqo_queries_load (void )
787837{
788- long entries ;
789838bool found ;
790839uint64 queryid = 0 ;
791840
792841Assert (!LWLockHeldByMe (& aqo_state -> queries_lock ));
793842
794843LWLockAcquire (& aqo_state -> queries_lock ,LW_EXCLUSIVE );
795- entries = hash_get_num_entries (queries_htab );
796- Assert (entries == 0 );
844+
845+ /* Load on postmaster sturtup. So no any concurrent actions possible here. */
846+ Assert (hash_get_num_entries (queries_htab )== 0 );
847+
797848data_load (PGAQO_QUERIES_FILE ,_deform_queries_record_cb ,NULL );
798849
799850/* Check existence of default feature space */
@@ -836,14 +887,23 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
836887{
837888void * data ;
838889size_t size ;
890+ bool res ;
839891
840892if (fread (& size ,sizeof (size ),1 ,file )!= 1 )
841893gotoread_error ;
842894data = palloc (size );
843895if (fread (data ,size ,1 ,file )!= 1 )
844896gotoread_error ;
845- callback (data ,size );
897+ res = callback (data ,size );
846898pfree (data );
899+
900+ if (!res )
901+ {
902+ /* Error detected. Do not try to read tails of the storage. */
903+ elog (LOG ,"[AQO] Because of an error skip %ld storage records." ,
904+ num - i );
905+ break ;
906+ }
847907}
848908
849909FreeFile (file );
@@ -896,11 +956,15 @@ dsa_init()
896956Assert (aqo_state -> data_dsa_handler == DSM_HANDLE_INVALID );
897957
898958qtext_dsa = dsa_create (aqo_state -> qtext_trancheid );
959+ Assert (qtext_dsa != NULL );
960+
961+ if (dsm_size_max > 0 )
962+ dsa_set_size_limit (qtext_dsa ,dsm_size_max * 1024 * 1024 );
963+
899964dsa_pin (qtext_dsa );
900965aqo_state -> qtexts_dsa_handler = dsa_get_handle (qtext_dsa );
901966
902- data_dsa = dsa_create (aqo_state -> data_trancheid );
903- dsa_pin (data_dsa );
967+ data_dsa = qtext_dsa ;
904968aqo_state -> data_dsa_handler = dsa_get_handle (data_dsa );
905969
906970/* Load and initialize query texts hash table */
@@ -910,11 +974,10 @@ dsa_init()
910974else
911975{
912976qtext_dsa = dsa_attach (aqo_state -> qtexts_dsa_handler );
913- data_dsa = dsa_attach ( aqo_state -> data_dsa_handler ) ;
977+ data_dsa = qtext_dsa ;
914978}
915979
916980dsa_pin_mapping (qtext_dsa );
917- dsa_pin_mapping (data_dsa );
918981MemoryContextSwitchTo (old_context );
919982LWLockRelease (& aqo_state -> lock );
920983
@@ -973,7 +1036,17 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
9731036entry -> queryid = queryid ;
9741037size = size > querytext_max_size ?querytext_max_size :size ;
9751038entry -> qtext_dp = dsa_allocate (qtext_dsa ,size );
976- Assert (DsaPointerIsValid (entry -> qtext_dp ));
1039+
1040+ if (!_check_dsa_validity (entry -> qtext_dp ))
1041+ {
1042+ /*
1043+ * DSA stuck into problems. Rollback changes. Return false in belief
1044+ * that caller recognize it and don't try to call us more.
1045+ */
1046+ (void )hash_search (qtexts_htab ,& queryid ,HASH_REMOVE ,NULL );
1047+ return false;
1048+ }
1049+
9771050strptr = (char * )dsa_get_address (qtext_dsa ,entry -> qtext_dp );
9781051strlcpy (strptr ,query_string ,size );
9791052aqo_state -> qtexts_changed = true;
@@ -1173,7 +1246,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11731246
11741247size = _compute_data_dsa (entry );
11751248entry -> data_dp = dsa_allocate0 (data_dsa ,size );
1176- Assert (DsaPointerIsValid (entry -> data_dp ));
1249+
1250+ if (!_check_dsa_validity (entry -> data_dp ))
1251+ {
1252+ /*
1253+ * DSA stuck into problems. Rollback changes. Return false in belief
1254+ * that caller recognize it and don't try to call us more.
1255+ */
1256+ (void )hash_search (data_htab ,& key ,HASH_REMOVE ,NULL );
1257+ return false;
1258+ }
11771259}
11781260
11791261Assert (DsaPointerIsValid (entry -> data_dp ));
@@ -1195,7 +1277,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11951277/* Need to re-allocate DSA chunk */
11961278dsa_free (data_dsa ,entry -> data_dp );
11971279entry -> data_dp = dsa_allocate0 (data_dsa ,size );
1198- Assert (DsaPointerIsValid (entry -> data_dp ));
1280+
1281+ if (!_check_dsa_validity (entry -> data_dp ))
1282+ {
1283+ /*
1284+ * DSA stuck into problems. Rollback changes. Return false in belief
1285+ * that caller recognize it and don't try to call us more.
1286+ */
1287+ (void )hash_search (data_htab ,& key ,HASH_REMOVE ,NULL );
1288+ return false;
1289+ }
11991290}
12001291ptr = (char * )dsa_get_address (data_dsa ,entry -> data_dp );
12011292