Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6910089

Browse files
committed
seems to be working
1 parent2ef6fe9 commit6910089

File tree

4 files changed

+107
-5
lines changed

4 files changed

+107
-5
lines changed

‎src/backend/access/transam/twophase.c

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,83 @@ static void RemoveGXact(GlobalTransaction gxact);
200200

201201
staticvoidXlogReadTwoPhaseData(XLogRecPtrlsn,char**buf,int*len);
202202

203+
204+
dlist_headStandbyTwoPhaseStateData=DLIST_STATIC_INIT(StandbyTwoPhaseStateData);
205+
206+
typedefstructStandbyPreparedTransaction
207+
{
208+
TransactionIdxid;
209+
XLogRecPtrprepare_start_lsn;
210+
XLogRecPtrprepare_end_lsn;
211+
dlist_nodelist_node;
212+
}StandbyPreparedTransaction;
213+
214+
void
215+
StandbyCheckPointTwoPhase(XLogRecPtrredo_horizon)
216+
{
217+
dlist_mutable_itermiter;
218+
intserialized_xacts=0;
219+
220+
Assert(RecoveryInProgress());
221+
222+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
223+
224+
dlist_foreach_modify(miter,&StandbyTwoPhaseStateData)
225+
{
226+
StandbyPreparedTransaction*xact=dlist_container(StandbyPreparedTransaction,
227+
list_node,miter.cur);
228+
229+
if (xact->prepare_end_lsn <=redo_horizon)
230+
{
231+
char*buf;
232+
intlen;
233+
234+
XlogReadTwoPhaseData(xact->prepare_start_lsn,&buf,&len);
235+
RecreateTwoPhaseFile(xact->xid,buf,len);
236+
pfree(buf);
237+
dlist_delete(miter.cur);
238+
serialized_xacts++;
239+
}
240+
}
241+
242+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
243+
244+
if (log_checkpoints&&serialized_xacts>0)
245+
ereport(LOG,
246+
(errmsg_plural("%u two-phase state file was written "
247+
"for long-running prepared transactions",
248+
"%u two-phase state files were written "
249+
"for long-running prepared transactions",
250+
serialized_xacts,
251+
serialized_xacts)));
252+
}
253+
254+
// XXX: rename to remove_standby_state
255+
void
256+
StandbyAtCommit(TransactionIdxid)
257+
{
258+
dlist_mutable_itermiter;
259+
260+
Assert(RecoveryInProgress());
261+
262+
dlist_foreach_modify(miter,&StandbyTwoPhaseStateData)
263+
{
264+
StandbyPreparedTransaction*xact=dlist_container(StandbyPreparedTransaction,
265+
list_node,miter.cur);
266+
267+
if (xact->xid==xid)
268+
{
269+
// pfree(xact);
270+
dlist_delete(miter.cur);
271+
return;
272+
}
273+
}
274+
275+
RemoveTwoPhaseFile(xid, false);
276+
}
277+
278+
279+
203280
/*
204281
* Initialization of shared memory
205282
*/
@@ -1252,7 +1329,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
12521329
XLogReaderState*xlogreader;
12531330
char*errormsg;
12541331

1255-
Assert(!RecoveryInProgress());
1332+
//Assert(!RecoveryInProgress());
12561333

12571334
xlogreader=XLogReaderAllocate(&read_local_xlog_page,NULL);
12581335
if (!xlogreader)
@@ -1653,6 +1730,20 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16531730
serialized_xacts)));
16541731
}
16551732

1733+
void
1734+
StandbyAtPrepare(XLogReaderState*record)
1735+
{
1736+
StandbyPreparedTransaction*xact;
1737+
TwoPhaseFileHeader*hdr= (TwoPhaseFileHeader*)XLogRecGetData(record);
1738+
1739+
xact= (StandbyPreparedTransaction*)palloc(sizeof(StandbyPreparedTransaction));
1740+
xact->xid=hdr->xid;
1741+
xact->prepare_start_lsn=record->ReadRecPtr;
1742+
xact->prepare_end_lsn=record->EndRecPtr;
1743+
1744+
dlist_push_tail(&StandbyTwoPhaseStateData,&xact->list_node);
1745+
}
1746+
16561747
/*
16571748
* PrescanPreparedTransactions
16581749
*

‎src/backend/access/transam/xact.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5604,7 +5604,8 @@ xact_redo(XLogReaderState *record)
56045604
Assert(TransactionIdIsValid(parsed.twophase_xid));
56055605
xact_redo_commit(&parsed,parsed.twophase_xid,
56065606
record->EndRecPtr,XLogRecGetOrigin(record));
5607-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5607+
// RemoveTwoPhaseFile(parsed.twophase_xid, false);
5608+
StandbyAtCommit(parsed.twophase_xid);
56085609
}
56095610
}
56105611
elseif (info==XLOG_XACT_ABORT||info==XLOG_XACT_ABORT_PREPARED)
@@ -5624,14 +5625,17 @@ xact_redo(XLogReaderState *record)
56245625
{
56255626
Assert(TransactionIdIsValid(parsed.twophase_xid));
56265627
xact_redo_abort(&parsed,parsed.twophase_xid);
5627-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5628+
// RemoveTwoPhaseFile(parsed.twophase_xid, false);
5629+
StandbyAtCommit(parsed.twophase_xid);
56285630
}
56295631
}
56305632
elseif (info==XLOG_XACT_PREPARE)
56315633
{
56325634
/* the record contents are exactly the 2PC file */
5633-
RecreateTwoPhaseFile(XLogRecGetXid(record),
5634-
XLogRecGetData(record),XLogRecGetDataLen(record));
5635+
// elog(WARNING, "2PC: RecreateTwoPhaseFile");
5636+
// RecreateTwoPhaseFile(XLogRecGetXid(record),
5637+
// XLogRecGetData(record), XLogRecGetDataLen(record));
5638+
StandbyAtPrepare(record);
56355639
}
56365640
elseif (info==XLOG_XACT_ASSIGNMENT)
56375641
{

‎src/backend/access/transam/xlog.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9395,6 +9395,7 @@ xlog_redo(XLogReaderState *record)
93959395
Assert(info==XLOG_FPI||info==XLOG_FPI_FOR_HINT||
93969396
!XLogRecHasAnyBlockRefs(record));
93979397

9398+
elog(WARNING,"2PC: xlog_redo, info=%x",info);
93989399
if (info==XLOG_NEXTOID)
93999400
{
94009401
OidnextOid;
@@ -9548,6 +9549,7 @@ xlog_redo(XLogReaderState *record)
95489549
checkPoint.ThisTimeLineID,ThisTimeLineID)));
95499550

95509551
RecoveryRestartPoint(&checkPoint);
9552+
StandbyCheckPointTwoPhase(checkPoint.redo);
95519553
}
95529554
elseif (info==XLOG_END_OF_RECOVERY)
95539555
{

‎src/include/access/twophase.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#defineTWOPHASE_H
1616

1717
#include"access/xlogdefs.h"
18+
#include"access/xlogreader.h"
1819
#include"datatype/timestamp.h"
1920
#include"storage/lock.h"
2021

@@ -56,4 +57,8 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
5657

5758
externvoidFinishPreparedTransaction(constchar*gid,boolisCommit);
5859

60+
externvoidStandbyAtCommit(TransactionIdxid);
61+
externvoidStandbyAtPrepare(XLogReaderState*record);
62+
externvoidStandbyCheckPointTwoPhase(XLogRecPtrredo_horizon);
63+
5964
#endif/* TWOPHASE_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp