88
99#include "aqo_shared.h"
1010
11+
12+ typedef struct
13+ {
14+ int magic ;
15+ uint32 total_size ;
16+ uint32 delta ;
17+ }dsm_seg_hdr ;
18+
19+ #define free_space (hdr ) (uint32) (temp_storage_size - sizeof(dsm_seg_hdr) - hdr->delta)
20+ #define addr (delta )((char *) dsm_segment_address(seg) + sizeof(dsm_seg_hdr) + delta)
21+
1122shmem_startup_hook_type prev_shmem_startup_hook = NULL ;
12- static AQOSharedState * aqo_state = NULL ;
13- unsigned long temp_storage_size = 1024 * 1024 ;/* Storage size, in bytes */
14- void * temp_storage = NULL ;
23+ AQOSharedState * aqo_state = NULL ;
24+ HTAB * fss_htab = NULL ;
25+ static int aqo_htab_max_items = 1000 ;
26+ static uint32 temp_storage_size = 1024 * 1024 * 10 ;/* Storage size, in bytes */
27+ static dsm_segment * seg = NULL ;
1528
16- static void
17- attach_dsm_segment (void )
29+
30+ static void aqo_detach_shmem (int code ,Datum arg );
31+
32+
33+ void *
34+ get_dsm_all (uint32 * size )
1835{
19- dsm_segment * seg ;
36+ dsm_seg_hdr * hdr ;
2037
21- LWLockAcquire ( & aqo_state -> lock ,LW_EXCLUSIVE );
38+ Assert ( LWLockHeldByMeInMode ( & aqo_state -> lock ,LW_EXCLUSIVE ) );
2239
23- if (aqo_state -> dsm_handler != DSM_HANDLE_INVALID )
40+ if (aqo_state -> dsm_handler == DSM_HANDLE_INVALID )
41+ {
42+ /* Fast path. No any cached data exists. */
43+ * size = 0 ;
44+ return NULL ;
45+ }
46+
47+ if (!seg )
2448{
49+ /* if segment exists we should connect to */
2550seg = dsm_attach (aqo_state -> dsm_handler );
51+ Assert (seg );
52+ dsm_pin_mapping (seg );
53+ on_shmem_exit (aqo_detach_shmem , (Datum )& aqo_state -> dsm_handler );
54+ }
55+
56+ hdr = (dsm_seg_hdr * )dsm_segment_address (seg );
57+ * size = hdr -> delta ;
58+ return (char * )hdr + sizeof (dsm_seg_hdr );
59+ }
60+
61+ /*
62+ * Cleanup of DSM cache: set header into default state and zero the memory block.
63+ * This operation can be coupled with the cache dump, so we do it under an external
64+ * hold of the lock.
65+ */
66+ void
67+ reset_dsm_cache (void )
68+ {
69+ dsm_seg_hdr * hdr ;
70+ char * start ;
71+
72+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock ,LW_EXCLUSIVE ));
73+
74+ if (aqo_state -> dsm_handler == DSM_HANDLE_INVALID )
75+ /* Fast path. No any cached data exists. */
76+ return ;
77+
78+ Assert (seg );
79+
80+ hdr = (dsm_seg_hdr * )dsm_segment_address (seg );
81+ start = (char * )hdr + sizeof (dsm_seg_hdr );
82+
83+ /* Reset the cache */
84+ memset (start ,0 ,hdr -> delta );
85+
86+ hdr -> delta = 0 ;
87+ hdr -> total_size = temp_storage_size - sizeof (dsm_seg_hdr );
88+ }
89+
90+ char *
91+ get_cache_address (void )
92+ {
93+ dsm_seg_hdr * hdr ;
94+
95+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock ,LW_EXCLUSIVE )||
96+ LWLockHeldByMeInMode (& aqo_state -> lock ,LW_SHARED ));
97+
98+ if (aqo_state -> dsm_handler != DSM_HANDLE_INVALID )
99+ {
100+ if (!seg )
101+ {
102+ /* Another process created the segment yet. Just attach to. */
103+ seg = dsm_attach (aqo_state -> dsm_handler );
104+ dsm_pin_mapping (seg );
105+ on_shmem_exit (aqo_detach_shmem , (Datum )& aqo_state -> dsm_handler );
106+ }
107+
108+ hdr = (dsm_seg_hdr * )dsm_segment_address (seg );
26109}
27110else
28111{
112+ /*
113+ * First request for DSM cache in this instance.
114+ * Create the DSM segment. Pin it to live up to instance shutdown.
115+ * Don't forget to detach DSM segment before an exit.
116+ */
29117seg = dsm_create (temp_storage_size ,0 );
118+ dsm_pin_mapping (seg );
119+ dsm_pin_segment (seg );
30120aqo_state -> dsm_handler = dsm_segment_handle (seg );
121+ on_shmem_exit (aqo_detach_shmem , (Datum )& aqo_state -> dsm_handler );
122+
123+ hdr = (dsm_seg_hdr * )dsm_segment_address (seg );
124+ hdr -> magic = AQO_SHARED_MAGIC ;
125+ hdr -> delta = 0 ;
126+ hdr -> total_size = temp_storage_size - sizeof (dsm_seg_hdr );
31127}
32128
33- temp_storage = dsm_segment_address (seg );
34- LWLockRelease (& aqo_state -> lock );
129+ Assert (seg );
130+ Assert (hdr -> magic == AQO_SHARED_MAGIC && hdr -> total_size > 0 );
131+
132+ return (char * )hdr + sizeof (dsm_seg_hdr );
133+ }
134+
135+ uint32
136+ get_dsm_cache_pos (uint32 size )
137+ {
138+ dsm_seg_hdr * hdr ;
139+ uint32 pos ;
140+
141+ Assert (LWLockHeldByMeInMode (& aqo_state -> lock ,LW_EXCLUSIVE )||
142+ LWLockHeldByMeInMode (& aqo_state -> lock ,LW_SHARED ));
143+
144+ (void )get_cache_address ();
145+ hdr = (dsm_seg_hdr * )dsm_segment_address (seg );
146+
147+ if (free_space (hdr )< size || size == 0 )
148+ elog (ERROR ,
149+ "DSM cache can't allcoate a mem block. Required: %u, free: %u" ,
150+ size ,free_space (hdr ));
151+
152+ pos = hdr -> delta ;
153+ hdr -> delta += size ;
154+ Assert (free_space (hdr ) >=0 );
155+ return pos ;
35156}
36157
37158static void
38159aqo_detach_shmem (int code ,Datum arg )
39160{
40- dsm_handle handler = * (dsm_handle * )arg ;
41- dsm_detach (dsm_find_mapping (handler ));
161+ if (seg != NULL )
162+ dsm_detach (seg );
163+ seg = NULL ;
42164}
43165
44166void
45167aqo_init_shmem (void )
46168{
47169bool found ;
170+ HASHCTL info ;
171+
172+ aqo_state = NULL ;
173+ fss_htab = NULL ;
48174
49175LWLockAcquire (AddinShmemInitLock ,LW_EXCLUSIVE );
50176aqo_state = ShmemInitStruct ("aqo" ,sizeof (AQOSharedState ),& found );
@@ -54,8 +180,26 @@ aqo_init_shmem(void)
54180LWLockInitialize (& aqo_state -> lock ,LWLockNewTrancheId ());
55181aqo_state -> dsm_handler = DSM_HANDLE_INVALID ;
56182}
183+
184+ info .keysize = sizeof (htab_key );
185+ info .entrysize = sizeof (htab_entry );
186+ fss_htab = ShmemInitHash ("aqo hash" ,
187+ aqo_htab_max_items ,aqo_htab_max_items ,
188+ & info ,
189+ HASH_ELEM |HASH_BLOBS );
190+
57191LWLockRelease (AddinShmemInitLock );
58192
59193LWLockRegisterTranche (aqo_state -> lock .tranche ,"aqo" );
60- on_shmem_exit (aqo_detach_shmem , (Datum )& aqo_state -> dsm_handler );
194+ }
195+
196+ Size
197+ aqo_memsize (void )
198+ {
199+ Size size ;
200+
201+ size = MAXALIGN (sizeof (AQOSharedState ));
202+ size = add_size (size ,hash_estimate_size (aqo_htab_max_items ,sizeof (htab_entry )));
203+
204+ return size ;
61205}