Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1cdc6d8

Browse files
author
Amit Kapila
committed
Simplify the logical worker type checks by using the switch on worker type.
The current code uses if/else statements at various places to take workerspecific actions. Change those to use the switch on worker type added bycommit2a8b40e. This makes code easier to read and understand.Author: Peter SmithReviewed-by: Amit Kapila, Hou ZhijieDiscussion:http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
1 parent6fde2d9 commit1cdc6d8

File tree

3 files changed

+79
-54
lines changed

3 files changed

+79
-54
lines changed

‎src/backend/replication/logical/launcher.c

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -468,39 +468,44 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
468468
bgw.bgw_start_time=BgWorkerStart_RecoveryFinished;
469469
snprintf(bgw.bgw_library_name,MAXPGPATH,"postgres");
470470

471-
if (is_parallel_apply_worker)
471+
switch (worker->type)
472472
{
473-
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ParallelApplyWorkerMain");
474-
snprintf(bgw.bgw_name,BGW_MAXLEN,
475-
"logical replication parallel apply worker for subscription %u",
476-
subid);
477-
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication parallel worker");
478-
}
479-
elseif (is_tablesync_worker)
480-
{
481-
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TablesyncWorkerMain");
482-
snprintf(bgw.bgw_name,BGW_MAXLEN,
483-
"logical replication tablesync worker for subscription %u sync %u",
484-
subid,
485-
relid);
486-
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication tablesync worker");
487-
}
488-
else
489-
{
490-
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ApplyWorkerMain");
491-
snprintf(bgw.bgw_name,BGW_MAXLEN,
492-
"logical replication apply worker for subscription %u",
493-
subid);
494-
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication apply worker");
473+
caseWORKERTYPE_APPLY:
474+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ApplyWorkerMain");
475+
snprintf(bgw.bgw_name,BGW_MAXLEN,
476+
"logical replication apply worker for subscription %u",
477+
subid);
478+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication apply worker");
479+
break;
480+
481+
caseWORKERTYPE_PARALLEL_APPLY:
482+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ParallelApplyWorkerMain");
483+
snprintf(bgw.bgw_name,BGW_MAXLEN,
484+
"logical replication parallel apply worker for subscription %u",
485+
subid);
486+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication parallel worker");
487+
488+
memcpy(bgw.bgw_extra,&subworker_dsm,sizeof(dsm_handle));
489+
break;
490+
491+
caseWORKERTYPE_TABLESYNC:
492+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TablesyncWorkerMain");
493+
snprintf(bgw.bgw_name,BGW_MAXLEN,
494+
"logical replication tablesync worker for subscription %u sync %u",
495+
subid,
496+
relid);
497+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication tablesync worker");
498+
break;
499+
500+
caseWORKERTYPE_UNKNOWN:
501+
/* Should never happen. */
502+
elog(ERROR,"unknown worker type");
495503
}
496504

497505
bgw.bgw_restart_time=BGW_NEVER_RESTART;
498506
bgw.bgw_notify_pid=MyProcPid;
499507
bgw.bgw_main_arg=Int32GetDatum(slot);
500508

501-
if (is_parallel_apply_worker)
502-
memcpy(bgw.bgw_extra,&subworker_dsm,sizeof(dsm_handle));
503-
504509
if (!RegisterDynamicBackgroundWorker(&bgw,&bgw_handle))
505510
{
506511
/* Failed to start worker, so clean up the worker slot. */

‎src/backend/replication/logical/tablesync.c

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -649,18 +649,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
649649
void
650650
process_syncing_tables(XLogRecPtrcurrent_lsn)
651651
{
652-
/*
653-
* Skip for parallel apply workers because they only operate on tables
654-
* that are in a READY state. See pa_can_start() and
655-
* should_apply_changes_for_rel().
656-
*/
657-
if (am_parallel_apply_worker())
658-
return;
652+
switch (MyLogicalRepWorker->type)
653+
{
654+
caseWORKERTYPE_PARALLEL_APPLY:
659655

660-
if (am_tablesync_worker())
661-
process_syncing_tables_for_sync(current_lsn);
662-
else
663-
process_syncing_tables_for_apply(current_lsn);
656+
/*
657+
* Skip for parallel apply workers because they only operate on
658+
* tables that are in a READY state. See pa_can_start() and
659+
* should_apply_changes_for_rel().
660+
*/
661+
break;
662+
663+
caseWORKERTYPE_TABLESYNC:
664+
process_syncing_tables_for_sync(current_lsn);
665+
break;
666+
667+
caseWORKERTYPE_APPLY:
668+
process_syncing_tables_for_apply(current_lsn);
669+
break;
670+
671+
caseWORKERTYPE_UNKNOWN:
672+
/* Should never happen. */
673+
elog(ERROR,"Unknown worker type");
674+
}
664675
}
665676

666677
/*

‎src/backend/replication/logical/worker.c

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -485,25 +485,34 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
485485
staticbool
486486
should_apply_changes_for_rel(LogicalRepRelMapEntry*rel)
487487
{
488-
if (am_tablesync_worker())
489-
returnMyLogicalRepWorker->relid==rel->localreloid;
490-
elseif (am_parallel_apply_worker())
488+
switch (MyLogicalRepWorker->type)
491489
{
492-
/* We don't synchronize rel's that are in unknown state. */
493-
if (rel->state!=SUBREL_STATE_READY&&
494-
rel->state!=SUBREL_STATE_UNKNOWN)
495-
ereport(ERROR,
496-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
497-
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
498-
MySubscription->name),
499-
errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
500-
501-
returnrel->state==SUBREL_STATE_READY;
490+
caseWORKERTYPE_TABLESYNC:
491+
returnMyLogicalRepWorker->relid==rel->localreloid;
492+
493+
caseWORKERTYPE_PARALLEL_APPLY:
494+
/* We don't synchronize rel's that are in unknown state. */
495+
if (rel->state!=SUBREL_STATE_READY&&
496+
rel->state!=SUBREL_STATE_UNKNOWN)
497+
ereport(ERROR,
498+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
499+
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
500+
MySubscription->name),
501+
errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
502+
503+
returnrel->state==SUBREL_STATE_READY;
504+
505+
caseWORKERTYPE_APPLY:
506+
return (rel->state==SUBREL_STATE_READY||
507+
(rel->state==SUBREL_STATE_SYNCDONE&&
508+
rel->statelsn <=remote_final_lsn));
509+
510+
caseWORKERTYPE_UNKNOWN:
511+
/* Should never happen. */
512+
elog(ERROR,"Unknown worker type");
502513
}
503-
else
504-
return (rel->state==SUBREL_STATE_READY||
505-
(rel->state==SUBREL_STATE_SYNCDONE&&
506-
rel->statelsn <=remote_final_lsn));
514+
515+
return false;/* dummy for compiler */
507516
}
508517

509518
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp