Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit644ea35

Browse files
committed
Fix updating of pg_subscription_rel from workers
A logical replication worker should not insert new rows intopg_subscription_rel, only update existing rows, so that there are noraces if a concurrent refresh removes rows. Adjust the API to be ableto choose that behavior.Author: Masahiko Sawada <sawada.mshk@gmail.com>Reported-by: tushar <tushar.ahuja@enterprisedb.com>
1 parent15ce775 commit644ea35

File tree

4 files changed

+19
-11
lines changed

4 files changed

+19
-11
lines changed

‎src/backend/catalog/pg_subscription.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray)
227227
/*
228228
* Set the state of a subscription table.
229229
*
230+
* If update_only is true and the record for given table doesn't exist, do
231+
* nothing. This can be used to avoid inserting a new record that was deleted
232+
* by someone else. Generally, subscription DDL commands should use false,
233+
* workers should use true.
234+
*
230235
* The insert-or-update logic in this function is not concurrency safe so it
231236
* might raise an error in rare circumstances. But if we took a stronger lock
232237
* such as ShareRowExclusiveLock, we would risk more deadlocks.
233238
*/
234239
Oid
235240
SetSubscriptionRelState(Oidsubid,Oidrelid,charstate,
236-
XLogRecPtrsublsn)
241+
XLogRecPtrsublsn,boolupdate_only)
237242
{
238243
Relationrel;
239244
HeapTupletup;
240-
Oidsubrelid;
245+
Oidsubrelid=InvalidOid;
241246
boolnulls[Natts_pg_subscription_rel];
242247
Datumvalues[Natts_pg_subscription_rel];
243248

@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
252257
* If the record for given table does not exist yet create new record,
253258
* otherwise update the existing one.
254259
*/
255-
if (!HeapTupleIsValid(tup))
260+
if (!HeapTupleIsValid(tup)&& !update_only)
256261
{
257262
/* Form the tuple. */
258263
memset(values,0,sizeof(values));
@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
272277

273278
heap_freetuple(tup);
274279
}
275-
else
280+
elseif (HeapTupleIsValid(tup))
276281
{
277282
boolreplaces[Natts_pg_subscription_rel];
278283

‎src/backend/commands/subscriptioncmds.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
451451
rv->schemaname,rv->relname);
452452

453453
SetSubscriptionRelState(subid,relid,table_state,
454-
InvalidXLogRecPtr);
454+
InvalidXLogRecPtr, false);
455455
}
456456

457457
ereport(NOTICE,
@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
574574
{
575575
SetSubscriptionRelState(sub->oid,relid,
576576
copy_data ?SUBREL_STATE_INIT :SUBREL_STATE_READY,
577-
InvalidXLogRecPtr);
577+
InvalidXLogRecPtr, false);
578578
ereport(NOTICE,
579579
(errmsg("added subscription for table %s.%s",
580580
quote_identifier(rv->schemaname),

‎src/backend/replication/logical/tablesync.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
287287
SetSubscriptionRelState(MyLogicalRepWorker->subid,
288288
MyLogicalRepWorker->relid,
289289
MyLogicalRepWorker->relstate,
290-
MyLogicalRepWorker->relstate_lsn);
290+
MyLogicalRepWorker->relstate_lsn,
291+
true);
291292

292293
walrcv_endstreaming(wrconn,&tli);
293294
finish_sync_worker();
@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
414415
}
415416
SetSubscriptionRelState(MyLogicalRepWorker->subid,
416417
rstate->relid,rstate->state,
417-
rstate->lsn);
418+
rstate->lsn, true);
418419
}
419420
}
420421
else
@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
845846
SetSubscriptionRelState(MyLogicalRepWorker->subid,
846847
MyLogicalRepWorker->relid,
847848
MyLogicalRepWorker->relstate,
848-
MyLogicalRepWorker->relstate_lsn);
849+
MyLogicalRepWorker->relstate_lsn,
850+
true);
849851
CommitTransactionCommand();
850852
pgstat_report_stat(false);
851853

@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
932934
SetSubscriptionRelState(MyLogicalRepWorker->subid,
933935
MyLogicalRepWorker->relid,
934936
SUBREL_STATE_SYNCDONE,
935-
*origin_startpos);
937+
*origin_startpos,
938+
true);
936939
finish_sync_worker();
937940
}
938941
break;

‎src/include/catalog/pg_subscription_rel.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState
7171
}SubscriptionRelState;
7272

7373
externOidSetSubscriptionRelState(Oidsubid,Oidrelid,charstate,
74-
XLogRecPtrsublsn);
74+
XLogRecPtrsublsn,boolupdate_only);
7575
externcharGetSubscriptionRelState(Oidsubid,Oidrelid,
7676
XLogRecPtr*sublsn,boolmissing_ok);
7777
externvoidRemoveSubscriptionRel(Oidsubid,Oidrelid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp