11/* -------------------------------------------------------------------------
22 *
33 * worker_spi.c
4- *Sample background worker code that demonstrates usage of a database
5- *connection.
4+ *Sample background worker code that demonstrates various coding
5+ *patterns: establishing a database connection; starting and committing
6+ *transactions; using GUC variables, and heeding SIGHUP to reread
7+ *the configuration file; reporting to pg_stat_activity; using the
8+ *process latch to sleep and exit in case of postmaster death.
69 *
7- * This code connects to a database,create a schema and table, and summarizes
10+ * This code connects to a database,creates a schema and table, and summarizes
811 * the numbers contained therein. To see it working, insert an initial value
912 * with "total" type and some initial value; then insert some other rows with
1013 * "delta" type. Delta rows will be deleted by this worker and their values
1114 * aggregated into the total.
1215 *
13- * Copyright (C)2012 , PostgreSQL Global Development Group
16+ * Copyright (C)2013 , PostgreSQL Global Development Group
1417 *
1518 * IDENTIFICATION
1619 *contrib/worker_spi/worker_spi.c
3336#include "executor/spi.h"
3437#include "fmgr.h"
3538#include "lib/stringinfo.h"
39+ #include "pgstat.h"
3640#include "utils/builtins.h"
3741#include "utils/snapmgr.h"
42+ #include "tcop/utility.h"
3843
3944PG_MODULE_MAGIC ;
4045
4146void _PG_init (void );
4247
43- static bool got_sigterm = false;
48+ /* flags set by signal handlers */
49+ static volatile sig_atomic_t got_sighup = false;
50+ static volatile sig_atomic_t got_sigterm = false;
51+
52+ /* GUC variables */
53+ static int worker_spi_naptime = 10 ;
54+ static int worker_spi_total_workers = 2 ;
4455
4556
4657typedef struct worktable
@@ -49,6 +60,11 @@ typedef struct worktable
4960const char * name ;
5061}worktable ;
5162
63+ /*
64+ * Signal handler for SIGTERM
65+ * Set a flag to let the main loop to terminate, and set our latch to wake
66+ * it up.
67+ */
5268static void
5369worker_spi_sigterm (SIGNAL_ARGS )
5470{
@@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS)
6177errno = save_errno ;
6278}
6379
80+ /*
81+ * Signal handler for SIGHUP
82+ * Set a flag to let the main loop to reread the config file, and set
83+ * our latch to wake it up.
84+ */
6485static void
6586worker_spi_sighup (SIGNAL_ARGS )
6687{
67- elog ( LOG , "got sighup!" ) ;
88+ got_sighup = true ;
6889if (MyProc )
6990SetLatch (& MyProc -> procLatch );
7091}
7192
93+ /*
94+ * Initialize workspace for a worker process: create the schema if it doesn't
95+ * already exist.
96+ */
7297static void
7398initialize_worker_spi (worktable * table )
7499{
@@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table)
77102bool isnull ;
78103StringInfoData buf ;
79104
105+ SetCurrentStatementStartTimestamp ();
80106StartTransactionCommand ();
81107SPI_connect ();
82108PushActiveSnapshot (GetTransactionSnapshot ());
109+ pgstat_report_activity (STATE_RUNNING ,"initializing spi_worker schema" );
83110
111+ /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
84112initStringInfo (& buf );
85113appendStringInfo (& buf ,"select count(*) from pg_namespace where nspname = '%s'" ,
86114table -> schema );
@@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table)
110138"WHERE type = 'total'" ,
111139table -> schema ,table -> name ,table -> name ,table -> name );
112140
141+ /* set statement start time */
142+ SetCurrentStatementStartTimestamp ();
143+
113144ret = SPI_execute (buf .data , false,0 );
114145
115146if (ret != SPI_OK_UTILITY )
@@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table)
119150SPI_finish ();
120151PopActiveSnapshot ();
121152CommitTransactionCommand ();
153+ pgstat_report_activity (STATE_IDLE ,NULL );
122154}
123155
124156static void
@@ -163,6 +195,9 @@ worker_spi_main(void *main_arg)
163195table -> name ,
164196table -> name );
165197
198+ /*
199+ * Main loop: do this until the SIGTERM handler tells us to terminate
200+ */
166201while (!got_sigterm )
167202{
168203int ret ;
@@ -176,17 +211,45 @@ worker_spi_main(void *main_arg)
176211 */
177212rc = WaitLatch (& MyProc -> procLatch ,
178213WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH ,
179- 1000L );
214+ worker_spi_naptime * 1000L );
180215ResetLatch (& MyProc -> procLatch );
181216
182217/* emergency bailout if postmaster has died */
183218if (rc & WL_POSTMASTER_DEATH )
184219proc_exit (1 );
185220
221+ /*
222+ * In case of a SIGHUP, just reload the configuration.
223+ */
224+ if (got_sighup )
225+ {
226+ got_sighup = false;
227+ ProcessConfigFile (PGC_SIGHUP );
228+ }
229+
230+ /*
231+ * Start a transaction on which we can run queries. Note that each
232+ * StartTransactionCommand() call should be preceded by a
233+ * SetCurrentStatementStartTimestamp() call, which sets both the time
234+ * for the statement we're about the run, and also the transaction
235+ * start time. Also, each other query sent to SPI should probably be
236+ * preceded by SetCurrentStatementStartTimestamp(), so that statement
237+ * start time is always up to date.
238+ *
239+ * The SPI_connect() call lets us run queries through the SPI manager,
240+ * and the PushActiveSnapshot() call creates an "active" snapshot which
241+ * is necessary for queries to have MVCC data to work on.
242+ *
243+ * The pgstat_report_activity() call makes our activity visible through
244+ * the pgstat views.
245+ */
246+ SetCurrentStatementStartTimestamp ();
186247StartTransactionCommand ();
187248SPI_connect ();
188249PushActiveSnapshot (GetTransactionSnapshot ());
250+ pgstat_report_activity (STATE_RUNNING ,buf .data );
189251
252+ /* We can now execute queries via SPI */
190253ret = SPI_execute (buf .data , false,0 );
191254
192255if (ret != SPI_OK_UPDATE_RETURNING )
@@ -207,9 +270,13 @@ worker_spi_main(void *main_arg)
207270table -> schema ,table -> name ,val );
208271}
209272
273+ /*
274+ * And finish our transaction.
275+ */
210276SPI_finish ();
211277PopActiveSnapshot ();
212278CommitTransactionCommand ();
279+ pgstat_report_activity (STATE_IDLE ,NULL );
213280}
214281
215282proc_exit (0 );
@@ -218,46 +285,66 @@ worker_spi_main(void *main_arg)
218285/*
219286 * Entrypoint of this module.
220287 *
221- * We register two worker processes here, to demonstrate how that can be done.
288+ * We register more than one worker process here, to demonstrate how that can
289+ * be done.
222290 */
223291void
224292_PG_init (void )
225293{
226294BackgroundWorker worker ;
227295worktable * table ;
228-
229- /* register the worker processes. These values are common for both */
296+ unsignedint i ;
297+ char name [20 ];
298+
299+ /* get the configuration */
300+ DefineCustomIntVariable ("worker_spi.naptime" ,
301+ "Duration between each check (in seconds)." ,
302+ NULL ,
303+ & worker_spi_naptime ,
304+ 10 ,
305+ 1 ,
306+ INT_MAX ,
307+ PGC_SIGHUP ,
308+ 0 ,
309+ NULL ,
310+ NULL ,
311+ NULL );
312+ DefineCustomIntVariable ("worker_spi.total_workers" ,
313+ "Number of workers." ,
314+ NULL ,
315+ & worker_spi_total_workers ,
316+ 2 ,
317+ 1 ,
318+ 100 ,
319+ PGC_POSTMASTER ,
320+ 0 ,
321+ NULL ,
322+ NULL ,
323+ NULL );
324+
325+ /* set up common data for all our workers */
230326worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
231327BGWORKER_BACKEND_DATABASE_CONNECTION ;
232328worker .bgw_start_time = BgWorkerStart_RecoveryFinished ;
329+ worker .bgw_restart_time = BGW_NEVER_RESTART ;
233330worker .bgw_main = worker_spi_main ;
234331worker .bgw_sighup = worker_spi_sighup ;
235332worker .bgw_sigterm = worker_spi_sigterm ;
236333
237334/*
238- * These values are used for the first worker.
239- *
240- * Note these are palloc'd. The reason this works after starting a new
241- * worker process is that if we only fork, they point to valid allocated
242- * memory in the child process; and if we fork and then exec, the exec'd
243- * process will run this code again, and so the memory is also valid there.
335+ * Now fill in worker-specific data, and do the actual registrations.
244336 */
245- table = palloc (sizeof (worktable ));
246- table -> schema = pstrdup ("schema1" );
247- table -> name = pstrdup ("counted" );
337+ for (i = 1 ;i <=worker_spi_total_workers ;i ++ )
338+ {
339+ sprintf (name ,"worker %d" ,i );
340+ worker .bgw_name = pstrdup (name );
248341
249- worker .bgw_name = "SPI worker 1" ;
250- worker .bgw_restart_time = BGW_NEVER_RESTART ;
251- worker .bgw_main_arg = (void * )table ;
252- RegisterBackgroundWorker (& worker );
253-
254- /* Values for the second worker */
255- table = palloc (sizeof (worktable ));
256- table -> schema = pstrdup ("our schema2" );
257- table -> name = pstrdup ("counted rows" );
258-
259- worker .bgw_name = "SPI worker 2" ;
260- worker .bgw_restart_time = 2 ;
261- worker .bgw_main_arg = (void * )table ;
262- RegisterBackgroundWorker (& worker );
342+ table = palloc (sizeof (worktable ));
343+ sprintf (name ,"schema%d" ,i );
344+ table -> schema = pstrdup (name );
345+ table -> name = pstrdup ("counted" );
346+ worker .bgw_main_arg = (void * )table ;
347+
348+ RegisterBackgroundWorker (& worker );
349+ }
263350}