Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9c225ac

Browse files
committed
Avoid passing function pointers across process boundaries.
This back-patches commit3247082into 9.6, primarily to make buildfarm member culicidae happy.Unlike the HEAD patch, avoid changing the existing API ofCreateParallelContext; instead we just switch to usingCreateParallelContextForExternalFunction, even for core functions.Petr Jelinek, with a bunch of basically-cosmetic adjustments by meDiscussion:https://postgr.es/m/548f9c1d-eafa-e3fa-9da8-f0cc2f654e60@2ndquadrant.com
1 parentd512794 commit9c225ac

File tree

5 files changed

+167
-92
lines changed

5 files changed

+167
-92
lines changed

‎src/backend/access/transam/README.parallel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ pattern looks like this:
198198

199199
EnterParallelMode();/* prohibit unsafe state changes */
200200

201-
pcxt =CreateParallelContext(entrypoint, nworkers);
201+
pcxt =CreateParallelContextForExternalFunction("library_name", "function_name", nworkers);
202202

203203
/* Allow space for application-specific data here. */
204204
shm_toc_estimate_chunk(&pcxt->estimator, size);

‎src/backend/access/transam/parallel.c

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include"access/xlog.h"
2020
#include"catalog/namespace.h"
2121
#include"commands/async.h"
22+
#include"executor/execParallel.h"
2223
#include"libpq/libpq.h"
2324
#include"libpq/pqformat.h"
2425
#include"libpq/pqmq.h"
@@ -60,7 +61,7 @@
6061
#definePARALLEL_KEY_TRANSACTION_SNAPSHOTUINT64CONST(0xFFFFFFFFFFFF0006)
6162
#definePARALLEL_KEY_ACTIVE_SNAPSHOTUINT64CONST(0xFFFFFFFFFFFF0007)
6263
#definePARALLEL_KEY_TRANSACTION_STATEUINT64CONST(0xFFFFFFFFFFFF0008)
63-
#definePARALLEL_KEY_EXTENSION_TRAMPOLINEUINT64CONST(0xFFFFFFFFFFFF0009)
64+
#definePARALLEL_KEY_ENTRYPOINTUINT64CONST(0xFFFFFFFFFFFF0009)
6465

6566
/* Fixed-size parallel state. */
6667
typedefstructFixedParallelState
@@ -76,7 +77,7 @@ typedef struct FixedParallelState
7677
pid_tparallel_master_pid;
7778
BackendIdparallel_master_backend_id;
7879

79-
/* Entrypoint for parallel workers. */
80+
/* Entrypoint for parallel workers (deprecated)! */
8081
parallel_worker_main_typeentrypoint;
8182

8283
/* Mutex protects remaining fields. */
@@ -106,16 +107,36 @@ static FixedParallelState *MyFixedParallelState;
106107
/* List of active parallel contexts. */
107108
staticdlist_headpcxt_list=DLIST_STATIC_INIT(pcxt_list);
108109

110+
/*
111+
* List of internal parallel worker entry points. We need this for
112+
* reasons explained in LookupParallelWorkerFunction(), below.
113+
*/
114+
staticconststruct
115+
{
116+
constchar*fn_name;
117+
parallel_worker_main_typefn_addr;
118+
}InternalParallelWorkers[]=
119+
120+
{
121+
{
122+
"ParallelQueryMain",ParallelQueryMain
123+
}
124+
};
125+
109126
/* Private functions. */
110127
staticvoidHandleParallelMessage(ParallelContext*pcxt,inti,StringInfomsg);
111-
staticvoidParallelExtensionTrampoline(dsm_segment*seg,shm_toc*toc);
112128
staticvoidWaitForParallelWorkersToExit(ParallelContext*pcxt);
129+
staticparallel_worker_main_typeLookupParallelWorkerFunction(char*libraryname,char*funcname);
113130

114131

115132
/*
116133
* Establish a new parallel context. This should be done after entering
117134
* parallel mode, and (unless there is an error) the context should be
118135
* destroyed before exiting the current subtransaction.
136+
*
137+
* NB: specifying the entrypoint as a function address is unportable.
138+
* This will go away in Postgres 10, in favor of the API provided by
139+
* CreateParallelContextForExternalFunction; in the meantime use that.
119140
*/
120141
ParallelContext*
121142
CreateParallelContext(parallel_worker_main_typeentrypoint,intnworkers)
@@ -163,9 +184,9 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
163184
}
164185

165186
/*
166-
* Establish a new parallel context that calls a functionprovided byan
167-
*extension. This works around the fact that the library might get mapped
168-
*at a different address in each backend.
187+
* Establish a new parallel context that calls a functionspecified byname.
188+
*Unlike CreateParallelContext, this is robust against possible differences
189+
*in address space layout between different processes.
169190
*/
170191
ParallelContext*
171192
CreateParallelContextForExternalFunction(char*library_name,
@@ -179,7 +200,7 @@ CreateParallelContextForExternalFunction(char *library_name,
179200
oldcontext=MemoryContextSwitchTo(TopTransactionContext);
180201

181202
/* Create the context. */
182-
pcxt=CreateParallelContext(ParallelExtensionTrampoline,nworkers);
203+
pcxt=CreateParallelContext(NULL,nworkers);
183204
pcxt->library_name=pstrdup(library_name);
184205
pcxt->function_name=pstrdup(function_name);
185206

@@ -248,10 +269,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
248269
pcxt->nworkers));
249270
shm_toc_estimate_keys(&pcxt->estimator,1);
250271

251-
/* Estimate how much we'll need forextensionentrypoint info. */
272+
/* Estimate how much we'll need for entrypoint info. */
252273
if (pcxt->library_name!=NULL)
253274
{
254-
Assert(pcxt->entrypoint==ParallelExtensionTrampoline);
255275
Assert(pcxt->function_name!=NULL);
256276
shm_toc_estimate_chunk(&pcxt->estimator,strlen(pcxt->library_name)
257277
+strlen(pcxt->function_name)+2);
@@ -367,7 +387,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
367387
}
368388
shm_toc_insert(pcxt->toc,PARALLEL_KEY_ERROR_QUEUE,error_queue_space);
369389

370-
/* Serializeextensionentrypoint information. */
390+
/* Serialize entrypoint information. */
371391
if (pcxt->library_name!=NULL)
372392
{
373393
Sizelnamelen=strlen(pcxt->library_name);
@@ -377,7 +397,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
377397
+strlen(pcxt->function_name)+2);
378398
strcpy(extensionstate,pcxt->library_name);
379399
strcpy(extensionstate+lnamelen+1,pcxt->function_name);
380-
shm_toc_insert(pcxt->toc,PARALLEL_KEY_EXTENSION_TRAMPOLINE,
400+
shm_toc_insert(pcxt->toc,PARALLEL_KEY_ENTRYPOINT,
381401
extensionstate);
382402
}
383403
}
@@ -669,6 +689,10 @@ DestroyParallelContext(ParallelContext *pcxt)
669689
}
670690

671691
/* Free memory. */
692+
if (pcxt->library_name)
693+
pfree(pcxt->library_name);
694+
if (pcxt->function_name)
695+
pfree(pcxt->function_name);
672696
pfree(pcxt);
673697
}
674698

@@ -939,6 +963,8 @@ ParallelWorkerMain(Datum main_arg)
939963
shm_mq*mq;
940964
shm_mq_handle*mqh;
941965
char*libraryspace;
966+
char*entrypointstate;
967+
parallel_worker_main_typeentrypt;
942968
char*gucspace;
943969
char*combocidspace;
944970
char*tsnapspace;
@@ -1038,6 +1064,25 @@ ParallelWorkerMain(Datum main_arg)
10381064
Assert(libraryspace!=NULL);
10391065
RestoreLibraryState(libraryspace);
10401066

1067+
/*
1068+
* Identify the entry point to be called. In theory this could result in
1069+
* loading an additional library, though most likely the entry point is in
1070+
* the core backend or in a library we just loaded.
1071+
*/
1072+
entrypointstate=shm_toc_lookup(toc,PARALLEL_KEY_ENTRYPOINT);
1073+
if (entrypointstate!=NULL)
1074+
{
1075+
char*library_name;
1076+
char*function_name;
1077+
1078+
library_name=entrypointstate;
1079+
function_name=entrypointstate+strlen(library_name)+1;
1080+
1081+
entrypt=LookupParallelWorkerFunction(library_name,function_name);
1082+
}
1083+
else
1084+
entrypt=fps->entrypoint;
1085+
10411086
/* Restore database connection. */
10421087
BackgroundWorkerInitializeConnectionByOid(fps->database_id,
10431088
fps->authenticated_user_id);
@@ -1101,10 +1146,11 @@ ParallelWorkerMain(Datum main_arg)
11011146
/*
11021147
* Time to do the real work: invoke the caller-supplied code.
11031148
*
1104-
* If you get a crash at this line, see the comments for
1105-
* ParallelExtensionTrampoline.
1149+
* If you get a crash at this line, try using
1150+
* CreateParallelContextForExternalFunction instead of
1151+
* CreateParallelContext.
11061152
*/
1107-
fps->entrypoint(seg,toc);
1153+
entrypt(seg,toc);
11081154

11091155
/* Must exit parallel mode to pop active snapshot. */
11101156
ExitParallelMode();
@@ -1119,33 +1165,6 @@ ParallelWorkerMain(Datum main_arg)
11191165
pq_putmessage('X',NULL,0);
11201166
}
11211167

1122-
/*
1123-
* It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
1124-
* function living in a dynamically loaded module, because the module might
1125-
* not be loaded in every process, or might be loaded but not at the same
1126-
* address. To work around that problem, CreateParallelContextForExtension()
1127-
* arranges to call this function rather than calling the extension-provided
1128-
* function directly; and this function then looks up the real entrypoint and
1129-
* calls it.
1130-
*/
1131-
staticvoid
1132-
ParallelExtensionTrampoline(dsm_segment*seg,shm_toc*toc)
1133-
{
1134-
char*extensionstate;
1135-
char*library_name;
1136-
char*function_name;
1137-
parallel_worker_main_typeentrypt;
1138-
1139-
extensionstate=shm_toc_lookup(toc,PARALLEL_KEY_EXTENSION_TRAMPOLINE);
1140-
Assert(extensionstate!=NULL);
1141-
library_name=extensionstate;
1142-
function_name=extensionstate+strlen(library_name)+1;
1143-
1144-
entrypt= (parallel_worker_main_type)
1145-
load_external_function(library_name,function_name, true,NULL);
1146-
entrypt(seg,toc);
1147-
}
1148-
11491168
/*
11501169
* Update shared memory with the ending location of the last WAL record we
11511170
* wrote, if it's greater than the value already stored there.
@@ -1161,3 +1180,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
11611180
fps->last_xlog_end=last_xlog_end;
11621181
SpinLockRelease(&fps->mutex);
11631182
}
1183+
1184+
/*
1185+
* Look up (and possibly load) a parallel worker entry point function.
1186+
*
1187+
* For functions contained in the core code, we use library name "postgres"
1188+
* and consult the InternalParallelWorkers array. External functions are
1189+
* looked up, and loaded if necessary, using load_external_function().
1190+
*
1191+
* The point of this is to pass function names as strings across process
1192+
* boundaries. We can't pass actual function addresses because of the
1193+
* possibility that the function has been loaded at a different address
1194+
* in a different process. This is obviously a hazard for functions in
1195+
* loadable libraries, but it can happen even for functions in the core code
1196+
* on platforms using EXEC_BACKEND (e.g., Windows).
1197+
*
1198+
* At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1199+
* in favor of applying load_external_function() for core functions too;
1200+
* but that raises portability issues that are not worth addressing now.
1201+
*/
1202+
staticparallel_worker_main_type
1203+
LookupParallelWorkerFunction(char*libraryname,char*funcname)
1204+
{
1205+
/*
1206+
* If the function is to be loaded from postgres itself, search the
1207+
* InternalParallelWorkers array.
1208+
*/
1209+
if (strcmp(libraryname,"postgres")==0)
1210+
{
1211+
inti;
1212+
1213+
for (i=0;i<lengthof(InternalParallelWorkers);i++)
1214+
{
1215+
if (strcmp(InternalParallelWorkers[i].fn_name,funcname)==0)
1216+
returnInternalParallelWorkers[i].fn_addr;
1217+
}
1218+
1219+
/* We can only reach this by programming error. */
1220+
elog(ERROR,"internal function \"%s\" not found",funcname);
1221+
}
1222+
1223+
/* Otherwise load from external library. */
1224+
return (parallel_worker_main_type)
1225+
load_external_function(libraryname,funcname, true,NULL);
1226+
}

‎src/backend/executor/execParallel.c

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
105105
staticboolExecParallelRetrieveInstrumentation(PlanState*planstate,
106106
SharedExecutorInstrumentation*instrumentation);
107107

108-
/* Helper functions that run in the parallel worker. */
109-
staticvoidParallelQueryMain(dsm_segment*seg,shm_toc*toc);
108+
/* Helper function that runs in the parallel worker. */
110109
staticDestReceiver*ExecParallelGetReceiver(dsm_segment*seg,shm_toc*toc);
111110

112111
/*
@@ -355,7 +354,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
355354
pstmt_data=ExecSerializePlan(planstate->plan,estate);
356355

357356
/* Create a parallel context. */
358-
pcxt=CreateParallelContext(ParallelQueryMain,nworkers);
357+
pcxt=CreateParallelContextForExternalFunction("postgres","ParallelQueryMain",nworkers);
359358
pei->pcxt=pcxt;
360359

361360
/*
@@ -720,7 +719,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
720719
* to do this are also stored in the dsm_segment and can be accessed through
721720
* the shm_toc.
722721
*/
723-
staticvoid
722+
void
724723
ParallelQueryMain(dsm_segment*seg,shm_toc*toc)
725724
{
726725
BufferUsage*buffer_usage;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp