Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitda07a1e

Browse files
committed
Background worker processes
Background workers are postmaster subprocesses that run arbitraryuser-specified code. They can request shared memory access as well asbackend database connections; or they can just use plain libpq frontenddatabase connections.Modules listed in shared_preload_libraries can register backgroundworkers in their _PG_init() function; this is early enough that it's notnecessary to provide an extra GUC option, because the necessary extraresources can be allocated early on. Modules can install more than onebgworker, if necessary.Care is taken that these extra processes do not interfere with otherpostmaster tasks: only one such process is started on each ServerLoopiteration. This means a large number of them could be waiting to bestarted up and postmaster is still able to quickly service externalconnection requests. Also, shutdown sequence should not be impacted bya worker process that's reasonably well behaved (i.e. promptly respondsto termination signals.)The current implementation lets worker processes specify their starttime, i.e. at what point in the server startup process they are to bestarted: right after postmaster start (in which case they mustn't askfor shared memory access), when consistent state has been reached(useful during recovery in a HOT standby server), or when recovery hasterminated (i.e. when normal backends are allowed).In case of a bgworker crash, actions to take depend on registrationdata: if shared memory was requested, then all other connections aretaken down (as well as other bgworkers), just like it were a regularbackend crashing. The bgworker itself is restarted, too, within aconfigurable timeframe (which can be configured to be never).More features to add to this framework can be imagined without mucheffort, and have been discussed, but this seems good enough as a usefulunit already.An elementary sample module is supplied.Author: Álvaro HerreraThis patch is loosely based on prior patches submitted by KaiGai Kohei,and unsubmitted code by Simon Riggs.Reviewed by: KaiGai Kohei, Markus Wanner, Andres Freund,Heikki Linnakangas, Simon Riggs, Amit Kapila
1 parente31d524 commitda07a1e

File tree

16 files changed

+1690
-90
lines changed

16 files changed

+1690
-90
lines changed

‎contrib/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ SUBDIRS = \
5050
test_parser\
5151
tsearch2\
5252
unaccent\
53-
vacuumlo
53+
vacuumlo\
54+
worker_spi
5455

5556
ifeq ($(with_openssl),yes)
5657
SUBDIRS += sslinfo

‎contrib/worker_spi/Makefile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# contrib/worker_spi/Makefile
2+
3+
MODULES = worker_spi
4+
5+
ifdefUSE_PGXS
6+
PG_CONFIG = pg_config
7+
PGXS :=$(shell$(PG_CONFIG) --pgxs)
8+
include$(PGXS)
9+
else
10+
subdir = contrib/worker_spi
11+
top_builddir = ../..
12+
include$(top_builddir)/src/Makefile.global
13+
include$(top_srcdir)/contrib/contrib-global.mk
14+
endif

‎contrib/worker_spi/worker_spi.c

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/* -------------------------------------------------------------------------
2+
*
3+
* worker_spi.c
4+
*Sample background worker code that demonstrates usage of a database
5+
*connection.
6+
*
7+
* This code connects to a database, create a schema and table, and summarizes
8+
* the numbers contained therein. To see it working, insert an initial value
9+
* with "total" type and some initial value; then insert some other rows with
10+
* "delta" type. Delta rows will be deleted by this worker and their values
11+
* aggregated into the total.
12+
*
13+
* Copyright (C) 2012, PostgreSQL Global Development Group
14+
*
15+
* IDENTIFICATION
16+
*contrib/worker_spi/worker_spi.c
17+
*
18+
* -------------------------------------------------------------------------
19+
*/
20+
#include"postgres.h"
21+
22+
/* These are always necessary for a bgworker */
23+
#include"miscadmin.h"
24+
#include"postmaster/bgworker.h"
25+
#include"storage/ipc.h"
26+
#include"storage/latch.h"
27+
#include"storage/lwlock.h"
28+
#include"storage/proc.h"
29+
#include"storage/shmem.h"
30+
31+
/* these headers are used by this particular worker's code */
32+
#include"access/xact.h"
33+
#include"executor/spi.h"
34+
#include"fmgr.h"
35+
#include"lib/stringinfo.h"
36+
#include"utils/builtins.h"
37+
#include"utils/snapmgr.h"
38+
39+
PG_MODULE_MAGIC;
40+
41+
void_PG_init(void);
42+
43+
staticboolgot_sigterm= false;
44+
45+
46+
typedefstructworktable
47+
{
48+
constchar*schema;
49+
constchar*name;
50+
}worktable;
51+
52+
staticvoid
53+
worker_spi_sigterm(SIGNAL_ARGS)
54+
{
55+
intsave_errno=errno;
56+
57+
got_sigterm= true;
58+
if (MyProc)
59+
SetLatch(&MyProc->procLatch);
60+
61+
errno=save_errno;
62+
}
63+
64+
staticvoid
65+
worker_spi_sighup(SIGNAL_ARGS)
66+
{
67+
elog(LOG,"got sighup!");
68+
if (MyProc)
69+
SetLatch(&MyProc->procLatch);
70+
}
71+
72+
staticvoid
73+
initialize_worker_spi(worktable*table)
74+
{
75+
intret;
76+
intntup;
77+
boolisnull;
78+
StringInfoDatabuf;
79+
80+
StartTransactionCommand();
81+
SPI_connect();
82+
PushActiveSnapshot(GetTransactionSnapshot());
83+
84+
initStringInfo(&buf);
85+
appendStringInfo(&buf,"select count(*) from pg_namespace where nspname = '%s'",
86+
table->schema);
87+
88+
ret=SPI_execute(buf.data, true,0);
89+
if (ret!=SPI_OK_SELECT)
90+
elog(FATAL,"SPI_execute failed: error code %d",ret);
91+
92+
if (SPI_processed!=1)
93+
elog(FATAL,"not a singleton result");
94+
95+
ntup=DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
96+
SPI_tuptable->tupdesc,
97+
1,&isnull));
98+
if (isnull)
99+
elog(FATAL,"null result");
100+
101+
if (ntup==0)
102+
{
103+
resetStringInfo(&buf);
104+
appendStringInfo(&buf,
105+
"CREATE SCHEMA \"%s\" "
106+
"CREATE TABLE \"%s\" ("
107+
"type text CHECK (type IN ('total', 'delta')), "
108+
"valueinteger)"
109+
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
110+
"WHERE type = 'total'",
111+
table->schema,table->name,table->name,table->name);
112+
113+
ret=SPI_execute(buf.data, false,0);
114+
115+
if (ret!=SPI_OK_UTILITY)
116+
elog(FATAL,"failed to create my schema");
117+
}
118+
119+
SPI_finish();
120+
PopActiveSnapshot();
121+
CommitTransactionCommand();
122+
}
123+
124+
staticvoid
125+
worker_spi_main(void*main_arg)
126+
{
127+
worktable*table= (worktable*)main_arg;
128+
StringInfoDatabuf;
129+
130+
/* We're now ready to receive signals */
131+
BackgroundWorkerUnblockSignals();
132+
133+
/* Connect to our database */
134+
BackgroundWorkerInitializeConnection("postgres",NULL);
135+
136+
elog(LOG,"%s initialized with %s.%s",
137+
MyBgworkerEntry->bgw_name,table->schema,table->name);
138+
initialize_worker_spi(table);
139+
140+
/*
141+
* Quote identifiers passed to us. Note that this must be done after
142+
* initialize_worker_spi, because that routine assumes the names are not
143+
* quoted.
144+
*
145+
* Note some memory might be leaked here.
146+
*/
147+
table->schema=quote_identifier(table->schema);
148+
table->name=quote_identifier(table->name);
149+
150+
initStringInfo(&buf);
151+
appendStringInfo(&buf,
152+
"WITH deleted AS (DELETE "
153+
"FROM %s.%s "
154+
"WHERE type = 'delta' RETURNING value), "
155+
"total AS (SELECT coalesce(sum(value), 0) as sum "
156+
"FROM deleted) "
157+
"UPDATE %s.%s "
158+
"SET value = %s.value + total.sum "
159+
"FROM total WHERE type = 'total' "
160+
"RETURNING %s.value",
161+
table->schema,table->name,
162+
table->schema,table->name,
163+
table->name,
164+
table->name);
165+
166+
while (!got_sigterm)
167+
{
168+
intret;
169+
intrc;
170+
171+
/*
172+
* Background workers mustn't call usleep() or any direct equivalent:
173+
* instead, they may wait on their process latch, which sleeps as
174+
* necessary, but is awakened if postmaster dies. That way the
175+
* background process goes away immediately in an emergency.
176+
*/
177+
rc=WaitLatch(&MyProc->procLatch,
178+
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,
179+
1000L);
180+
ResetLatch(&MyProc->procLatch);
181+
182+
/* emergency bailout if postmaster has died */
183+
if (rc&WL_POSTMASTER_DEATH)
184+
proc_exit(1);
185+
186+
StartTransactionCommand();
187+
SPI_connect();
188+
PushActiveSnapshot(GetTransactionSnapshot());
189+
190+
ret=SPI_execute(buf.data, false,0);
191+
192+
if (ret!=SPI_OK_UPDATE_RETURNING)
193+
elog(FATAL,"cannot select from table %s.%s: error code %d",
194+
table->schema,table->name,ret);
195+
196+
if (SPI_processed>0)
197+
{
198+
boolisnull;
199+
int32val;
200+
201+
val=DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
202+
SPI_tuptable->tupdesc,
203+
1,&isnull));
204+
if (!isnull)
205+
elog(LOG,"%s: count in %s.%s is now %d",
206+
MyBgworkerEntry->bgw_name,
207+
table->schema,table->name,val);
208+
}
209+
210+
SPI_finish();
211+
PopActiveSnapshot();
212+
CommitTransactionCommand();
213+
}
214+
215+
proc_exit(0);
216+
}
217+
218+
/*
219+
* Entrypoint of this module.
220+
*
221+
* We register two worker processes here, to demonstrate how that can be done.
222+
*/
223+
void
224+
_PG_init(void)
225+
{
226+
BackgroundWorkerworker;
227+
worktable*table;
228+
229+
/* register the worker processes. These values are common for both */
230+
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |
231+
BGWORKER_BACKEND_DATABASE_CONNECTION;
232+
worker.bgw_start_time=BgWorkerStart_RecoveryFinished;
233+
worker.bgw_main=worker_spi_main;
234+
worker.bgw_sighup=worker_spi_sighup;
235+
worker.bgw_sigterm=worker_spi_sigterm;
236+
237+
/*
238+
* These values are used for the first worker.
239+
*
240+
* Note these are palloc'd. The reason this works after starting a new
241+
* worker process is that if we only fork, they point to valid allocated
242+
* memory in the child process; and if we fork and then exec, the exec'd
243+
* process will run this code again, and so the memory is also valid there.
244+
*/
245+
table=palloc(sizeof(worktable));
246+
table->schema=pstrdup("schema1");
247+
table->name=pstrdup("counted");
248+
249+
worker.bgw_name="SPI worker 1";
250+
worker.bgw_restart_time=BGW_NEVER_RESTART;
251+
worker.bgw_main_arg= (void*)table;
252+
RegisterBackgroundWorker(&worker);
253+
254+
/* Values for the second worker */
255+
table=palloc(sizeof(worktable));
256+
table->schema=pstrdup("our schema2");
257+
table->name=pstrdup("counted rows");
258+
259+
worker.bgw_name="SPI worker 2";
260+
worker.bgw_restart_time=2;
261+
worker.bgw_main_arg= (void*)table;
262+
RegisterBackgroundWorker(&worker);
263+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp