Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite543631

Browse files
committed
Make worker_spi sample code more complete
Make use of some GUC variables, and add SIGHUP handling to reloadthe config file. Patch submitted by Guillaume Lelarge.Also, report to pg_stat_activity. Per report from Marc Cousin, addsetting of statement start time.
1 parent66c0170 commite543631

File tree

1 file changed

+120
-33
lines changed

1 file changed

+120
-33
lines changed

‎contrib/worker_spi/worker_spi.c

Lines changed: 120 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
/* -------------------------------------------------------------------------
22
*
33
* worker_spi.c
4-
*Sample background worker code that demonstrates usage of a database
5-
*connection.
4+
*Sample background worker code that demonstrates various coding
5+
*patterns: establishing a database connection; starting and committing
6+
*transactions; using GUC variables, and heeding SIGHUP to reread
7+
*the configuration file; reporting to pg_stat_activity; using the
8+
*process latch to sleep and exit in case of postmaster death.
69
*
7-
* This code connects to a database,create a schema and table, and summarizes
10+
* This code connects to a database,creates a schema and table, and summarizes
811
* the numbers contained therein. To see it working, insert an initial value
912
* with "total" type and some initial value; then insert some other rows with
1013
* "delta" type. Delta rows will be deleted by this worker and their values
1114
* aggregated into the total.
1215
*
13-
* Copyright (C)2012, PostgreSQL Global Development Group
16+
* Copyright (C)2013, PostgreSQL Global Development Group
1417
*
1518
* IDENTIFICATION
1619
*contrib/worker_spi/worker_spi.c
@@ -33,14 +36,22 @@
3336
#include"executor/spi.h"
3437
#include"fmgr.h"
3538
#include"lib/stringinfo.h"
39+
#include"pgstat.h"
3640
#include"utils/builtins.h"
3741
#include"utils/snapmgr.h"
42+
#include"tcop/utility.h"
3843

3944
PG_MODULE_MAGIC;
4045

4146
void_PG_init(void);
4247

43-
staticboolgot_sigterm= false;
48+
/* flags set by signal handlers */
49+
staticvolatilesig_atomic_tgot_sighup= false;
50+
staticvolatilesig_atomic_tgot_sigterm= false;
51+
52+
/* GUC variables */
53+
staticintworker_spi_naptime=10;
54+
staticintworker_spi_total_workers=2;
4455

4556

4657
typedefstructworktable
@@ -49,6 +60,11 @@ typedef struct worktable
4960
constchar*name;
5061
}worktable;
5162

63+
/*
64+
* Signal handler for SIGTERM
65+
* Set a flag to let the main loop to terminate, and set our latch to wake
66+
* it up.
67+
*/
5268
staticvoid
5369
worker_spi_sigterm(SIGNAL_ARGS)
5470
{
@@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS)
6177
errno=save_errno;
6278
}
6379

80+
/*
81+
* Signal handler for SIGHUP
82+
* Set a flag to let the main loop to reread the config file, and set
83+
* our latch to wake it up.
84+
*/
6485
staticvoid
6586
worker_spi_sighup(SIGNAL_ARGS)
6687
{
67-
elog(LOG,"got sighup!");
88+
got_sighup= true;
6889
if (MyProc)
6990
SetLatch(&MyProc->procLatch);
7091
}
7192

93+
/*
94+
* Initialize workspace for a worker process: create the schema if it doesn't
95+
* already exist.
96+
*/
7297
staticvoid
7398
initialize_worker_spi(worktable*table)
7499
{
@@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table)
77102
boolisnull;
78103
StringInfoDatabuf;
79104

105+
SetCurrentStatementStartTimestamp();
80106
StartTransactionCommand();
81107
SPI_connect();
82108
PushActiveSnapshot(GetTransactionSnapshot());
109+
pgstat_report_activity(STATE_RUNNING,"initializing spi_worker schema");
83110

111+
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
84112
initStringInfo(&buf);
85113
appendStringInfo(&buf,"select count(*) from pg_namespace where nspname = '%s'",
86114
table->schema);
@@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table)
110138
"WHERE type = 'total'",
111139
table->schema,table->name,table->name,table->name);
112140

141+
/* set statement start time */
142+
SetCurrentStatementStartTimestamp();
143+
113144
ret=SPI_execute(buf.data, false,0);
114145

115146
if (ret!=SPI_OK_UTILITY)
@@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table)
119150
SPI_finish();
120151
PopActiveSnapshot();
121152
CommitTransactionCommand();
153+
pgstat_report_activity(STATE_IDLE,NULL);
122154
}
123155

124156
staticvoid
@@ -163,6 +195,9 @@ worker_spi_main(void *main_arg)
163195
table->name,
164196
table->name);
165197

198+
/*
199+
* Main loop: do this until the SIGTERM handler tells us to terminate
200+
*/
166201
while (!got_sigterm)
167202
{
168203
intret;
@@ -176,17 +211,45 @@ worker_spi_main(void *main_arg)
176211
*/
177212
rc=WaitLatch(&MyProc->procLatch,
178213
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,
179-
1000L);
214+
worker_spi_naptime*1000L);
180215
ResetLatch(&MyProc->procLatch);
181216

182217
/* emergency bailout if postmaster has died */
183218
if (rc&WL_POSTMASTER_DEATH)
184219
proc_exit(1);
185220

221+
/*
222+
* In case of a SIGHUP, just reload the configuration.
223+
*/
224+
if (got_sighup)
225+
{
226+
got_sighup= false;
227+
ProcessConfigFile(PGC_SIGHUP);
228+
}
229+
230+
/*
231+
* Start a transaction on which we can run queries. Note that each
232+
* StartTransactionCommand() call should be preceded by a
233+
* SetCurrentStatementStartTimestamp() call, which sets both the time
234+
* for the statement we're about the run, and also the transaction
235+
* start time. Also, each other query sent to SPI should probably be
236+
* preceded by SetCurrentStatementStartTimestamp(), so that statement
237+
* start time is always up to date.
238+
*
239+
* The SPI_connect() call lets us run queries through the SPI manager,
240+
* and the PushActiveSnapshot() call creates an "active" snapshot which
241+
* is necessary for queries to have MVCC data to work on.
242+
*
243+
* The pgstat_report_activity() call makes our activity visible through
244+
* the pgstat views.
245+
*/
246+
SetCurrentStatementStartTimestamp();
186247
StartTransactionCommand();
187248
SPI_connect();
188249
PushActiveSnapshot(GetTransactionSnapshot());
250+
pgstat_report_activity(STATE_RUNNING,buf.data);
189251

252+
/* We can now execute queries via SPI */
190253
ret=SPI_execute(buf.data, false,0);
191254

192255
if (ret!=SPI_OK_UPDATE_RETURNING)
@@ -207,9 +270,13 @@ worker_spi_main(void *main_arg)
207270
table->schema,table->name,val);
208271
}
209272

273+
/*
274+
* And finish our transaction.
275+
*/
210276
SPI_finish();
211277
PopActiveSnapshot();
212278
CommitTransactionCommand();
279+
pgstat_report_activity(STATE_IDLE,NULL);
213280
}
214281

215282
proc_exit(0);
@@ -218,46 +285,66 @@ worker_spi_main(void *main_arg)
218285
/*
219286
* Entrypoint of this module.
220287
*
221-
* We register two worker processes here, to demonstrate how that can be done.
288+
* We register more than one worker process here, to demonstrate how that can
289+
* be done.
222290
*/
223291
void
224292
_PG_init(void)
225293
{
226294
BackgroundWorkerworker;
227295
worktable*table;
228-
229-
/* register the worker processes. These values are common for both */
296+
unsignedinti;
297+
charname[20];
298+
299+
/* get the configuration */
300+
DefineCustomIntVariable("worker_spi.naptime",
301+
"Duration between each check (in seconds).",
302+
NULL,
303+
&worker_spi_naptime,
304+
10,
305+
1,
306+
INT_MAX,
307+
PGC_SIGHUP,
308+
0,
309+
NULL,
310+
NULL,
311+
NULL);
312+
DefineCustomIntVariable("worker_spi.total_workers",
313+
"Number of workers.",
314+
NULL,
315+
&worker_spi_total_workers,
316+
2,
317+
1,
318+
100,
319+
PGC_POSTMASTER,
320+
0,
321+
NULL,
322+
NULL,
323+
NULL);
324+
325+
/* set up common data for all our workers */
230326
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
231327
BGWORKER_BACKEND_DATABASE_CONNECTION;
232328
worker.bgw_start_time=BgWorkerStart_RecoveryFinished;
329+
worker.bgw_restart_time=BGW_NEVER_RESTART;
233330
worker.bgw_main=worker_spi_main;
234331
worker.bgw_sighup=worker_spi_sighup;
235332
worker.bgw_sigterm=worker_spi_sigterm;
236333

237334
/*
238-
* These values are used for the first worker.
239-
*
240-
* Note these are palloc'd. The reason this works after starting a new
241-
* worker process is that if we only fork, they point to valid allocated
242-
* memory in the child process; and if we fork and then exec, the exec'd
243-
* process will run this code again, and so the memory is also valid there.
335+
* Now fill in worker-specific data, and do the actual registrations.
244336
*/
245-
table=palloc(sizeof(worktable));
246-
table->schema=pstrdup("schema1");
247-
table->name=pstrdup("counted");
337+
for (i=1;i <=worker_spi_total_workers;i++)
338+
{
339+
sprintf(name,"worker %d",i);
340+
worker.bgw_name=pstrdup(name);
248341

249-
worker.bgw_name="SPI worker 1";
250-
worker.bgw_restart_time=BGW_NEVER_RESTART;
251-
worker.bgw_main_arg= (void*)table;
252-
RegisterBackgroundWorker(&worker);
253-
254-
/* Values for the second worker */
255-
table=palloc(sizeof(worktable));
256-
table->schema=pstrdup("our schema2");
257-
table->name=pstrdup("counted rows");
258-
259-
worker.bgw_name="SPI worker 2";
260-
worker.bgw_restart_time=2;
261-
worker.bgw_main_arg= (void*)table;
262-
RegisterBackgroundWorker(&worker);
342+
table=palloc(sizeof(worktable));
343+
sprintf(name,"schema%d",i);
344+
table->schema=pstrdup(name);
345+
table->name=pstrdup("counted");
346+
worker.bgw_main_arg= (void*)table;
347+
348+
RegisterBackgroundWorker(&worker);
349+
}
263350
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp