Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbcf79b5

Browse files
committed
Split the SetSubscriptionRelState function into two
We don't actually need the insert-or-update logic, so it's clearer tohave separate functions for the inserting and updating.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parentc25304a commitbcf79b5

File tree

4 files changed

+96
-81
lines changed

4 files changed

+96
-81
lines changed

‎src/backend/catalog/pg_subscription.c

Lines changed: 72 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
227227
}
228228

229229
/*
230-
* Set the state of a subscription table.
231-
*
232-
* If update_only is true and the record for given table doesn't exist, do
233-
* nothing. This can be used to avoid inserting a new record that was deleted
234-
* by someone else. Generally, subscription DDL commands should use false,
235-
* workers should use true.
236-
*
237-
* The insert-or-update logic in this function is not concurrency safe so it
238-
* might raise an error in rare circumstances. But if we took a stronger lock
239-
* such as ShareRowExclusiveLock, we would risk more deadlocks.
230+
* Add new state record for a subscription table.
240231
*/
241232
Oid
242-
SetSubscriptionRelState(Oidsubid,Oidrelid,charstate,
243-
XLogRecPtrsublsn,boolupdate_only)
233+
AddSubscriptionRelState(Oidsubid,Oidrelid,charstate,
234+
XLogRecPtrsublsn)
244235
{
245236
Relationrel;
246237
HeapTupletup;
247-
Oidsubrelid=InvalidOid;
238+
Oidsubrelid;
248239
boolnulls[Natts_pg_subscription_rel];
249240
Datumvalues[Natts_pg_subscription_rel];
250241

@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
256247
tup=SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
257248
ObjectIdGetDatum(relid),
258249
ObjectIdGetDatum(subid));
250+
if (HeapTupleIsValid(tup))
251+
elog(ERROR,"subscription table %u in subscription %u already exists",
252+
relid,subid);
259253

260-
/*
261-
* If the record for given table does not exist yet create new record,
262-
* otherwise update the existing one.
263-
*/
264-
if (!HeapTupleIsValid(tup)&& !update_only)
265-
{
266-
/* Form the tuple. */
267-
memset(values,0,sizeof(values));
268-
memset(nulls, false,sizeof(nulls));
269-
values[Anum_pg_subscription_rel_srsubid-1]=ObjectIdGetDatum(subid);
270-
values[Anum_pg_subscription_rel_srrelid-1]=ObjectIdGetDatum(relid);
271-
values[Anum_pg_subscription_rel_srsubstate-1]=CharGetDatum(state);
272-
if (sublsn!=InvalidXLogRecPtr)
273-
values[Anum_pg_subscription_rel_srsublsn-1]=LSNGetDatum(sublsn);
274-
else
275-
nulls[Anum_pg_subscription_rel_srsublsn-1]= true;
276-
277-
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
278-
279-
/* Insert tuple into catalog. */
280-
subrelid=CatalogTupleInsert(rel,tup);
281-
282-
heap_freetuple(tup);
283-
}
284-
elseif (HeapTupleIsValid(tup))
285-
{
286-
boolreplaces[Natts_pg_subscription_rel];
254+
/* Form the tuple. */
255+
memset(values,0,sizeof(values));
256+
memset(nulls, false,sizeof(nulls));
257+
values[Anum_pg_subscription_rel_srsubid-1]=ObjectIdGetDatum(subid);
258+
values[Anum_pg_subscription_rel_srrelid-1]=ObjectIdGetDatum(relid);
259+
values[Anum_pg_subscription_rel_srsubstate-1]=CharGetDatum(state);
260+
if (sublsn!=InvalidXLogRecPtr)
261+
values[Anum_pg_subscription_rel_srsublsn-1]=LSNGetDatum(sublsn);
262+
else
263+
nulls[Anum_pg_subscription_rel_srsublsn-1]= true;
287264

288-
/* Update the tuple. */
289-
memset(values,0,sizeof(values));
290-
memset(nulls, false,sizeof(nulls));
291-
memset(replaces, false,sizeof(replaces));
265+
tup=heap_form_tuple(RelationGetDescr(rel),values,nulls);
292266

293-
replaces[Anum_pg_subscription_rel_srsubstate-1]= true;
294-
values[Anum_pg_subscription_rel_srsubstate-1]=CharGetDatum(state);
267+
/* Insert tuple into catalog. */
268+
subrelid=CatalogTupleInsert(rel,tup);
295269

296-
replaces[Anum_pg_subscription_rel_srsublsn-1]= true;
297-
if (sublsn!=InvalidXLogRecPtr)
298-
values[Anum_pg_subscription_rel_srsublsn-1]=LSNGetDatum(sublsn);
299-
else
300-
nulls[Anum_pg_subscription_rel_srsublsn-1]= true;
270+
heap_freetuple(tup);
301271

302-
tup=heap_modify_tuple(tup,RelationGetDescr(rel),values,nulls,
303-
replaces);
272+
/* Cleanup. */
273+
heap_close(rel,NoLock);
304274

305-
/* Update the catalog. */
306-
CatalogTupleUpdate(rel,&tup->t_self,tup);
275+
returnsubrelid;
276+
}
307277

308-
subrelid=HeapTupleGetOid(tup);
309-
}
278+
/*
279+
* Update the state of a subscription table.
280+
*/
281+
Oid
282+
UpdateSubscriptionRelState(Oidsubid,Oidrelid,charstate,
283+
XLogRecPtrsublsn)
284+
{
285+
Relationrel;
286+
HeapTupletup;
287+
Oidsubrelid;
288+
boolnulls[Natts_pg_subscription_rel];
289+
Datumvalues[Natts_pg_subscription_rel];
290+
boolreplaces[Natts_pg_subscription_rel];
291+
292+
LockSharedObject(SubscriptionRelationId,subid,0,AccessShareLock);
293+
294+
rel=heap_open(SubscriptionRelRelationId,RowExclusiveLock);
295+
296+
/* Try finding existing mapping. */
297+
tup=SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
298+
ObjectIdGetDatum(relid),
299+
ObjectIdGetDatum(subid));
300+
if (!HeapTupleIsValid(tup))
301+
elog(ERROR,"subscription table %u in subscription %u does not exist",
302+
relid,subid);
303+
304+
/* Update the tuple. */
305+
memset(values,0,sizeof(values));
306+
memset(nulls, false,sizeof(nulls));
307+
memset(replaces, false,sizeof(replaces));
308+
309+
replaces[Anum_pg_subscription_rel_srsubstate-1]= true;
310+
values[Anum_pg_subscription_rel_srsubstate-1]=CharGetDatum(state);
311+
312+
replaces[Anum_pg_subscription_rel_srsublsn-1]= true;
313+
if (sublsn!=InvalidXLogRecPtr)
314+
values[Anum_pg_subscription_rel_srsublsn-1]=LSNGetDatum(sublsn);
315+
else
316+
nulls[Anum_pg_subscription_rel_srsublsn-1]= true;
317+
318+
tup=heap_modify_tuple(tup,RelationGetDescr(rel),values,nulls,
319+
replaces);
320+
321+
/* Update the catalog. */
322+
CatalogTupleUpdate(rel,&tup->t_self,tup);
323+
324+
subrelid=HeapTupleGetOid(tup);
310325

311326
/* Cleanup. */
312327
heap_close(rel,NoLock);

‎src/backend/commands/subscriptioncmds.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
450450
CheckSubscriptionRelkind(get_rel_relkind(relid),
451451
rv->schemaname,rv->relname);
452452

453-
SetSubscriptionRelState(subid,relid,table_state,
454-
InvalidXLogRecPtr, false);
453+
AddSubscriptionRelState(subid,relid,table_state,
454+
InvalidXLogRecPtr);
455455
}
456456

457457
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
569569
if (!bsearch(&relid,subrel_local_oids,
570570
list_length(subrel_states),sizeof(Oid),oid_cmp))
571571
{
572-
SetSubscriptionRelState(sub->oid,relid,
572+
AddSubscriptionRelState(sub->oid,relid,
573573
copy_data ?SUBREL_STATE_INIT :SUBREL_STATE_READY,
574-
InvalidXLogRecPtr, false);
574+
InvalidXLogRecPtr);
575575
ereport(DEBUG1,
576576
(errmsg("table \"%s.%s\" added to subscription \"%s\"",
577577
rv->schemaname,rv->relname,sub->name)));

‎src/backend/replication/logical/tablesync.c

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
298298

299299
SpinLockRelease(&MyLogicalRepWorker->relmutex);
300300

301-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
302-
MyLogicalRepWorker->relid,
303-
MyLogicalRepWorker->relstate,
304-
MyLogicalRepWorker->relstate_lsn,
305-
true);
301+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
302+
MyLogicalRepWorker->relid,
303+
MyLogicalRepWorker->relstate,
304+
MyLogicalRepWorker->relstate_lsn);
306305

307306
walrcv_endstreaming(wrconn,&tli);
308307
finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
427426
StartTransactionCommand();
428427
started_tx= true;
429428
}
430-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
431-
rstate->relid,rstate->state,
432-
rstate->lsn, true);
429+
430+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
431+
rstate->relid,rstate->state,
432+
rstate->lsn);
433433
}
434434
}
435435
else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
870870

871871
/* Update the state and make it visible to others. */
872872
StartTransactionCommand();
873-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
874-
MyLogicalRepWorker->relid,
875-
MyLogicalRepWorker->relstate,
876-
MyLogicalRepWorker->relstate_lsn,
877-
true);
873+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
874+
MyLogicalRepWorker->relid,
875+
MyLogicalRepWorker->relstate,
876+
MyLogicalRepWorker->relstate_lsn);
878877
CommitTransactionCommand();
879878
pgstat_report_stat(false);
880879

@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
961960
* Update the new state in catalog. No need to bother
962961
* with the shmem state as we are exiting for good.
963962
*/
964-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
965-
MyLogicalRepWorker->relid,
966-
SUBREL_STATE_SYNCDONE,
967-
*origin_startpos,
968-
true);
963+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
964+
MyLogicalRepWorker->relid,
965+
SUBREL_STATE_SYNCDONE,
966+
*origin_startpos);
969967
finish_sync_worker();
970968
}
971969
break;

‎src/include/catalog/pg_subscription_rel.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ typedef struct SubscriptionRelState
6767
charstate;
6868
}SubscriptionRelState;
6969

70-
externOidSetSubscriptionRelState(Oidsubid,Oidrelid,charstate,
71-
XLogRecPtrsublsn,boolupdate_only);
70+
externOidAddSubscriptionRelState(Oidsubid,Oidrelid,charstate,
71+
XLogRecPtrsublsn);
72+
externOidUpdateSubscriptionRelState(Oidsubid,Oidrelid,charstate,
73+
XLogRecPtrsublsn);
7274
externcharGetSubscriptionRelState(Oidsubid,Oidrelid,
7375
XLogRecPtr*sublsn,boolmissing_ok);
7476
externvoidRemoveSubscriptionRel(Oidsubid,Oidrelid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp