Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2a8b40e

Browse files
author
Amit Kapila
committed
Simplify determining logical replication worker types.
We deduce a LogicalRepWorker's type from the values of several differentfields ('relid' and 'leader_pid') whenever logic needs to know it.In fact, the logical replication worker type is already known at the timeof launching the LogicalRepWorker and it never changes for the lifetime ofthat process. Instead of deducing the type, it is simpler to just store itone time, and access it directly thereafter.Author: Peter SmithReviewed-by: Amit Kapila, Bharath RupireddyDiscussion:http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
1 parent3d8d217 commit2a8b40e

File tree

5 files changed

+43
-17
lines changed

5 files changed

+43
-17
lines changed

‎src/backend/replication/logical/applyparallelworker.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
435435
returnNULL;
436436
}
437437

438-
launched=logicalrep_worker_launch(MyLogicalRepWorker->dbid,
438+
launched=logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
439+
MyLogicalRepWorker->dbid,
439440
MySubscription->oid,
440441
MySubscription->name,
441442
MyLogicalRepWorker->userid,

‎src/backend/replication/logical/launcher.c

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
303303
* Returns true on success, false on failure.
304304
*/
305305
bool
306-
logicalrep_worker_launch(Oiddbid,Oidsubid,constchar*subname,Oiduserid,
306+
logicalrep_worker_launch(LogicalRepWorkerTypewtype,
307+
Oiddbid,Oidsubid,constchar*subname,Oiduserid,
307308
Oidrelid,dsm_handlesubworker_dsm)
308309
{
309310
BackgroundWorkerbgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
315316
intnsyncworkers;
316317
intnparallelapplyworkers;
317318
TimestampTznow;
318-
boolis_parallel_apply_worker= (subworker_dsm!=DSM_HANDLE_INVALID);
319-
320-
/* Sanity check - tablesync worker cannot be a subworker */
321-
Assert(!(is_parallel_apply_worker&&OidIsValid(relid)));
319+
boolis_tablesync_worker= (wtype==WORKERTYPE_TABLESYNC);
320+
boolis_parallel_apply_worker= (wtype==WORKERTYPE_PARALLEL_APPLY);
321+
322+
/*----------
323+
* Sanity checks:
324+
* - must be valid worker type
325+
* - tablesync workers are only ones to have relid
326+
* - parallel apply worker is the only kind of subworker
327+
*/
328+
Assert(wtype!=WORKERTYPE_UNKNOWN);
329+
Assert(is_tablesync_worker==OidIsValid(relid));
330+
Assert(is_parallel_apply_worker== (subworker_dsm!=DSM_HANDLE_INVALID));
322331

323332
ereport(DEBUG1,
324333
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
393402
* sync worker limit per subscription. So, just return silently as we
394403
* might get here because of an otherwise harmless race condition.
395404
*/
396-
if (OidIsValid(relid)&&nsyncworkers >=max_sync_workers_per_subscription)
405+
if (is_tablesync_worker&&nsyncworkers >=max_sync_workers_per_subscription)
397406
{
398407
LWLockRelease(LogicalRepWorkerLock);
399408
return false;
@@ -427,6 +436,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
427436
}
428437

429438
/* Prepare the worker slot. */
439+
worker->type=wtype;
430440
worker->launch_time=now;
431441
worker->in_use= true;
432442
worker->generation++;
@@ -466,7 +476,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
466476
subid);
467477
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication parallel worker");
468478
}
469-
elseif (OidIsValid(relid))
479+
elseif (is_tablesync_worker)
470480
{
471481
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TablesyncWorkerMain");
472482
snprintf(bgw.bgw_name,BGW_MAXLEN,
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
847857
{
848858
LogicalRepWorker*w=&LogicalRepCtx->workers[i];
849859

850-
if (w->subid==subid&&OidIsValid(w->relid))
860+
if (w->subid==subid&&isTablesyncWorker(w))
851861
res++;
852862
}
853863

@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
11801190
(elapsed=TimestampDifferenceMilliseconds(last_start,now)) >=wal_retrieve_retry_interval)
11811191
{
11821192
ApplyLauncherSetWorkerStartTime(sub->oid,now);
1183-
logicalrep_worker_launch(sub->dbid,sub->oid,sub->name,
1193+
logicalrep_worker_launch(WORKERTYPE_APPLY,
1194+
sub->dbid,sub->oid,sub->name,
11841195
sub->owner,InvalidOid,
11851196
DSM_HANDLE_INVALID);
11861197
}
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
12901301
worker_pid=worker.proc->pid;
12911302

12921303
values[0]=ObjectIdGetDatum(worker.subid);
1293-
if (OidIsValid(worker.relid))
1304+
if (isTablesyncWorker(&worker))
12941305
values[1]=ObjectIdGetDatum(worker.relid);
12951306
else
12961307
nulls[1]= true;

‎src/backend/replication/logical/tablesync.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
587587
TimestampDifferenceExceeds(hentry->last_start_time,now,
588588
wal_retrieve_retry_interval))
589589
{
590-
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
590+
logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
591+
MyLogicalRepWorker->dbid,
591592
MySubscription->oid,
592593
MySubscription->name,
593594
MyLogicalRepWorker->userid,

‎src/include/replication/worker_internal.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,20 @@
2727
#include"storage/shm_toc.h"
2828
#include"storage/spin.h"
2929

30+
/* Different types of worker */
31+
typedefenumLogicalRepWorkerType
32+
{
33+
WORKERTYPE_UNKNOWN=0,
34+
WORKERTYPE_TABLESYNC,
35+
WORKERTYPE_APPLY,
36+
WORKERTYPE_PARALLEL_APPLY
37+
}LogicalRepWorkerType;
3038

3139
typedefstructLogicalRepWorker
3240
{
41+
/* What type of worker is this? */
42+
LogicalRepWorkerTypetype;
43+
3344
/* Time at which this worker was launched. */
3445
TimestampTzlaunch_time;
3546

@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
232243
externLogicalRepWorker*logicalrep_worker_find(Oidsubid,Oidrelid,
233244
boolonly_running);
234245
externList*logicalrep_workers_find(Oidsubid,boolonly_running);
235-
externboollogicalrep_worker_launch(Oiddbid,Oidsubid,constchar*subname,
246+
externboollogicalrep_worker_launch(LogicalRepWorkerTypewtype,
247+
Oiddbid,Oidsubid,constchar*subname,
236248
Oiduserid,Oidrelid,
237249
dsm_handlesubworker_dsm);
238250
externvoidlogicalrep_worker_stop(Oidsubid,Oidrelid);
@@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void);
315327
externvoidpa_xact_finish(ParallelApplyWorkerInfo*winfo,
316328
XLogRecPtrremote_lsn);
317329

318-
#defineisParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
330+
#defineisParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
331+
#defineisTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
319332

320333
staticinlinebool
321334
am_tablesync_worker(void)
322335
{
323-
returnOidIsValid(MyLogicalRepWorker->relid);
336+
returnisTablesyncWorker(MyLogicalRepWorker);
324337
}
325338

326339
staticinlinebool
327340
am_leader_apply_worker(void)
328341
{
329-
return (!am_tablesync_worker()&&
330-
!isParallelApplyWorker(MyLogicalRepWorker));
342+
return (MyLogicalRepWorker->type==WORKERTYPE_APPLY);
331343
}
332344

333345
staticinlinebool

‎src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
15001500
LogicalRepTupleData
15011501
LogicalRepTyp
15021502
LogicalRepWorker
1503+
LogicalRepWorkerType
15031504
LogicalRewriteMappingData
15041505
LogicalTape
15051506
LogicalTapeSet

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp