Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit37b369d

Browse files
committed
injection_points: Add wait and wakeup of processes
This commit adds two features to the in-core module for injectionpoints:- A new callback called "wait" that can be attached to an injectionpoint to make it wait.- A new SQL function to update the shared state and broadcast the updateusing a condition variable. This function uses an input an injectionpoint name.This offers the possibility to stop a process in flight and wake it upin a controlled manner, which is useful when implementing tests that aimto trigger scenarios for race conditions (some tests are planned forintegration). The logic uses a set of counters with a conditionvariable to monitor and broadcast the changes. Up to 8 waits can beregistered in a single run, which should be plenty enough. Waits can bemonitored in pg_stat_activity, based on the injection point name whichis registered in a custom wait event under the "Extension" category.The shared memory state used by the module is registered using the DSMregistry, and is optional, so there is no need to load the module withshared_preload_libraries to be able to use these features.Author: Michael PaquierReviewed-by: Andrey Borodin, Bertrand DrouvotDiscussion:https://postgr.es/m/ZdLuxBk5hGpol91B@paquier.xyz
1 parent024c521 commit37b369d

File tree

3 files changed

+166
-0
lines changed

3 files changed

+166
-0
lines changed

‎src/test/modules/injection_points/injection_points--1.0.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ RETURNS void
2424
AS'MODULE_PATHNAME','injection_points_run'
2525
LANGUAGE C STRICT PARALLEL UNSAFE;
2626

27+
--
28+
-- injection_points_wakeup()
29+
--
30+
-- Wakes up a waiting injection point.
31+
--
32+
CREATEFUNCTIONinjection_points_wakeup(IN point_nameTEXT)
33+
RETURNS void
34+
AS'MODULE_PATHNAME','injection_points_wakeup'
35+
LANGUAGE C STRICT PARALLEL UNSAFE;
36+
2737
--
2838
-- injection_points_detach()
2939
--

‎src/test/modules/injection_points/injection_points.c

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,74 @@
1818
#include"postgres.h"
1919

2020
#include"fmgr.h"
21+
#include"storage/condition_variable.h"
2122
#include"storage/lwlock.h"
2223
#include"storage/shmem.h"
24+
#include"storage/dsm_registry.h"
2325
#include"utils/builtins.h"
2426
#include"utils/injection_point.h"
2527
#include"utils/wait_event.h"
2628

2729
PG_MODULE_MAGIC;
2830

31+
/* Maximum number of waits usable in injection points at once */
32+
#defineINJ_MAX_WAIT8
33+
#defineINJ_NAME_MAXLEN64
34+
35+
/* Shared state information for injection points. */
36+
typedefstructInjectionPointSharedState
37+
{
38+
/* Protects access to other fields */
39+
slock_tlock;
40+
41+
/* Counters advancing when injection_points_wakeup() is called */
42+
uint32wait_counts[INJ_MAX_WAIT];
43+
44+
/* Names of injection points attached to wait counters */
45+
charname[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
46+
47+
/* Condition variable used for waits and wakeups */
48+
ConditionVariablewait_point;
49+
}InjectionPointSharedState;
50+
51+
/* Pointer to shared-memory state. */
52+
staticInjectionPointSharedState*inj_state=NULL;
53+
2954
externPGDLLEXPORTvoidinjection_error(constchar*name);
3055
externPGDLLEXPORTvoidinjection_notice(constchar*name);
56+
externPGDLLEXPORTvoidinjection_wait(constchar*name);
57+
58+
59+
/*
60+
* Callback for shared memory area initialization.
61+
*/
62+
staticvoid
63+
injection_point_init_state(void*ptr)
64+
{
65+
InjectionPointSharedState*state= (InjectionPointSharedState*)ptr;
66+
67+
SpinLockInit(&state->lock);
68+
memset(state->wait_counts,0,sizeof(state->wait_counts));
69+
memset(state->name,0,sizeof(state->name));
70+
ConditionVariableInit(&state->wait_point);
71+
}
72+
73+
/*
74+
* Initialize shared memory area for this module.
75+
*/
76+
staticvoid
77+
injection_init_shmem(void)
78+
{
79+
boolfound;
3180

81+
if (inj_state!=NULL)
82+
return;
83+
84+
inj_state=GetNamedDSMSegment("injection_points",
85+
sizeof(InjectionPointSharedState),
86+
injection_point_init_state,
87+
&found);
88+
}
3289

3390
/* Set of callbacks available to be attached to an injection point. */
3491
void
@@ -43,6 +100,66 @@ injection_notice(const char *name)
43100
elog(NOTICE,"notice triggered for injection point %s",name);
44101
}
45102

103+
/* Wait on a condition variable, awaken by injection_points_wakeup() */
104+
void
105+
injection_wait(constchar*name)
106+
{
107+
uint32old_wait_counts=0;
108+
intindex=-1;
109+
uint32injection_wait_event=0;
110+
111+
if (inj_state==NULL)
112+
injection_init_shmem();
113+
114+
/*
115+
* Use the injection point name for this custom wait event. Note that
116+
* this custom wait event name is not released, but we don't care much for
117+
* testing as this should be short-lived.
118+
*/
119+
injection_wait_event=WaitEventExtensionNew(name);
120+
121+
/*
122+
* Find a free slot to wait for, and register this injection point's name.
123+
*/
124+
SpinLockAcquire(&inj_state->lock);
125+
for (inti=0;i<INJ_MAX_WAIT;i++)
126+
{
127+
if (inj_state->name[i][0]=='\0')
128+
{
129+
index=i;
130+
strlcpy(inj_state->name[i],name,INJ_NAME_MAXLEN);
131+
old_wait_counts=inj_state->wait_counts[i];
132+
break;
133+
}
134+
}
135+
SpinLockRelease(&inj_state->lock);
136+
137+
if (index<0)
138+
elog(ERROR,"could not find free slot for wait of injection point %s ",
139+
name);
140+
141+
/* And sleep.. */
142+
ConditionVariablePrepareToSleep(&inj_state->wait_point);
143+
for (;;)
144+
{
145+
uint32new_wait_counts;
146+
147+
SpinLockAcquire(&inj_state->lock);
148+
new_wait_counts=inj_state->wait_counts[index];
149+
SpinLockRelease(&inj_state->lock);
150+
151+
if (old_wait_counts!=new_wait_counts)
152+
break;
153+
ConditionVariableSleep(&inj_state->wait_point,injection_wait_event);
154+
}
155+
ConditionVariableCancelSleep();
156+
157+
/* Remove this injection point from the waiters. */
158+
SpinLockAcquire(&inj_state->lock);
159+
inj_state->name[index][0]='\0';
160+
SpinLockRelease(&inj_state->lock);
161+
}
162+
46163
/*
47164
* SQL function for creating an injection point.
48165
*/
@@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
58175
function="injection_error";
59176
elseif (strcmp(action,"notice")==0)
60177
function="injection_notice";
178+
elseif (strcmp(action,"wait")==0)
179+
function="injection_wait";
61180
else
62181
elog(ERROR,"incorrect action \"%s\" for injection point creation",action);
63182

@@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
80199
PG_RETURN_VOID();
81200
}
82201

202+
/*
203+
* SQL function for waking up an injection point waiting in injection_wait().
204+
*/
205+
PG_FUNCTION_INFO_V1(injection_points_wakeup);
206+
Datum
207+
injection_points_wakeup(PG_FUNCTION_ARGS)
208+
{
209+
char*name=text_to_cstring(PG_GETARG_TEXT_PP(0));
210+
intindex=-1;
211+
212+
if (inj_state==NULL)
213+
injection_init_shmem();
214+
215+
/* First bump the wait counter for the injection point to wake up */
216+
SpinLockAcquire(&inj_state->lock);
217+
for (inti=0;i<INJ_MAX_WAIT;i++)
218+
{
219+
if (strcmp(name,inj_state->name[i])==0)
220+
{
221+
index=i;
222+
break;
223+
}
224+
}
225+
if (index<0)
226+
{
227+
SpinLockRelease(&inj_state->lock);
228+
elog(ERROR,"could not find injection point %s to wake up",name);
229+
}
230+
inj_state->wait_counts[index]++;
231+
SpinLockRelease(&inj_state->lock);
232+
233+
/* And broadcast the change to the waiters */
234+
ConditionVariableBroadcast(&inj_state->wait_point);
235+
PG_RETURN_VOID();
236+
}
237+
83238
/*
84239
* SQL function for dropping an injection point.
85240
*/

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
12101210
InitializeWorkerForeignScan_function
12111211
InjectionPointCacheEntry
12121212
InjectionPointEntry
1213+
InjectionPointSharedState
12131214
InlineCodeBlock
12141215
InsertStmt
12151216
Instrumentation

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp