Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb3f24e7

Browse files
knizhnikkelvich
authored andcommitted
Fix memory leak in MMTS
1 parentdc5ae4a commitb3f24e7

File tree

6 files changed

+92
-41
lines changed

6 files changed

+92
-41
lines changed

‎decoder_raw.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
108108
Assert(lastXid!=txn->xid);
109109
lastXid=txn->xid;
110110
if (MMIsLocalTransaction(txn->xid)) {
111-
MTM_INFO("Skip local transaction %u\n",txn->xid);
111+
MTM_TRACE("Skip local transaction %u\n",txn->xid);
112112
data->isLocal= true;
113113
}else {
114114
OutputPluginPrepareWrite(ctx, true);
115-
MTM_INFO("Send transaction %u to replica\n",txn->xid);
115+
MTM_TRACE("Send transaction %u to replica\n",txn->xid);
116116
appendStringInfo(ctx->out,"BEGIN %u;",txn->xid);
117117
OutputPluginWrite(ctx, true);
118118
data->isLocal= false;
@@ -126,12 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
126126
{
127127
DecoderRawData*data=ctx->output_plugin_private;
128128
if (!data->isLocal) {
129-
MTM_INFO("Send commit of transaction %u to replica\n",txn->xid);
129+
MTM_TRACE("Send commit of transaction %u to replica\n",txn->xid);
130130
OutputPluginPrepareWrite(ctx, true);
131131
appendStringInfoString(ctx->out,"COMMIT;");
132132
OutputPluginWrite(ctx, true);
133133
}else {
134-
MTM_INFO("Skip commit of transaction %u\n",txn->xid);
134+
MTM_TRACE("Skip commit of transaction %u\n",txn->xid);
135135
}
136136
}
137137

@@ -483,10 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
483483

484484
data=ctx->output_plugin_private;
485485
if (data->isLocal) {
486-
MTM_INFO("Skip action %d in transaction %u\n",change->action,txn->xid);
486+
MTM_TRACE("Skip action %d in transaction %u\n",change->action,txn->xid);
487487
return;
488488
}
489-
MTM_INFO("Send action %d in transaction %u to replica\n",change->action,txn->xid);
489+
MTM_TRACE("Send action %d in transaction %u to replica\n",change->action,txn->xid);
490490

491491
/* Avoid leaking memory by using and resetting our own context */
492492
old=MemoryContextSwitchTo(data->context);

‎multimaster.c

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,9 @@ MtmResetTransaction(MtmCurrentTrans* x)
527527
x->xid=InvalidTransactionId;
528528
x->gtid.xid=InvalidTransactionId;
529529
x->isDistributed= false;
530+
x->isPrepared= false;
531+
x->isPrepared= false;
532+
x->status=TRANSACTION_STATUS_UNKNOWN;
530533
}
531534

532535
staticvoid
@@ -625,16 +628,15 @@ static void
625628
MtmPostPrepareTransaction(MtmCurrentTrans*x)
626629
{
627630
MtmTransState*ts;
628-
MtmTransMap*tm;
629631

630632
MtmLock(LW_EXCLUSIVE);
631633
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
632634
Assert(ts!=NULL);
633-
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
634-
Assert(x->gid[0]);
635-
tm->state=ts;
636635

637636
if (!MtmIsCoordinator(ts)) {
637+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
638+
Assert(x->gid[0]);
639+
tm->state=ts;
638640
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
639641
MtmUnlock();
640642
MtmResetTransaction(x);
@@ -658,21 +660,23 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
658660
{
659661
MtmTransMap*tm;
660662

661-
MtmLock(LW_EXCLUSIVE);
662-
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
663-
Assert(tm!=NULL);
664-
tm->state->status=TRANSACTION_STATUS_ABORTED;
665-
MtmAdjustSubtransactions(tm->state);
666-
MtmUnlock();
667-
MtmResetTransaction(x);
663+
if (x->status!=TRANSACTION_STATUS_ABORTED) {
664+
MtmLock(LW_EXCLUSIVE);
665+
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_REMOVE,NULL);
666+
Assert(tm!=NULL);
667+
tm->state->status=TRANSACTION_STATUS_ABORTED;
668+
MtmAdjustSubtransactions(tm->state);
669+
MtmUnlock();
670+
x->status=TRANSACTION_STATUS_ABORTED;
671+
}
668672
}
669673

670674
staticvoid
671675
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
672676
{
673677
MTM_TRACE("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s\n",
674678
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->gid,commit ?"commit" :"abort");
675-
if (x->isDistributed&& (x->isPrepared||x->isReplicated)) {
679+
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)) {
676680
MtmTransState*ts=NULL;
677681
MtmLock(LW_EXCLUSIVE);
678682
if (x->isPrepared) {
@@ -690,6 +694,10 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
690694
if (commit) {
691695
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
692696
ts->status=TRANSACTION_STATUS_COMMITTED;
697+
if (x->csn>ts->csn) {
698+
ts->csn=x->csn;
699+
MtmSyncClock(ts->csn);
700+
}
693701
}else {
694702
ts->status=TRANSACTION_STATUS_ABORTED;
695703
}
@@ -799,6 +807,26 @@ XidStatus MtmGetGlobalTransactionStatus(char const* gid)
799807
returnstatus;
800808
}
801809

810+
voidMtmSetCurrentTransactionCSN(csn_tcsn)
811+
{
812+
MTM_TRACE("Set current transaction CSN %ld\n",csn);
813+
dtmTx.csn=csn;
814+
dtmTx.isDistributed= true;
815+
dtmTx.isReplicated= true;
816+
}
817+
818+
819+
csn_tMtmGetTransactionCSN(TransactionIdxid)
820+
{
821+
MtmTransState*ts;
822+
csn_tcsn;
823+
MtmLock(LW_SHARED);
824+
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
825+
Assert(ts!=NULL);
826+
csn=ts->csn;
827+
MtmUnlock();
828+
returncsn;
829+
}
802830

803831
/*
804832
* -------------------------------------------

‎multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1111
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1212
*/
13+
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1314
#defineMTM_TRACE(fmt, ...)
14-
#define MTM_TUPLE_TRACE(fmt, ...)
1515
/* */
1616

1717
#defineMULTIMASTER_NAME "multimaster"
@@ -162,6 +162,8 @@ extern MtmState* MtmGetState(void);
162162
externtimestamp_tMtmGetCurrentTime(void);
163163
externvoidMtmSleep(timestamp_tinterval);
164164
externvoidMtmSetCurrentTransactionGID(charconst*gid);
165+
externcsn_tMtmGetTransactionCSN(TransactionIdxid);
166+
externvoidMtmSetCurrentTransactionCSN(csn_tcsn);
165167
externTransactionIdMtmGetCurrentTransactionId(void);
166168
externXidStatusMtmGetCurrentTransactionStatus(void);
167169
externXidStatusMtmGetGlobalTransactionStatus(charconst*gid);

‎pglogical_apply.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -465,20 +465,37 @@ read_rel(StringInfo s, LOCKMODE mode)
465465
}
466466

467467
staticvoid
468-
MtmSetCurrentSession(intnodeId)
468+
MtmBeginSession(intnodeId)
469469
{
470+
#if0
470471
charslot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
471472
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,nodeId);
473+
Assert(replorigin_session_origin==InvalidRepOriginId);
472474
replorigin_session_origin=replorigin_by_name(slot_name, false);
475+
MTM_INFO("%d: Begin setup replorigin session: %d\n",MyProcPid,replorigin_session_origin);
473476
replorigin_session_setup(replorigin_session_origin);
477+
MTM_INFO("%d: End setup replorigin session: %d\n",MyProcPid,replorigin_session_origin);
478+
#endif
479+
}
480+
481+
staticvoid
482+
MtmEndSession(void)
483+
{
484+
if (replorigin_session_origin!=InvalidRepOriginId) {
485+
MTM_INFO("%d: Begin reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
486+
replorigin_session_origin=InvalidRepOriginId;
487+
replorigin_session_reset();
488+
MTM_INFO("%d: End reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
489+
}
474490
}
475491

476492
staticvoid
477493
process_remote_commit(StringInfoin)
478494
{
479-
uint8flags;
480-
uint8nodeId;
481-
constchar*gid=NULL;
495+
uint8flags;
496+
uint8nodeId;
497+
csn_tcsn;
498+
constchar*gid=NULL;
482499

483500
/* read flags */
484501
flags=pq_getmsgbyte(in);
@@ -489,14 +506,16 @@ process_remote_commit(StringInfo in)
489506
pq_getmsgint64(in);/* end_lsn */
490507
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
491508

509+
Assert(replorigin_session_origin==InvalidRepOriginId);
510+
492511
switch(PGLOGICAL_XACT_EVENT(flags))
493512
{
494513
casePGLOGICAL_COMMIT:
495514
{
496515
MTM_TRACE("%d: PGLOGICAL_COMMIT commit\n",MyProcPid);
497516
if (IsTransactionState()) {
498517
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
499-
MtmSetCurrentSession(nodeId);
518+
MtmBeginSession(nodeId);
500519
CommitTransactionCommand();
501520
}
502521
break;
@@ -510,7 +529,7 @@ process_remote_commit(StringInfo in)
510529
BeginTransactionBlock();
511530
CommitTransactionCommand();
512531
StartTransactionCommand();
513-
MtmSetCurrentSession(nodeId);
532+
MtmBeginSession(nodeId);
514533
/* PREPARE itself */
515534
MtmSetCurrentTransactionGID(gid);
516535
PrepareTransactionBlock(gid);
@@ -520,10 +539,12 @@ process_remote_commit(StringInfo in)
520539
casePGLOGICAL_COMMIT_PREPARED:
521540
{
522541
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
542+
csn=pq_getmsgint64(in);
523543
gid=pq_getmsgstring(in);
524544
MTM_TRACE("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n",MyProcPid,csn,gid);
525545
StartTransactionCommand();
526-
MtmSetCurrentSession(nodeId);
546+
MtmBeginSession(nodeId);
547+
MtmSetCurrentTransactionCSN(csn);
527548
MtmSetCurrentTransactionGID(gid);
528549
FinishPreparedTransaction(gid, true);
529550
CommitTransactionCommand();
@@ -545,8 +566,7 @@ process_remote_commit(StringInfo in)
545566
default:
546567
Assert(false);
547568
}
548-
replorigin_session_reset();
549-
replorigin_session_origin=InvalidRepOriginId;
569+
MtmEndSession();
550570
}
551571

552572
staticvoid
@@ -859,10 +879,10 @@ void MtmExecutor(int id, void* work, size_t size)
859879
{
860880
StringInfoDatas;
861881
Relationrel=NULL;
862-
initStringInfo(&s);
863882
s.data=work;
864883
s.len=size;
865884
s.maxlen=-1;
885+
s.cursor=0;
866886

867887
if (ApplyContext==NULL) {
868888
ApplyContext=AllocSetContextCreate(TopMemoryContext,
@@ -910,12 +930,10 @@ void MtmExecutor(int id, void* work, size_t size)
910930
}
911931
PG_CATCH();
912932
{
913-
if (replorigin_session_origin!=InvalidRepOriginId) {
914-
replorigin_session_reset();
915-
}
916933
EmitErrorReport();
917934
FlushErrorState();
918935
MTM_TRACE("%d: REMOTE begin abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
936+
MtmEndSession();
919937
AbortCurrentTransaction();
920938
MTM_TRACE("%d: REMOTE end abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
921939
}

‎pglogical_proto.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169169
pq_sendint64(out,txn->end_lsn);
170170
pq_sendint64(out,txn->commit_time);
171171

172+
if (flags==PGLOGICAL_COMMIT_PREPARED) {
173+
pq_sendint64(out,MtmGetTransactionCSN(txn->xid));
174+
}
172175
if (flags!=PGLOGICAL_COMMIT) {
173176
pq_sendstring(out,txn->gid);
174177
}

‎pglogical_receiver.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ pglogical_receiver_main(Datum main_arg)
214214
XLogRecPtroriginStartPos=0;
215215
RepOriginIdoriginId;
216216
char*originName;
217+
/* Buffer for COPY data */
218+
char*copybuf=NULL;
217219

218220
/* Register functions for SIGTERM/SIGHUP management */
219221
pqsignal(SIGHUP,receiver_raw_sighup);
@@ -314,8 +316,6 @@ pglogical_receiver_main(Datum main_arg)
314316
while (!got_sigterm)
315317
{
316318
intrc,hdr_len;
317-
/* Buffer for COPY data */
318-
char*copybuf=NULL;
319319
/* Wait necessary amount of time */
320320
rc=WaitLatch(&MyProc->procLatch,
321321
WL_LATCH_SET |WL_TIMEOUT |WL_POSTMASTER_DEATH,
@@ -347,13 +347,6 @@ pglogical_receiver_main(Datum main_arg)
347347
}
348348

349349

350-
/* Some cleanup */
351-
if (copybuf!=NULL)
352-
{
353-
PQfreemem(copybuf);
354-
copybuf=NULL;
355-
}
356-
357350
/*
358351
* Receive data.
359352
*/
@@ -362,6 +355,13 @@ pglogical_receiver_main(Datum main_arg)
362355
XLogRecPtrwalEnd;
363356
char*stmt;
364357

358+
/* Some cleanup */
359+
if (copybuf!=NULL)
360+
{
361+
PQfreemem(copybuf);
362+
copybuf=NULL;
363+
}
364+
365365
rc=PQgetCopyData(conn,&copybuf,1);
366366
if (rc <=0) {
367367
break;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp