Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit91f9861

Browse files
author
Amit Kapila
committed
Refactor to make common functions in proto.c and worker.c.
This is a non-functional change only to refactor code to extract somereplication logic into static functions.This is done as preparation for the 2PC streaming patch which also sharesthis common logic.Author: Peter SmithReviewed-By: Amit KapilaDiscussion:https://postgr.es/m/CAHut+PuiSA8AiLcE2N5StzSKs46SQEP_vDOUD5fX2XCVtfZ7mQ@mail.gmail.com
1 parent454ae15 commit91f9861

File tree

2 files changed

+95
-48
lines changed

2 files changed

+95
-48
lines changed

‎src/backend/replication/logical/proto.c

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,22 +145,23 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
145145
}
146146

147147
/*
148-
*Write PREPARE to the output stream.
148+
*The core functionality for logicalrep_write_prepare.
149149
*/
150-
void
151-
logicalrep_write_prepare(StringInfoout,ReorderBufferTXN*txn,
152-
XLogRecPtrprepare_lsn)
150+
staticvoid
151+
logicalrep_write_prepare_common(StringInfoout,LogicalRepMsgTypetype,
152+
ReorderBufferTXN*txn,XLogRecPtrprepare_lsn)
153153
{
154154
uint8flags=0;
155155

156-
pq_sendbyte(out,LOGICAL_REP_MSG_PREPARE);
156+
pq_sendbyte(out,type);
157157

158158
/*
159159
* This should only ever happen for two-phase commit transactions, in
160160
* which case we expect to have a valid GID.
161161
*/
162162
Assert(txn->gid!=NULL);
163163
Assert(rbtxn_prepared(txn));
164+
Assert(TransactionIdIsValid(txn->xid));
164165

165166
/* send the flags field */
166167
pq_sendbyte(out,flags);
@@ -176,31 +177,52 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
176177
}
177178

178179
/*
179-
*Read transactionPREPAREfrom the stream.
180+
*WritePREPAREto the output stream.
180181
*/
181182
void
182-
logicalrep_read_prepare(StringInfoin,LogicalRepPreparedTxnData*prepare_data)
183+
logicalrep_write_prepare(StringInfoout,ReorderBufferTXN*txn,
184+
XLogRecPtrprepare_lsn)
185+
{
186+
logicalrep_write_prepare_common(out,LOGICAL_REP_MSG_PREPARE,
187+
txn,prepare_lsn);
188+
}
189+
190+
/*
191+
* The core functionality for logicalrep_read_prepare.
192+
*/
193+
staticvoid
194+
logicalrep_read_prepare_common(StringInfoin,char*msgtype,
195+
LogicalRepPreparedTxnData*prepare_data)
183196
{
184197
/* read flags */
185198
uint8flags=pq_getmsgbyte(in);
186199

187200
if (flags!=0)
188-
elog(ERROR,"unrecognized flags %u inprepare message",flags);
201+
elog(ERROR,"unrecognized flags %u in%s message",flags,msgtype);
189202

190203
/* read fields */
191204
prepare_data->prepare_lsn=pq_getmsgint64(in);
192205
if (prepare_data->prepare_lsn==InvalidXLogRecPtr)
193-
elog(ERROR,"prepare_lsn is not set inprepare message");
206+
elog(ERROR,"prepare_lsn is not set in%s message",msgtype);
194207
prepare_data->end_lsn=pq_getmsgint64(in);
195208
if (prepare_data->end_lsn==InvalidXLogRecPtr)
196-
elog(ERROR,"end_lsn is not set inprepare message");
209+
elog(ERROR,"end_lsn is not set in%s message",msgtype);
197210
prepare_data->prepare_time=pq_getmsgint64(in);
198211
prepare_data->xid=pq_getmsgint(in,4);
199212

200213
/* read gid (copy it into a pre-allocated buffer) */
201214
strlcpy(prepare_data->gid,pq_getmsgstring(in),sizeof(prepare_data->gid));
202215
}
203216

217+
/*
218+
* Read transaction PREPARE from the stream.
219+
*/
220+
void
221+
logicalrep_read_prepare(StringInfoin,LogicalRepPreparedTxnData*prepare_data)
222+
{
223+
logicalrep_read_prepare_common(in,"prepare",prepare_data);
224+
}
225+
204226
/*
205227
* Write COMMIT PREPARED to the output stream.
206228
*/

‎src/backend/replication/logical/worker.c

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
333333
/* Compute GID for two_phase transactions */
334334
staticvoidTwoPhaseTransactionGid(Oidsubid,TransactionIdxid,char*gid,intszgid);
335335

336+
/* Common streaming function to apply all the spooled messages */
337+
staticvoidapply_spooled_messages(TransactionIdxid,XLogRecPtrlsn);
336338

337339
/*
338340
* Should this worker apply changes for given relation.
@@ -884,14 +886,47 @@ apply_handle_begin_prepare(StringInfo s)
884886
pgstat_report_activity(STATE_RUNNING,NULL);
885887
}
886888

889+
/*
890+
* Common function to prepare the GID.
891+
*/
892+
staticvoid
893+
apply_handle_prepare_internal(LogicalRepPreparedTxnData*prepare_data)
894+
{
895+
chargid[GIDSIZE];
896+
897+
/*
898+
* Compute unique GID for two_phase transactions. We don't use GID of
899+
* prepared transaction sent by server as that can lead to deadlock when
900+
* we have multiple subscriptions from same node point to publications on
901+
* the same node. See comments atop worker.c
902+
*/
903+
TwoPhaseTransactionGid(MySubscription->oid,prepare_data->xid,
904+
gid,sizeof(gid));
905+
906+
/*
907+
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
908+
* called within the PrepareTransactionBlock below.
909+
*/
910+
BeginTransactionBlock();
911+
CommitTransactionCommand();/* Completes the preceding Begin command. */
912+
913+
/*
914+
* Update origin state so we can restart streaming from correct position
915+
* in case of crash.
916+
*/
917+
replorigin_session_origin_lsn=prepare_data->end_lsn;
918+
replorigin_session_origin_timestamp=prepare_data->prepare_time;
919+
920+
PrepareTransactionBlock(gid);
921+
}
922+
887923
/*
888924
* Handle PREPARE message.
889925
*/
890926
staticvoid
891927
apply_handle_prepare(StringInfos)
892928
{
893929
LogicalRepPreparedTxnDataprepare_data;
894-
chargid[GIDSIZE];
895930

896931
logicalrep_read_prepare(s,&prepare_data);
897932

@@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s)
902937
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
903938
LSN_FORMAT_ARGS(remote_final_lsn))));
904939

905-
/*
906-
* Compute unique GID for two_phase transactions. We don't use GID of
907-
* prepared transaction sent by server as that can lead to deadlock when
908-
* we have multiple subscriptions from same node point to publications on
909-
* the same node. See comments atop worker.c
910-
*/
911-
TwoPhaseTransactionGid(MySubscription->oid,prepare_data.xid,
912-
gid,sizeof(gid));
913-
914940
/*
915941
* Unlike commit, here, we always prepare the transaction even though no
916942
* change has happened in this transaction. It is done this way because at
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
923949
*/
924950
begin_replication_step();
925951

926-
/*
927-
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
928-
* called within the PrepareTransactionBlock below.
929-
*/
930-
BeginTransactionBlock();
931-
CommitTransactionCommand();/* Completes the preceding Begin command. */
932-
933-
/*
934-
* Update origin state so we can restart streaming from correct position
935-
* in case of crash.
936-
*/
937-
replorigin_session_origin_lsn=prepare_data.end_lsn;
938-
replorigin_session_origin_timestamp=prepare_data.prepare_time;
952+
apply_handle_prepare_internal(&prepare_data);
939953

940-
PrepareTransactionBlock(gid);
941954
end_replication_step();
942955
CommitTransactionCommand();
943956
pgstat_report_stat(false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
12561269
}
12571270

12581271
/*
1259-
*Handle STREAM COMMIT message.
1272+
*Common spoolfile processing.
12601273
*/
12611274
staticvoid
1262-
apply_handle_stream_commit(StringInfos)
1275+
apply_spooled_messages(TransactionIdxid,XLogRecPtrlsn)
12631276
{
1264-
TransactionIdxid;
12651277
StringInfoDatas2;
12661278
intnchanges;
12671279
charpath[MAXPGPATH];
12681280
char*buffer=NULL;
1269-
LogicalRepCommitDatacommit_data;
12701281
StreamXidHash*ent;
12711282
MemoryContextoldcxt;
12721283
BufFile*fd;
12731284

1274-
if (in_streamed_transaction)
1275-
ereport(ERROR,
1276-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1277-
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1278-
1279-
xid=logicalrep_read_stream_commit(s,&commit_data);
1280-
1281-
elog(DEBUG1,"received commit for streamed transaction %u",xid);
1282-
12831285
/* Make sure we have an open transaction */
12841286
begin_replication_step();
12851287

@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
13111313

13121314
MemoryContextSwitchTo(oldcxt);
13131315

1314-
remote_final_lsn=commit_data.commit_lsn;
1316+
remote_final_lsn=lsn;
13151317

13161318
/*
13171319
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
13901392
elog(DEBUG1,"replayed %d (all) changes from file \"%s\"",
13911393
nchanges,path);
13921394

1395+
return;
1396+
}
1397+
1398+
/*
1399+
* Handle STREAM COMMIT message.
1400+
*/
1401+
staticvoid
1402+
apply_handle_stream_commit(StringInfos)
1403+
{
1404+
TransactionIdxid;
1405+
LogicalRepCommitDatacommit_data;
1406+
1407+
if (in_streamed_transaction)
1408+
ereport(ERROR,
1409+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1410+
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1411+
1412+
xid=logicalrep_read_stream_commit(s,&commit_data);
1413+
1414+
elog(DEBUG1,"received commit for streamed transaction %u",xid);
1415+
1416+
apply_spooled_messages(xid,commit_data.commit_lsn);
1417+
13931418
apply_handle_commit_internal(s,&commit_data);
13941419

13951420
/* unlink the files with serialized changes and subxact info */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp