Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5fcda94

Browse files
committed
Add 2PC support to pgoutput (based on some patch from -hackers).
1 parentef5bb79 commit5fcda94

File tree

5 files changed

+624
-11
lines changed

5 files changed

+624
-11
lines changed

‎src/backend/replication/logical/proto.c

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,11 @@ void
7575
logicalrep_write_commit(StringInfoout,ReorderBufferTXN*txn,
7676
XLogRecPtrcommit_lsn)
7777
{
78-
uint8flags=0;
78+
uint8flags=0;
7979

8080
pq_sendbyte(out,'C');/* sending COMMIT */
8181

82+
flags |=LOGICALREP_IS_COMMIT;
8283
/* send the flags field (unused for now) */
8384
pq_sendbyte(out,flags);
8485

@@ -89,21 +90,106 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
8990
}
9091

9192
/*
92-
* Read transaction COMMIT from the stream.
93+
* Write ABORT to the output stream.
94+
*/
95+
void
96+
logicalrep_write_abort(StringInfoout,ReorderBufferTXN*txn,
97+
XLogRecPtrabort_lsn)
98+
{
99+
uint8flags=0;
100+
101+
pq_sendbyte(out,'C');/* sending ABORT flag below */
102+
103+
flags |=LOGICALREP_IS_ABORT;
104+
/* send the flags field */
105+
pq_sendbyte(out,flags);
106+
107+
/* send fields */
108+
pq_sendint64(out,abort_lsn);
109+
pq_sendint64(out,txn->end_lsn);
110+
pq_sendint64(out,txn->commit_time);
111+
}
112+
113+
/*
114+
* Read transaction COMMIT|ABORT from the stream.
93115
*/
94116
void
95-
logicalrep_read_commit(StringInfoin,LogicalRepCommitData*commit_data)
117+
logicalrep_read_commit(StringInfoin,LogicalRepCommitData*commit_data,
118+
uint8*flags)
96119
{
97-
/* read flags(unused for now)*/
98-
uint8flags=pq_getmsgbyte(in);
120+
/* read flags */
121+
uint8commit_flags=pq_getmsgbyte(in);
99122

100-
if (flags!=0)
101-
elog(ERROR,"unrecognized flags %u in commit message",flags);
123+
if (!(commit_flags&LOGICALREP_COMMIT_MASK))
124+
elog(ERROR,"unrecognized flags %u in commit|abort message",
125+
commit_flags);
102126

103127
/* read fields */
104128
commit_data->commit_lsn=pq_getmsgint64(in);
105129
commit_data->end_lsn=pq_getmsgint64(in);
106130
commit_data->committime=pq_getmsgint64(in);
131+
132+
/* set gid to empty */
133+
commit_data->gid[0]='\0';
134+
135+
*flags=commit_flags;
136+
}
137+
138+
/*
139+
* Write PREPARE to the output stream.
140+
*/
141+
void
142+
logicalrep_write_prepare(StringInfoout,ReorderBufferTXN*txn,
143+
XLogRecPtrprepare_lsn)
144+
{
145+
uint8flags=0;
146+
147+
pq_sendbyte(out,'P');/* sending PREPARE protocol */
148+
149+
if (txn->txn_flags&RBTXN_COMMIT_PREPARED)
150+
flags |=LOGICALREP_IS_COMMIT_PREPARED;
151+
elseif (txn->txn_flags&RBTXN_ROLLBACK_PREPARED)
152+
flags |=LOGICALREP_IS_ROLLBACK_PREPARED;
153+
elseif (txn->txn_flags&RBTXN_PREPARE)
154+
flags |=LOGICALREP_IS_PREPARE;
155+
156+
if (flags==0)
157+
elog(ERROR,"unrecognized flags %u in [commit|rollback] prepare message",flags);
158+
159+
/* send the flags field */
160+
pq_sendbyte(out,flags);
161+
162+
/* send fields */
163+
pq_sendint64(out,prepare_lsn);
164+
pq_sendint64(out,txn->end_lsn);
165+
pq_sendint64(out,txn->commit_time);
166+
167+
/* send gid */
168+
pq_sendstring(out,txn->gid);
169+
}
170+
171+
/*
172+
* Read transaction PREPARE from the stream.
173+
*/
174+
void
175+
logicalrep_read_prepare(StringInfoin,LogicalRepCommitData*commit_data,uint8*flags)
176+
{
177+
/* read flags */
178+
uint8prep_flags=pq_getmsgbyte(in);
179+
180+
if (!(prep_flags&LOGICALREP_PREPARE_MASK))
181+
elog(ERROR,"unrecognized flags %u in prepare message",prep_flags);
182+
183+
/* read fields */
184+
commit_data->commit_lsn=pq_getmsgint64(in);
185+
commit_data->end_lsn=pq_getmsgint64(in);
186+
commit_data->committime=pq_getmsgint64(in);
187+
188+
/* read gid */
189+
strcpy(commit_data->gid,pq_getmsgstring(in));
190+
191+
/* set flags */
192+
*flags=prep_flags;
107193
}
108194

109195
/*

‎src/backend/replication/logical/worker.c

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,8 +474,9 @@ static void
474474
apply_handle_commit(StringInfos)
475475
{
476476
LogicalRepCommitDatacommit_data;
477+
uint8flags=0;
477478

478-
logicalrep_read_commit(s,&commit_data);
479+
logicalrep_read_commit(s,&commit_data,&flags);
479480

480481
Assert(commit_data.commit_lsn==remote_final_lsn);
481482

@@ -489,7 +490,11 @@ apply_handle_commit(StringInfo s)
489490
replorigin_session_origin_lsn=commit_data.end_lsn;
490491
replorigin_session_origin_timestamp=commit_data.committime;
491492

492-
CommitTransactionCommand();
493+
if (flags&LOGICALREP_IS_COMMIT)
494+
CommitTransactionCommand();
495+
elseif (flags&LOGICALREP_IS_ABORT)
496+
AbortCurrentTransaction();
497+
493498
pgstat_report_stat(false);
494499

495500
store_flush_position(commit_data.end_lsn);
@@ -509,6 +514,121 @@ apply_handle_commit(StringInfo s)
509514
pgstat_report_activity(STATE_IDLE,NULL);
510515
}
511516

517+
staticvoid
518+
apply_handle_prepare_txn(LogicalRepCommitData*commit_data)
519+
{
520+
Assert(commit_data->commit_lsn==remote_final_lsn);
521+
/* The synchronization worker runs in single transaction. */
522+
if (IsTransactionState()&& !am_tablesync_worker())
523+
{
524+
/* End the earlier transaction and start a new one */
525+
BeginTransactionBlock();
526+
CommitTransactionCommand();
527+
StartTransactionCommand();
528+
/*
529+
* Update origin state so we can restart streaming from correct
530+
* position in case of crash.
531+
*/
532+
replorigin_session_origin_lsn=commit_data->end_lsn;
533+
replorigin_session_origin_timestamp=commit_data->committime;
534+
535+
PrepareTransactionBlock(commit_data->gid);
536+
CommitTransactionCommand();
537+
pgstat_report_stat(false);
538+
539+
store_flush_position(commit_data->end_lsn);
540+
}
541+
else
542+
{
543+
/* Process any invalidation messages that might have accumulated. */
544+
AcceptInvalidationMessages();
545+
maybe_reread_subscription();
546+
}
547+
548+
in_remote_transaction= false;
549+
550+
/* Process any tables that are being synchronized in parallel. */
551+
process_syncing_tables(commit_data->end_lsn);
552+
553+
pgstat_report_activity(STATE_IDLE,NULL);
554+
}
555+
556+
staticvoid
557+
apply_handle_commit_prepared_txn(LogicalRepCommitData*commit_data)
558+
{
559+
/* there is no transaction when COMMIT PREPARED is called */
560+
ensure_transaction();
561+
562+
/*
563+
* Update origin state so we can restart streaming from correct
564+
* position in case of crash.
565+
*/
566+
replorigin_session_origin_lsn=commit_data->end_lsn;
567+
replorigin_session_origin_timestamp=commit_data->committime;
568+
569+
FinishPreparedTransaction(commit_data->gid, true);
570+
CommitTransactionCommand();
571+
pgstat_report_stat(false);
572+
573+
store_flush_position(commit_data->end_lsn);
574+
in_remote_transaction= false;
575+
576+
/* Process any tables that are being synchronized in parallel. */
577+
process_syncing_tables(commit_data->end_lsn);
578+
579+
pgstat_report_activity(STATE_IDLE,NULL);
580+
}
581+
582+
staticvoid
583+
apply_handle_rollback_prepared_txn(LogicalRepCommitData*commit_data)
584+
{
585+
/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
586+
ensure_transaction();
587+
588+
/*
589+
* Update origin state so we can restart streaming from correct
590+
* position in case of crash.
591+
*/
592+
replorigin_session_origin_lsn=commit_data->end_lsn;
593+
replorigin_session_origin_timestamp=commit_data->committime;
594+
595+
/* FIXME: it is ok if xact is absent */
596+
FinishPreparedTransaction(commit_data->gid, false);
597+
CommitTransactionCommand();
598+
pgstat_report_stat(false);
599+
600+
store_flush_position(commit_data->end_lsn);
601+
in_remote_transaction= false;
602+
603+
/* Process any tables that are being synchronized in parallel. */
604+
process_syncing_tables(commit_data->end_lsn);
605+
606+
pgstat_report_activity(STATE_IDLE,NULL);
607+
}
608+
609+
/*
610+
* Handle PREPARE message.
611+
*/
612+
staticvoid
613+
apply_handle_prepare(StringInfos)
614+
{
615+
LogicalRepCommitDatacommit_data;
616+
uint8flags=0;
617+
618+
logicalrep_read_prepare(s,&commit_data,&flags);
619+
620+
if (flags&LOGICALREP_IS_PREPARE)
621+
apply_handle_prepare_txn(&commit_data);
622+
elseif (flags&LOGICALREP_IS_COMMIT_PREPARED)
623+
apply_handle_commit_prepared_txn(&commit_data);
624+
elseif (flags&LOGICALREP_IS_ROLLBACK_PREPARED)
625+
apply_handle_rollback_prepared_txn(&commit_data);
626+
else
627+
ereport(ERROR,
628+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
629+
errmsg("wrong [commit|rollback] prepare message")));
630+
}
631+
512632
/*
513633
* Handle ORIGIN message.
514634
*
@@ -969,10 +1089,14 @@ apply_dispatch(StringInfo s)
9691089
case'B':
9701090
apply_handle_begin(s);
9711091
break;
972-
/* COMMIT */
1092+
/* COMMIT|ABORT */
9731093
case'C':
9741094
apply_handle_commit(s);
9751095
break;
1096+
/* [COMMIT|ROLLBACK] PREPARE */
1097+
case'P':
1098+
apply_handle_prepare(s);
1099+
break;
9761100
/* INSERT */
9771101
case'I':
9781102
apply_handle_insert(s);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp