Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit cc5f813

Browse files
committed
Add support for coordinating record typmods among parallel workers.
Tuples can have type RECORDOID and a typmod number that identifies a blessed TupleDesc in a backend-private cache. To support the sharing of such tuples through shared memory and temporary files, provide a typmod registry in shared memory. To achieve that, introduce per-session DSM segments, created on demand when a backend first runs a parallel query. The per-session DSM segment has a table-of-contents just like the per-query DSM segment, and initially the contents are a shared record typmod registry and a DSA area to provide the space it needs to grow. State relating to the current session is accessed via a Session object reached through global variable CurrentSession that may require significant redesign further down the road as we figure out what else needs to be shared or remodelled. Author: Thomas Munro. Reviewed-By: Andres Freund. Discussion: https://postgr.es/m/CAEepm=0ZtQ-SpsgCyzzYpsXS6e=kZWqk3g5Ygn3MDV7A8dabUA@mail.gmail.com
1 parent 9b6cb46 commit cc5f813

File tree

12 files changed

+946
-37
lines changed

12 files changed

+946
-37
lines changed

‎src/backend/access/common/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ top_builddir = ../../../..
1313
include$(top_builddir)/src/Makefile.global
1414

1515
OBJS = bufmask.o heaptuple.o indextuple.o printsimple.o printtup.o\
16-
reloptions.o scankey.o tupconvert.o tupdesc.o
16+
reloptions.o scankey.o session.o tupconvert.o tupdesc.o
1717

1818
include$(top_srcdir)/src/backend/common.mk

‎src/backend/access/common/session.c

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* session.c
4+
*    Encapsulation of user session.
5+
*
6+
* This is intended to contain data that needs to be shared between backends
7+
* performing work for a client session. In particular such a session is
8+
* shared between the leader and worker processes for parallel queries. At
9+
* some later point it might also become useful infrastructure for separating
10+
* backends from client connections, e.g. for the purpose of pooling.
11+
*
12+
* Currently this infrastructure is used to share:
13+
* - typmod registry for ephemeral row-types, i.e. BlessTupleDesc etc.
14+
*
15+
* Portions Copyright (c) 2017, PostgreSQL Global Development Group
16+
*
17+
* src/backend/access/common/session.c
18+
*
19+
*-------------------------------------------------------------------------
20+
*/
21+
#include"postgres.h"
22+
23+
#include"access/session.h"
24+
#include"storage/lwlock.h"
25+
#include"storage/shm_toc.h"
26+
#include"utils/memutils.h"
27+
#include"utils/typcache.h"
28+
29+
/* Magic number for per-session DSM TOC. */
30+
#define SESSION_MAGIC 0xabb0fbc9
31+
32+
/*
33+
* We want to create a DSA area to store shared state that has the same
34+
* lifetime as a session. So far, it's only used to hold the shared record
35+
* type registry. We don't want it to have to create any DSM segments just
36+
* yet in common cases, so we'll give it enough space to hold a very small
37+
* SharedRecordTypmodRegistry.
38+
*/
39+
#define SESSION_DSA_SIZE 0x30000
40+
41+
/*
42+
* Magic numbers for state sharing in the per-session DSM area.
43+
*/
44+
#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
45+
#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
46+
47+
/* This backend's current session. */
48+
Session*CurrentSession=NULL;
49+
50+
/*
51+
* Set up CurrentSession to point to an empty Session object.
52+
*/
53+
void
54+
InitializeSession(void)
55+
{
56+
CurrentSession=MemoryContextAllocZero(TopMemoryContext,sizeof(Session));
57+
}
58+
59+
/*
60+
* Initialize the per-session DSM segment if it isn't already initialized, and
61+
* return its handle so that worker processes can attach to it.
62+
*
63+
* Unlike the per-context DSM segment, this segment and its contents are
64+
* reused for future parallel queries.
65+
*
66+
* Return DSM_HANDLE_INVALID if a segment can't be allocated due to lack of
67+
* resources.
68+
*/
69+
dsm_handle
70+
GetSessionDsmHandle(void)
71+
{
72+
shm_toc_estimatorestimator;
73+
shm_toc*toc;
74+
dsm_segment*seg;
75+
size_ttypmod_registry_size;
76+
size_tsize;
77+
void*dsa_space;
78+
void*typmod_registry_space;
79+
dsa_area*dsa;
80+
MemoryContextold_context;
81+
82+
/*
83+
* If we have already created a session-scope DSM segment in this backend,
84+
* return its handle. The same segment will be used for the rest of this
85+
* backend's lifetime.
86+
*/
87+
if (CurrentSession->segment!=NULL)
88+
return dsm_segment_handle(CurrentSession->segment);
89+
90+
/* Otherwise, prepare to set one up. */
91+
old_context=MemoryContextSwitchTo(TopMemoryContext);
92+
shm_toc_initialize_estimator(&estimator);
93+
94+
/* Estimate space for the per-session DSA area. */
95+
shm_toc_estimate_keys(&estimator,1);
96+
shm_toc_estimate_chunk(&estimator,SESSION_DSA_SIZE);
97+
98+
/* Estimate space for the per-session record typmod registry. */
99+
typmod_registry_size=SharedRecordTypmodRegistryEstimate();
100+
shm_toc_estimate_keys(&estimator,1);
101+
shm_toc_estimate_chunk(&estimator,typmod_registry_size);
102+
103+
/* Set up segment and TOC. */
104+
size=shm_toc_estimate(&estimator);
105+
seg=dsm_create(size,DSM_CREATE_NULL_IF_MAXSEGMENTS);
106+
if (seg==NULL)
107+
{
108+
MemoryContextSwitchTo(old_context);
109+
110+
return DSM_HANDLE_INVALID;
111+
}
112+
toc=shm_toc_create(SESSION_MAGIC,
113+
dsm_segment_address(seg),
114+
size);
115+
116+
/* Create per-session DSA area. */
117+
dsa_space=shm_toc_allocate(toc,SESSION_DSA_SIZE);
118+
dsa=dsa_create_in_place(dsa_space,
119+
SESSION_DSA_SIZE,
120+
LWTRANCHE_SESSION_DSA,
121+
seg);
122+
shm_toc_insert(toc,SESSION_KEY_DSA,dsa_space);
123+
124+
125+
/* Create session-scoped shared record typmod registry. */
126+
typmod_registry_space=shm_toc_allocate(toc,typmod_registry_size);
127+
SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry*)
128+
typmod_registry_space,seg,dsa);
129+
shm_toc_insert(toc,SESSION_KEY_RECORD_TYPMOD_REGISTRY,
130+
typmod_registry_space);
131+
132+
/*
133+
* If we got this far, we can pin the shared memory so it stays mapped for
134+
* the rest of this backend's life. If we don't make it this far, cleanup
135+
* callbacks for anything we installed above (ie currently
136+
* SharedRecordTypmodRegistry) will run when the DSM segment is detached
137+
* by CurrentResourceOwner so we aren't left with a broken CurrentSession.
138+
*/
139+
dsm_pin_mapping(seg);
140+
dsa_pin_mapping(dsa);
141+
142+
/* Make segment and area available via CurrentSession. */
143+
CurrentSession->segment=seg;
144+
CurrentSession->area=dsa;
145+
146+
MemoryContextSwitchTo(old_context);
147+
148+
return dsm_segment_handle(seg);
149+
}
150+
151+
/*
152+
* Attach to a per-session DSM segment provided by a parallel leader.
153+
*/
154+
void
155+
AttachSession(dsm_handle handle)
156+
{
157+
dsm_segment*seg;
158+
shm_toc*toc;
159+
void*dsa_space;
160+
void*typmod_registry_space;
161+
dsa_area*dsa;
162+
MemoryContextold_context;
163+
164+
old_context=MemoryContextSwitchTo(TopMemoryContext);
165+
166+
/* Attach to the DSM segment. */
167+
seg=dsm_attach(handle);
168+
if (seg==NULL)
169+
elog(ERROR,"could not attach to per-session DSM segment");
170+
toc=shm_toc_attach(SESSION_MAGIC,dsm_segment_address(seg));
171+
172+
/* Attach to the DSA area. */
173+
dsa_space=shm_toc_lookup(toc,SESSION_KEY_DSA, false);
174+
dsa=dsa_attach_in_place(dsa_space,seg);
175+
176+
/* Make them available via the current session. */
177+
CurrentSession->segment=seg;
178+
CurrentSession->area=dsa;
179+
180+
/* Attach to the shared record typmod registry. */
181+
typmod_registry_space=
182+
shm_toc_lookup(toc,SESSION_KEY_RECORD_TYPMOD_REGISTRY, false);
183+
SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry*)
184+
typmod_registry_space);
185+
186+
/* Remain attached until end of backend or DetachSession(). */
187+
dsm_pin_mapping(seg);
188+
dsa_pin_mapping(dsa);
189+
190+
MemoryContextSwitchTo(old_context);
191+
}
192+
193+
/*
194+
* Detach from the current session DSM segment. It's not strictly necessary
195+
* to do this explicitly since we'll detach automatically at backend exit, but
196+
* if we ever reuse parallel workers it will become important for workers to
197+
* detach from one session before attaching to another. Note that this runs
198+
* detach hooks.
199+
*/
200+
void
201+
DetachSession(void)
202+
{
203+
/* Runs detach hooks. */
204+
dsm_detach(CurrentSession->segment);
205+
CurrentSession->segment=NULL;
206+
dsa_detach(CurrentSession->area);
207+
CurrentSession->area=NULL;
208+
}

‎src/backend/access/common/tupdesc.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,22 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
184184
returndesc;
185185
}
186186

187+
/*
188+
* TupleDescCopy
189+
*Copy a tuple descriptor into caller-supplied memory.
190+
*The memory may be shared memory mapped at any address, and must
191+
*be sufficient to hold TupleDescSize(src) bytes.
192+
*
193+
* !!! Constraints and defaults are not copied !!!
194+
*/
195+
void
196+
TupleDescCopy(TupleDesc dst, TupleDesc src)
197+
{
198+
memcpy(dst,src,TupleDescSize(src));
199+
dst->constr=NULL;
200+
dst->tdrefcount=-1;
201+
}
202+
187203
/*
188204
* TupleDescCopyEntry
189205
*This function copies a single attribute structure from one tuple

‎src/backend/access/transam/parallel.c

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include"postgres.h"
1616

1717
#include"access/parallel.h"
18+
#include"access/session.h"
1819
#include"access/xact.h"
1920
#include"access/xlog.h"
2021
#include"catalog/namespace.h"
@@ -36,6 +37,7 @@
3637
#include"utils/memutils.h"
3738
#include"utils/resowner.h"
3839
#include"utils/snapmgr.h"
40+
#include"utils/typcache.h"
3941

4042

4143
/*
@@ -51,8 +53,9 @@
5153
#definePARALLEL_MAGIC0x50477c7c
5254

5355
/*
54-
* Magic numbers for parallel state sharing. Higher-level code should use
55-
* smaller values, leaving these very large ones for use by this module.
56+
* Magic numbers for per-context parallel state sharing. Higher-level code
57+
* should use smaller values, leaving these very large ones for use by this
58+
* module.
5659
*/
5760
#definePARALLEL_KEY_FIXEDUINT64CONST(0xFFFFFFFFFFFF0001)
5861
#definePARALLEL_KEY_ERROR_QUEUEUINT64CONST(0xFFFFFFFFFFFF0002)
@@ -63,6 +66,7 @@
6366
#definePARALLEL_KEY_ACTIVE_SNAPSHOTUINT64CONST(0xFFFFFFFFFFFF0007)
6467
#definePARALLEL_KEY_TRANSACTION_STATEUINT64CONST(0xFFFFFFFFFFFF0008)
6568
#definePARALLEL_KEY_ENTRYPOINTUINT64CONST(0xFFFFFFFFFFFF0009)
69+
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
6670

6771
/* Fixed-size parallel state. */
6872
typedefstructFixedParallelState
@@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
197201
Sizesegsize=0;
198202
inti;
199203
FixedParallelState*fps;
204+
dsm_handlesession_dsm_handle=DSM_HANDLE_INVALID;
200205
Snapshottransaction_snapshot=GetTransactionSnapshot();
201206
Snapshotactive_snapshot=GetActiveSnapshot();
202207

@@ -211,6 +216,21 @@ InitializeParallelDSM(ParallelContext *pcxt)
211216
* Normally, the user will have requested at least one worker process, but
212217
* if by chance they have not, we can skip a bunch of things here.
213218
*/
219+
if (pcxt->nworkers>0)
220+
{
221+
/* Get (or create) the per-session DSM segment's handle. */
222+
session_dsm_handle=GetSessionDsmHandle();
223+
224+
/*
225+
* If we weren't able to create a per-session DSM segment, then we can
226+
* continue but we can't safely launch any workers because their
227+
* record typmods would be incompatible so they couldn't exchange
228+
* tuples.
229+
*/
230+
if (session_dsm_handle==DSM_HANDLE_INVALID)
231+
pcxt->nworkers=0;
232+
}
233+
214234
if (pcxt->nworkers>0)
215235
{
216236
/* Estimate space for various kinds of state sharing. */
@@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
226246
shm_toc_estimate_chunk(&pcxt->estimator,asnaplen);
227247
tstatelen=EstimateTransactionStateSpace();
228248
shm_toc_estimate_chunk(&pcxt->estimator,tstatelen);
249+
shm_toc_estimate_chunk(&pcxt->estimator,sizeof(dsm_handle));
229250
/* If you add more chunks here, you probably need to add keys. */
230-
shm_toc_estimate_keys(&pcxt->estimator,6);
251+
shm_toc_estimate_keys(&pcxt->estimator,7);
231252

232253
/* Estimate space need for error queues. */
233254
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE)==
@@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
295316
char*asnapspace;
296317
char*tstatespace;
297318
char*error_queue_space;
319+
char*session_dsm_handle_space;
298320
char*entrypointstate;
299321
Sizelnamelen;
300322

@@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt)
322344
SerializeSnapshot(active_snapshot,asnapspace);
323345
shm_toc_insert(pcxt->toc,PARALLEL_KEY_ACTIVE_SNAPSHOT,asnapspace);
324346

347+
/* Provide the handle for per-session segment. */
348+
session_dsm_handle_space=shm_toc_allocate(pcxt->toc,
349+
sizeof(dsm_handle));
350+
*(dsm_handle*)session_dsm_handle_space=session_dsm_handle;
351+
shm_toc_insert(pcxt->toc,PARALLEL_KEY_SESSION_DSM,
352+
session_dsm_handle_space);
353+
325354
/* Serialize transaction state. */
326355
tstatespace=shm_toc_allocate(pcxt->toc,tstatelen);
327356
SerializeTransactionState(tstatelen,tstatespace);
@@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg)
938967
char*asnapspace;
939968
char*tstatespace;
940969
StringInfoDatamsgbuf;
970+
char*session_dsm_handle_space;
941971

942972
/* Set flag to indicate that we're initializing a parallel worker. */
943973
InitializingParallelWorker= true;
@@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg)
10641094
combocidspace=shm_toc_lookup(toc,PARALLEL_KEY_COMBO_CID, false);
10651095
RestoreComboCIDState(combocidspace);
10661096

1097+
/* Attach to the per-session DSM segment and contained objects. */
1098+
session_dsm_handle_space=
1099+
shm_toc_lookup(toc,PARALLEL_KEY_SESSION_DSM, false);
1100+
AttachSession(*(dsm_handle*)session_dsm_handle_space);
1101+
10671102
/* Restore transaction snapshot. */
10681103
tsnapspace=shm_toc_lookup(toc,PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
10691104
RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
@@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg)
11101145
/* Shut down the parallel-worker transaction. */
11111146
EndParallelWorkerTransaction();
11121147

1148+
/* Detach from the per-session DSM segment. */
1149+
DetachSession();
1150+
11131151
/* Report success. */
11141152
pq_putmessage('X',NULL,0);
11151153
}

‎src/backend/storage/lmgr/lwlock.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
494494

495495
if (LWLockTrancheArray==NULL)
496496
{
497-
LWLockTranchesAllocated=64;
497+
LWLockTranchesAllocated=128;
498498
LWLockTrancheArray= (char**)
499499
MemoryContextAllocZero(TopMemoryContext,
500500
LWLockTranchesAllocated*sizeof(char*));
@@ -510,6 +510,12 @@ RegisterLWLockTranches(void)
510510
"predicate_lock_manager");
511511
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
512512
"parallel_query_dsa");
513+
LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
514+
"session_dsa");
515+
LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
516+
"session_record_table");
517+
LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
518+
"session_typmod_table");
513519
LWLockRegisterTranche(LWTRANCHE_TBM,"tbm");
514520

515521
/* Register named tranches. */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp