Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9d60e38

Browse files
committed
Check transaction body (all trasnasction elements are decoded)
1 parent484e3d8 commit9d60e38

File tree

8 files changed

+114
-71
lines changed

8 files changed

+114
-71
lines changed

‎src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
4242
* present */
4343

4444
parsed->xact_time=xlrec->xact_time;
45+
parsed->n_changes=xlrec->n_changes;
4546

4647
if (info&XLOG_XACT_HAS_INFO)
4748
{

‎src/backend/access/transam/twophase.c

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ typedef struct TwoPhaseFileHeader
210210
int32ncommitrels;/* number of delete-on-commit rels */
211211
int32nabortrels;/* number of delete-on-abort rels */
212212
int32ninvalmsgs;/* number of cache invalidation messages */
213+
int64n_changes;/* number of changes done by transaction */
213214
boolinitfileinval;/* does relcache init file need invalidation? */
214215
uint16gidlen;/* length of the GID - GID follows the header */
215216
xl_xact_originxl_origin;/* replication origin information */
@@ -330,7 +331,7 @@ TwoPhaseShmemInit(void)
330331
((char*)TwoPhaseState+
331332
MAXALIGN(offsetof(TwoPhaseStateData,prepXacts)+
332333
sizeof(GlobalTransaction)*2*max_prepared_xacts));
333-
334+
334335
TwoPhaseState->hashTable=&TwoPhaseState->prepXacts[max_prepared_xacts];
335336

336337
for (i=0;i<max_prepared_xacts;i++)
@@ -530,10 +531,10 @@ MarkAsPreparing(TransactionId xid, const char *gid,
530531
proc->lwWaitMode=0;
531532
proc->waitLock=NULL;
532533
proc->waitProcLock=NULL;
533-
534+
534535
cached_xid=xid;
535536
cached_gxact=gxact;
536-
537+
537538
for (i=0;i<NUM_LOCK_PARTITIONS;i++)
538539
SHMQueueInit(&(proc->myProcLocks[i]));
539540
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -547,15 +548,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
547548
gxact->owner=owner;
548549
gxact->locking_pid=MyProcPid;
549550
gxact->valid= false;
550-
gxact->ondisk= false;
551+
gxact->ondisk= false;
551552
gxact->prep_index=TwoPhaseState->numPrepXacts;
552553
strcpy(gxact->gid,gid);
553554
*gxact->state_3pc='\0';
554555

555556
/* And insert it into the active array */
556557
Assert(TwoPhaseState->numPrepXacts<max_prepared_xacts);
557558
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++]=gxact;
558-
559+
559560
/*
560561
* Remember that we have this GlobalTransaction entry locked for us. If we
561562
* abort after this, we must release it.
@@ -655,7 +656,7 @@ LockGXact(const char *gid, Oid user)
655656
LWLockRelease(TwoPhaseStateLock);
656657

657658
if (MyLockedGxact!=gxact) {
658-
if (MyLockedGxact!=NULL) {
659+
if (MyLockedGxact!=NULL) {
659660
SpinLockRelease(&MyLockedGxact->spinlock);
660661
}
661662
MyLockedGxact=gxact;
@@ -670,13 +671,13 @@ LockGXact(const char *gid, Oid user)
670671
errmsg("prepared transaction with identifier \"%s\" is not valid",
671672
gid)));
672673
}
673-
674+
674675
if (user!=gxact->owner&& !superuser_arg(user)) {
675676
ereport(ERROR,
676677
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
677678
errmsg("permission denied to finish prepared transaction"),
678679
errhint("Must be superuser or the user that prepared the transaction.")));
679-
}
680+
}
680681

681682
/*
682683
* Note: it probably would be possible to allow committing from
@@ -697,11 +698,11 @@ LockGXact(const char *gid, Oid user)
697698
gxact->locking_pid=MyProcPid;
698699

699700
returngxact;
700-
}
701+
}
701702
}
702703

703704
LWLockRelease(TwoPhaseStateLock);
704-
if (MyLockedGxact!=NULL) {
705+
if (MyLockedGxact!=NULL) {
705706
SpinLockRelease(&MyLockedGxact->spinlock);
706707
MyLockedGxact=NULL;
707708
}
@@ -829,13 +830,13 @@ bool GetPreparedTransactionState(char const* gid, char* state)
829830
* Alter 3PC state of prepared transaction
830831
*/
831832
voidSetPreparedTransactionState(charconst*gid,charconst*state)
832-
{
833+
{
833834
GlobalTransactiongxact;
834835
PGXACT*pgxact;
835836
TwoPhaseFileHeader*hdr;
836837
char*buf;
837838
boolreplorigin;
838-
839+
839840

840841
if (strlen(state) >=MAX_3PC_STATE_SIZE)
841842
ereport(ERROR,
@@ -846,7 +847,7 @@ void SetPreparedTransactionState(char const* gid, char const* state)
846847
replorigin= (replorigin_session_origin!=InvalidRepOriginId&&
847848
replorigin_session_origin!=DoNotReplicateId);
848849

849-
gxact=LockGXact(gid,GetUserId());
850+
gxact=LockGXact(gid,GetUserId());
850851
pgxact=&ProcGlobal->allPgXact[gxact->pgprocno];
851852
strcpy(gxact->state_3pc,state);
852853

@@ -904,7 +905,7 @@ pg_precommit_prepared(PG_FUNCTION_ARGS)
904905
charconst*state=PG_GETARG_CSTRING(1);
905906
SetPreparedTransactionState(gid,state);
906907
PG_RETURN_VOID();
907-
}
908+
}
908909

909910
/*
910911
* pg_prepared_xact
@@ -1152,6 +1153,7 @@ StartPrepare(GlobalTransaction gxact)
11521153
hdr.nabortrels=smgrGetPendingDeletes(false,&abortrels);
11531154
hdr.ninvalmsgs=xactGetCommittedInvalidationMessages(&invalmsgs,
11541155
&hdr.initfileinval);
1156+
hdr.n_changes=MyXactLogicalChanges;
11551157
hdr.gidlen=strlen(gxact->gid)+1;/* Include '\0' */
11561158
*hdr.state_3pc='\0';
11571159

@@ -1209,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
12091211
Assert(hdr->magic==TWOPHASE_MAGIC);
12101212
hdr->total_len=records.total_len+sizeof(pg_crc32c);
12111213

1212-
if (replorigin) {
1214+
if (replorigin) {
12131215
Assert(replorigin_session_origin_lsn!=InvalidXLogRecPtr);
12141216
hdr->xl_origin.origin_lsn=replorigin_session_origin_lsn;
12151217
hdr->xl_origin.origin_timestamp=replorigin_session_origin_timestamp;
@@ -1445,7 +1447,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
14451447

14461448
hdr= (TwoPhaseFileHeader*)xlrec;
14471449
bufptr=xlrec+MAXALIGN(sizeof(TwoPhaseFileHeader));
1448-
1450+
14491451
parsed->origin_lsn=hdr->xl_origin.origin_lsn;
14501452
parsed->origin_timestamp=hdr->xl_origin.origin_timestamp;
14511453
parsed->xinfo=hdr->xl_origin.origin_lsn!=InvalidXLogRecPtr ?XACT_XINFO_HAS_ORIGIN :0;
@@ -1459,7 +1461,8 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
14591461
parsed->nsubxacts=hdr->nsubxacts;
14601462
parsed->nrels=hdr->ncommitrels;
14611463
parsed->nmsgs=hdr->ninvalmsgs;
1462-
1464+
parsed->n_changes=hdr->n_changes;
1465+
14631466
parsed->subxacts= (TransactionId*)bufptr;
14641467
bufptr+=MAXALIGN(hdr->nsubxacts*sizeof(TransactionId));
14651468

@@ -1575,10 +1578,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
15751578
inti;
15761579

15771580

1578-
if (isCommit)
1579-
{
1581+
if (isCommit)
1582+
{
15801583
CallXactCallbacks(XACT_EVENT_PRE_COMMIT_PREPARED);
1581-
}
1584+
}
15821585

15831586
/*
15841587
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1644,7 +1647,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
16441647
gid);
16451648
CallXactCallbacks(XACT_EVENT_ABORT_PREPARED);
16461649
}
1647-
1650+
16481651

16491652
ProcArrayRemove(proc,latestXid);
16501653

‎src/backend/access/transam/xact.c

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ intDefaultXactIsoLevel = XACT_READ_COMMITTED;
7474
intXactIsoLevel;
7575

7676
boolDefaultXactReadOnly= false;
77-
/*
77+
/*
7878
* We need initialization because only initialized vars appear in
7979
* postges.def and accssible from loadable extension
8080
*/
@@ -119,6 +119,10 @@ TransactionId *ParallelCurrentXids;
119119
*/
120120
boolMyXactAccessedTempRel= false;
121121

122+
/*
123+
* Number of logical changes (insert,delte,update,lgical message) performed by transaction
124+
*/
125+
int64MyXactLogicalChanges=0;
122126

123127
/*
124128
*transaction states - transaction state from server perspective
@@ -1916,6 +1920,8 @@ StartTransaction(void)
19161920
XactIsoLevel=DefaultXactIsoLevel;
19171921
forceSyncCommit= false;
19181922
MyXactAccessedTempRel= false;
1923+
MyXactLogicalChanges=0;
1924+
19191925

19201926
/*
19211927
* reinitialize within-transaction counters
@@ -2181,7 +2187,7 @@ CommitTransaction(void)
21812187
* waiting for lock on a relation we've modified, we want them to know
21822188
* about the catalog change before they start using the relation).
21832189
*/
2184-
xactHasCatcacheInvalidationMessages=HasCatcacheInvalidationMessages();
2190+
xactHasCatcacheInvalidationMessages=HasCatcacheInvalidationMessages();
21852191
xactHasRelcacheInvalidationMessages=HasRelcacheInvalidationMessages();
21862192
AtEOXact_Inval(true);
21872193

@@ -2196,7 +2202,7 @@ CommitTransaction(void)
21962202
RESOURCE_RELEASE_AFTER_LOCKS,
21972203
true, true);
21982204

2199-
if (!is_autonomous_transaction)
2205+
if (!is_autonomous_transaction)
22002206
{
22012207
/*
22022208
* Likewise, dropping of files deleted during the transaction is best done
@@ -2215,13 +2221,13 @@ CommitTransaction(void)
22152221

22162222
AtCommit_Notify();
22172223
AtEOXact_GUC(true,s->gucNestLevel);
2218-
if (!is_autonomous_transaction)
2224+
if (!is_autonomous_transaction)
22192225
{
22202226
AtEOXact_SPI(true);
22212227
}
22222228
AtEOXact_on_commit_actions(true);
22232229
AtEOXact_Namespace(true,is_parallel_worker);
2224-
if (!is_autonomous_transaction)
2230+
if (!is_autonomous_transaction)
22252231
{
22262232
AtEOXact_SMgr();
22272233
AtEOXact_Files();
@@ -2689,7 +2695,7 @@ AbortTransaction(void)
26892695
AtEOXact_Buffers(false);
26902696
}
26912697
AtEOXact_RelationCache(false);
2692-
xactHasCatcacheInvalidationMessages=HasCatcacheInvalidationMessages();
2698+
xactHasCatcacheInvalidationMessages=HasCatcacheInvalidationMessages();
26932699
xactHasRelcacheInvalidationMessages=HasRelcacheInvalidationMessages();
26942700
AtEOXact_Inval(false);
26952701
AtEOXact_MultiXact();
@@ -3537,14 +3543,14 @@ void SuspendTransaction(void)
35373543
sus->TopTransactionStateData=TopTransactionStateData;
35383544

35393545
sus->SnapshotState=SuspendSnapshot();/* only before the resource-owner stuff */
3540-
3541-
if (HasCatcacheInvalidationMessages())
3546+
3547+
if (HasCatcacheInvalidationMessages())
35423548
{
35433549
ResetCatalogCaches();
35443550
}
3545-
if (HasRelcacheInvalidationMessages())
3551+
if (HasRelcacheInvalidationMessages())
35463552
{
3547-
RelationCacheInvalidate();
3553+
RelationCacheInvalidate();
35483554
}
35493555
sus->InvalidationInfo=SuspendInvalidationInfo();
35503556
xactHasCatcacheInvalidationMessages= false;
@@ -3601,7 +3607,7 @@ void SuspendTransaction(void)
36013607

36023608
sus->PgStatState=PgStatSuspend();
36033609
sus->TriggerState=TriggerSuspend();
3604-
sus->SPIState=SuspendSPI();
3610+
sus->SPIState=SuspendSPI();
36053611
}
36063612

36073613
AtStart_Memory();
@@ -3660,13 +3666,13 @@ bool ResumeTransaction(void)
36603666

36613667
ResumeSnapshot(sus->SnapshotState);/* only after the resource-owner stuff */
36623668
ResumeInvalidationInfo(sus->InvalidationInfo);
3663-
if (xactHasCatcacheInvalidationMessages)
3669+
if (xactHasCatcacheInvalidationMessages)
36643670
{
36653671
ResetCatalogCaches();
36663672
}
3667-
if (xactHasRelcacheInvalidationMessages)
3673+
if (xactHasRelcacheInvalidationMessages)
36683674
{
3669-
RelationCacheInvalidate();
3675+
RelationCacheInvalidate();
36703676
}
36713677

36723678
MyProc->backendId=sus->vxid.backendId;
@@ -3864,7 +3870,7 @@ EndTransactionBlock(bool autonomous)
38643870
* to COMMIT.
38653871
*/
38663872
caseTBLOCK_INPROGRESS:
3867-
if (autonomous&&getNestLevelATX()==0) {
3873+
if (autonomous&&getNestLevelATX()==0) {
38683874
ereport(WARNING,
38693875
(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
38703876
errmsg("there is no autonomous transaction in progress")));
@@ -3993,7 +3999,7 @@ UserAbortTransactionBlock(bool autonomous)
39933999
* exit the transaction block.
39944000
*/
39954001
caseTBLOCK_INPROGRESS:
3996-
if (autonomous&&getNestLevelATX()==0) {
4002+
if (autonomous&&getNestLevelATX()==0) {
39974003
ereport(WARNING,
39984004
(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
39994005
errmsg("there is no autonomous transaction in progress")));
@@ -5173,7 +5179,7 @@ EstimateTransactionStateSpace(void)
51735179
nxids=add_size(nxids,s->nChildXids);
51745180
}
51755181

5176-
nxids=add_size(nxids,nParallelCurrentXids);
5182+
nxids=add_size(nxids,nParallelCurrentXids);
51775183
nxids=mul_size(nxids,sizeof(TransactionId));
51785184
returnadd_size(nxids,TM->GetTransactionStateSize());
51795185
}
@@ -5489,6 +5495,7 @@ XactLogCommitRecord(TimestampTz commit_time,
54895495
/* First figure out and collect all the information needed */
54905496

54915497
xlrec.xact_time=commit_time;
5498+
xlrec.n_changes=MyXactLogicalChanges;
54925499

54935500
if (relcacheInval)
54945501
xl_xinfo.xinfo |=XACT_COMPLETION_UPDATE_RELCACHE_FILE;
@@ -5890,7 +5897,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
58905897
* because subtransaction commit is never WAL logged.
58915898
*/
58925899
staticvoid
5893-
xact_redo_abort(xl_xact_parsed_abort*parsed,
5900+
xact_redo_abort(xl_xact_parsed_abort*parsed,
58945901
TransactionIdxid,
58955902
XLogRecPtrlsn,
58965903
RepOriginIdorigin_id)
@@ -6036,9 +6043,9 @@ xact_redo(XLogReaderState *record)
60366043
RecreateTwoPhaseFile(XLogRecGetXid(record),
60376044
XLogRecGetData(record),XLogRecGetDataLen(record));
60386045

6039-
if (originId!=InvalidRepOriginId&&originId!=DoNotReplicateId)
6046+
if (originId!=InvalidRepOriginId&&originId!=DoNotReplicateId)
60406047
{
6041-
xl_xact_parsed_prepareparsed;
6048+
xl_xact_parsed_prepareparsed;
60426049
ParsePrepareRecord(XLogRecGetXid(record),XLogRecGetData(record),&parsed);
60436050
Assert(parsed.origin_lsn!=InvalidXLogRecPtr);
60446051
/* recover apply progress */
@@ -6059,17 +6066,17 @@ xact_redo(XLogReaderState *record)
60596066
}
60606067

60616068
Datumpg_current_tx_nest_level(PG_FUNCTION_ARGS)
6062-
{
6069+
{
60636070
PG_RETURN_INT64(GetCurrentTransactionNestLevel());
60646071
}
60656072

60666073
Datumpg_current_atx_nest_level(PG_FUNCTION_ARGS)
6067-
{
6074+
{
60686075
PG_RETURN_INT64(getNestLevelATX());
60696076
}
60706077

60716078
Datumpg_current_atx_has_ancestor(PG_FUNCTION_ARGS)
6072-
{
6079+
{
60736080
int64xid=PG_GETARG_INT64(0);
60746081
PG_RETURN_BOOL(TransactionIdIsAncestorOfCurrentATX(xid));
60756082
}

‎src/backend/access/transam/xloginsert.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,12 @@ XLogInsert(RmgrId rmid, uint8 info)
423423

424424
TRACE_POSTGRESQL_XLOG_INSERT(rmid,info);
425425

426+
if (rmid==RM_LOGICALMSG_ID||rmid==RM_HEAP_ID)
427+
{
428+
MyXactLogicalChanges+=1;
429+
}
430+
431+
426432
/*
427433
* In bootstrap mode, we don't actually log anything but XLOG resources;
428434
* return a phony record pointer.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp