Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3a09d75

Browse files
committed
Rearrange logrep worker's snapshot handling some more.
It turns out that worker.c's code path for TRUNCATE was alsocareless about establishing a snapshot while executing user-definedcode, allowing the checks added by commit84f5c29 to fail whena trigger is fired in that context.We could just wrap Push/PopActiveSnapshot around the truncate call,but it seems better to establish a policy of holding a snapshotthroughout execution of a replication step. To help with that andpossible future requirements, replace the previous ensure_transactioncalls with pairs of begin/end_replication_step calls.Per report from Mark Dilger. Back-patch to v11, like the previouschanges.Discussion:https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com
1 parentbb4aed4 commit3a09d75

File tree

1 file changed

+48
-37
lines changed

1 file changed

+48
-37
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -282,30 +282,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
282282
}
283283

284284
/*
285-
*Make sure that we started local transaction.
285+
*Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
286286
*
287-
* Also switches to ApplyMessageContext as necessary.
287+
* Start a transaction, if this is the first step (else we keep using the
288+
* existing transaction).
289+
* Also provide a global snapshot and ensure we run in ApplyMessageContext.
288290
*/
289-
staticbool
290-
ensure_transaction(void)
291+
staticvoid
292+
begin_replication_step(void)
291293
{
292-
if (IsTransactionState())
293-
{
294-
SetCurrentStatementStartTimestamp();
295-
296-
if (CurrentMemoryContext!=ApplyMessageContext)
297-
MemoryContextSwitchTo(ApplyMessageContext);
294+
SetCurrentStatementStartTimestamp();
298295

299-
return false;
296+
if (!IsTransactionState())
297+
{
298+
StartTransactionCommand();
299+
maybe_reread_subscription();
300300
}
301301

302-
SetCurrentStatementStartTimestamp();
303-
StartTransactionCommand();
304-
305-
maybe_reread_subscription();
302+
PushActiveSnapshot(GetTransactionSnapshot());
306303

307304
MemoryContextSwitchTo(ApplyMessageContext);
308-
return true;
305+
}
306+
307+
/*
308+
* Finish up one step of a replication transaction.
309+
* Callers of begin_replication_step() must also call this.
310+
*
311+
* We don't close out the transaction here, but we should increment
312+
* the command counter to make the effects of this step visible.
313+
*/
314+
staticvoid
315+
end_replication_step(void)
316+
{
317+
PopActiveSnapshot();
318+
319+
CommandCounterIncrement();
309320
}
310321

311322
/*
@@ -359,13 +370,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
359370
RangeTblEntry*rte;
360371
ResultRelInfo*resultRelInfo;
361372

362-
/*
363-
* Input functions may need an active snapshot, as may AFTER triggers
364-
* invoked during finish_edata. For safety, ensure an active snapshot
365-
* exists throughout all our usage of the executor.
366-
*/
367-
PushActiveSnapshot(GetTransactionSnapshot());
368-
369373
edata= (ApplyExecutionData*)palloc0(sizeof(ApplyExecutionData));
370374
edata->targetRel=rel;
371375

@@ -433,8 +437,6 @@ finish_edata(ApplyExecutionData *edata)
433437
ExecResetTupleTable(estate->es_tupleTable, false);
434438
FreeExecutorState(estate);
435439
pfree(edata);
436-
437-
PopActiveSnapshot();
438440
}
439441

440442
/*
@@ -831,7 +833,7 @@ apply_handle_stream_start(StringInfo s)
831833
* transaction for handling the buffile, used for serializing the
832834
* streaming data and subxact info.
833835
*/
834-
ensure_transaction();
836+
begin_replication_step();
835837

836838
/* notify handle methods we're processing a remote transaction */
837839
in_streamed_transaction= true;
@@ -861,6 +863,8 @@ apply_handle_stream_start(StringInfo s)
861863
subxact_info_read(MyLogicalRepWorker->subid,stream_xid);
862864

863865
pgstat_report_activity(STATE_RUNNING,NULL);
866+
867+
end_replication_step();
864868
}
865869

866870
/*
@@ -937,7 +941,7 @@ apply_handle_stream_abort(StringInfo s)
937941
StreamXidHash*ent;
938942

939943
subidx=-1;
940-
ensure_transaction();
944+
begin_replication_step();
941945
subxact_info_read(MyLogicalRepWorker->subid,xid);
942946

943947
for (i=subxact_data.nsubxacts;i>0;i--)
@@ -958,7 +962,7 @@ apply_handle_stream_abort(StringInfo s)
958962
{
959963
/* Cleanup the subxact info */
960964
cleanup_subxact_info();
961-
965+
end_replication_step();
962966
CommitTransactionCommand();
963967
return;
964968
}
@@ -986,6 +990,7 @@ apply_handle_stream_abort(StringInfo s)
986990
/* write the updated subxact list */
987991
subxact_info_write(MyLogicalRepWorker->subid,xid);
988992

993+
end_replication_step();
989994
CommitTransactionCommand();
990995
}
991996
}
@@ -1013,7 +1018,8 @@ apply_handle_stream_commit(StringInfo s)
10131018

10141019
elog(DEBUG1,"received commit for streamed transaction %u",xid);
10151020

1016-
ensure_transaction();
1021+
/* Make sure we have an open transaction */
1022+
begin_replication_step();
10171023

10181024
/*
10191025
* Allocate file handle and memory required to process all the messages in
@@ -1046,6 +1052,8 @@ apply_handle_stream_commit(StringInfo s)
10461052
in_remote_transaction= true;
10471053
pgstat_report_activity(STATE_RUNNING,NULL);
10481054

1055+
end_replication_step();
1056+
10491057
/*
10501058
* Read the entries one by one and pass them through the same logic as in
10511059
* apply_dispatch.
@@ -1227,7 +1235,7 @@ apply_handle_insert(StringInfo s)
12271235
if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT,s))
12281236
return;
12291237

1230-
ensure_transaction();
1238+
begin_replication_step();
12311239

12321240
relid=logicalrep_read_insert(s,&newtup);
12331241
rel=logicalrep_rel_open(relid,RowExclusiveLock);
@@ -1238,6 +1246,7 @@ apply_handle_insert(StringInfo s)
12381246
* transaction so it's safe to unlock it.
12391247
*/
12401248
logicalrep_rel_close(rel,RowExclusiveLock);
1249+
end_replication_step();
12411250
return;
12421251
}
12431252

@@ -1266,7 +1275,7 @@ apply_handle_insert(StringInfo s)
12661275

12671276
logicalrep_rel_close(rel,NoLock);
12681277

1269-
CommandCounterIncrement();
1278+
end_replication_step();
12701279
}
12711280

12721281
/*
@@ -1346,7 +1355,7 @@ apply_handle_update(StringInfo s)
13461355
if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE,s))
13471356
return;
13481357

1349-
ensure_transaction();
1358+
begin_replication_step();
13501359

13511360
relid=logicalrep_read_update(s,&has_oldtup,&oldtup,
13521361
&newtup);
@@ -1358,6 +1367,7 @@ apply_handle_update(StringInfo s)
13581367
* transaction so it's safe to unlock it.
13591368
*/
13601369
logicalrep_rel_close(rel,RowExclusiveLock);
1370+
end_replication_step();
13611371
return;
13621372
}
13631373

@@ -1416,7 +1426,7 @@ apply_handle_update(StringInfo s)
14161426

14171427
logicalrep_rel_close(rel,NoLock);
14181428

1419-
CommandCounterIncrement();
1429+
end_replication_step();
14201430
}
14211431

14221432
/*
@@ -1501,7 +1511,7 @@ apply_handle_delete(StringInfo s)
15011511
if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE,s))
15021512
return;
15031513

1504-
ensure_transaction();
1514+
begin_replication_step();
15051515

15061516
relid=logicalrep_read_delete(s,&oldtup);
15071517
rel=logicalrep_rel_open(relid,RowExclusiveLock);
@@ -1512,6 +1522,7 @@ apply_handle_delete(StringInfo s)
15121522
* transaction so it's safe to unlock it.
15131523
*/
15141524
logicalrep_rel_close(rel,RowExclusiveLock);
1525+
end_replication_step();
15151526
return;
15161527
}
15171528

@@ -1542,7 +1553,7 @@ apply_handle_delete(StringInfo s)
15421553

15431554
logicalrep_rel_close(rel,NoLock);
15441555

1545-
CommandCounterIncrement();
1556+
end_replication_step();
15461557
}
15471558

15481559
/*
@@ -1867,7 +1878,7 @@ apply_handle_truncate(StringInfo s)
18671878
if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE,s))
18681879
return;
18691880

1870-
ensure_transaction();
1881+
begin_replication_step();
18711882

18721883
remote_relids=logicalrep_read_truncate(s,&cascade,&restart_seqs);
18731884

@@ -1958,7 +1969,7 @@ apply_handle_truncate(StringInfo s)
19581969
table_close(rel,NoLock);
19591970
}
19601971

1961-
CommandCounterIncrement();
1972+
end_replication_step();
19621973
}
19631974

19641975

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp