Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit288462c

Browse files
committed
Reset memory context after each iteration of pglogical apply
1 parentcc596b6 commit288462c

File tree

4 files changed

+52
-48
lines changed

4 files changed

+52
-48
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,9 @@ static void MtmSender(Datum arg)
724724
MTM_ELOG(LOG,"Start arbiter sender %d",MyProcPid);
725725
InitializeTimeouts();
726726

727-
signal(SIGINT,SetStop);
728-
signal(SIGQUIT,SetStop);
729-
signal(SIGTERM,SetStop);
727+
pqsignal(SIGINT,SetStop);
728+
pqsignal(SIGQUIT,SetStop);
729+
pqsignal(SIGTERM,SetStop);
730730

731731
/* We're now ready to receive signals */
732732
BackgroundWorkerUnblockSignals();
@@ -803,9 +803,9 @@ static bool MtmRecovery()
803803

804804
staticvoidMtmMonitor(Datumarg)
805805
{
806-
signal(SIGINT,SetStop);
807-
signal(SIGQUIT,SetStop);
808-
signal(SIGTERM,SetStop);
806+
pqsignal(SIGINT,SetStop);
807+
pqsignal(SIGQUIT,SetStop);
808+
pqsignal(SIGTERM,SetStop);
809809

810810
/* We're now ready to receive signals */
811811
BackgroundWorkerUnblockSignals();
@@ -840,9 +840,9 @@ static void MtmReceiver(Datum arg)
840840
max_fd=0;
841841
#endif
842842

843-
signal(SIGINT,SetStop);
844-
signal(SIGQUIT,SetStop);
845-
signal(SIGTERM,SetStop);
843+
pqsignal(SIGINT,SetStop);
844+
pqsignal(SIGQUIT,SetStop);
845+
pqsignal(SIGTERM,SetStop);
846846

847847
/* We're now ready to receive signals */
848848
BackgroundWorkerUnblockSignals();

‎contrib/mmts/bgwpool.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ static void BgwPoolMainLoop(BgwPool* pool)
3535
MtmIsLogicalReceiver= true;
3636
MtmPool=pool;
3737

38-
signal(SIGINT,BgwShutdownWorker);
39-
signal(SIGQUIT,BgwShutdownWorker);
40-
signal(SIGTERM,BgwShutdownWorker);
38+
pqsignal(SIGINT,BgwShutdownWorker);
39+
pqsignal(SIGQUIT,BgwShutdownWorker);
40+
pqsignal(SIGTERM,BgwShutdownWorker);
4141

4242
BackgroundWorkerUnblockSignals();
4343
BackgroundWorkerInitializeConnection(pool->dbname,pool->dbuser);

‎contrib/mmts/multimaster.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
526526
if (ts!=NULL/*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
527527
{
528528
if (ts->csn>MtmTx.snapshot) {
529-
MTM_LOG4("%d: tuple with xid=%d(csn=%lld) is invisible in snapshot %lld",
530-
MyProcPid,xid,ts->csn,MtmTx.snapshot);
529+
MTM_LOG4("%d: tuple with xid=%lld(csn=%lld) is invisible in snapshot %lld",
530+
MyProcPid,(long64)xid,ts->csn,MtmTx.snapshot);
531531
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
532532
MTM_ELOG(WARNING,"Backend %d waits for transaction %s (%llu) status %lld usecs",MyProcPid,ts->gid, (long64)xid,MtmGetSystemTime()-start);
533533
}
@@ -567,8 +567,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
567567
else
568568
{
569569
boolinvisible=ts->status!=TRANSACTION_STATUS_COMMITTED;
570-
MTM_LOG4("%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld",
571-
MyProcPid,xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
570+
MTM_LOG4("%d: tuple with xid=%lld(csn= %lld) is %s in snapshot %lld",
571+
MyProcPid,(long64)xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
572572
MtmUnlock();
573573
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
574574
MTM_ELOG(WARNING,"Backend %d waits for %s transaction %s (%llu) %lld usecs",MyProcPid,invisible ?"rollbacked" :"committed",
@@ -579,7 +579,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
579579
}
580580
else
581581
{
582-
MTM_LOG4("%d: visibility check is skipped for transaction %u in snapshot %llu",MyProcPid,xid,MtmTx.snapshot);
582+
MTM_LOG4("%d: visibility check is skipped for transaction %llu in snapshot %llu",MyProcPid,(long64)xid,MtmTx.snapshot);
583583
MtmUnlock();
584584
returnPgXidInMVCCSnapshot(xid,snapshot);
585585
}
@@ -4894,6 +4894,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
48944894
break;
48954895

48964896
caseT_DropStmt:
4897+
caseT_TruncateStmt:
48974898
{
48984899
DropStmt*stmt= (DropStmt*)parsetree;
48994900
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent)

‎contrib/mmts/pglogical_apply.c

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -405,16 +405,13 @@ process_remote_message(StringInfo s)
405405
if (MtmVacuumStmt!=NULL) {
406406
ExecVacuum(MtmVacuumStmt,1);
407407
}elseif (MtmIndexStmt!=NULL) {
408-
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
409408
Oidrelid=RangeVarGetRelidExtended(MtmIndexStmt->relation,ShareUpdateExclusiveLock,
410409
false, false,
411410
NULL,
412411
NULL);
413412
/* Run parse analysis ... */
414413
MtmIndexStmt=transformIndexStmt(relid,MtmIndexStmt,messageBody);
415414

416-
MemoryContextSwitchTo(oldContext);
417-
418415
DefineIndex(relid,/* OID of heap relation */
419416
MtmIndexStmt,
420417
InvalidOid,/* no predefined OID */
@@ -599,6 +596,7 @@ read_rel(StringInfo s, LOCKMODE mode)
599596
RangeVar*rv;
600597
Oidremote_relid=pq_getmsgint(s,4);
601598
Oidlocal_relid;
599+
MemoryContextold_context;
602600

603601
local_relid=pglogical_relid_map_get(remote_relid);
604602
if (local_relid==InvalidOid) {
@@ -611,7 +609,9 @@ read_rel(StringInfo s, LOCKMODE mode)
611609
rv->relname= (char*)pq_getmsgbytes(s,relnamelen);
612610

613611
local_relid=RangeVarGetRelidExtended(rv,mode, false, false,NULL,NULL);
612+
old_context=MemoryContextSwitchTo(TopMemoryContext);
614613
pglogical_relid_map_put(remote_relid,local_relid);
614+
MemoryContextSwitchTo(old_context);
615615
}else {
616616
nspnamelen=pq_getmsgbyte(s);
617617
s->cursor+=nspnamelen;
@@ -1041,7 +1041,8 @@ void MtmExecutor(void* work, size_t size)
10411041
intspill_file=-1;
10421042
intsave_cursor=0;
10431043
intsave_len=0;
1044-
MemoryContexttopContext;
1044+
MemoryContextold_context;
1045+
MemoryContexttop_context;
10451046

10461047
s.data=work;
10471048
s.len=size;
@@ -1055,13 +1056,15 @@ void MtmExecutor(void* work, size_t size)
10551056
ALLOCSET_DEFAULT_INITSIZE,
10561057
ALLOCSET_DEFAULT_MAXSIZE);
10571058
}
1058-
topContext=MemoryContextSwitchTo(MtmApplyContext);
1059-
1059+
top_context=MemoryContextSwitchTo(MtmApplyContext);
10601060
replorigin_session_origin=InvalidRepOriginId;
10611061
PG_TRY();
10621062
{
1063-
while (true) {
1063+
boolinside_transaction= true;
1064+
do {
10641065
charaction=pq_getmsgbyte(&s);
1066+
old_context=MemoryContextSwitchTo(MtmApplyContext);
1067+
10651068
MTM_LOG2("%d: REMOTE process action %c",MyProcPid,action);
10661069
#if0
10671070
if (Mtm->status==MTM_RECOVERY) {
@@ -1072,84 +1075,81 @@ void MtmExecutor(void* work, size_t size)
10721075
switch (action) {
10731076
/* BEGIN */
10741077
case'B':
1075-
if (process_remote_begin(&s)) {
1076-
continue;
1077-
}else {
1078-
break;
1079-
}
1078+
inside_transaction=process_remote_begin(&s);
1079+
break;
10801080
/* COMMIT */
10811081
case'C':
10821082
close_rel(rel);
10831083
process_remote_commit(&s);
1084+
inside_transaction= false;
10841085
break;
10851086
/* INSERT */
10861087
case'I':
1087-
process_remote_insert(&s,rel);
1088-
continue;
1088+
process_remote_insert(&s,rel);
1089+
break;
10891090
/* UPDATE */
10901091
case'U':
10911092
process_remote_update(&s,rel);
1092-
continue;
1093+
break;
10931094
/* DELETE */
10941095
case'D':
10951096
process_remote_delete(&s,rel);
1096-
continue;
1097+
break;
10971098
case'R':
10981099
close_rel(rel);
10991100
rel=read_rel(&s,RowExclusiveLock);
1100-
continue;
1101+
break;
11011102
case'F':
11021103
{
11031104
intnode_id=pq_getmsgint(&s,4);
11041105
intfile_id=pq_getmsgint(&s,4);
11051106
Assert(spill_file<0);
11061107
spill_file=MtmOpenSpillFile(node_id,file_id);
1107-
continue;
1108+
break;
11081109
}
11091110
case'(':
11101111
{
11111112
size_tsize=pq_getmsgint(&s,4);
1112-
s.data=palloc(size);
1113+
s.data=MemoryContextAlloc(TopMemoryContext,size);
11131114
save_cursor=s.cursor;
11141115
save_len=s.len;
11151116
s.cursor=0;
11161117
s.len=size;
11171118
MtmReadSpillFile(spill_file,s.data,size);
1118-
continue;
1119+
break;
11191120
}
11201121
case')':
11211122
{
11221123
pfree(s.data);
11231124
s.data=work;
11241125
s.cursor=save_cursor;
11251126
s.len=save_len;
1126-
continue;
1127+
break;
11271128
}
11281129
case'M':
11291130
{
1130-
if (process_remote_message(&s)) {
1131-
break;
1132-
}
1133-
continue;
1131+
inside_transaction= !process_remote_message(&s);
1132+
break;
11341133
}
11351134
case'Z':
11361135
{
11371136
MtmRecoveryCompleted();
1137+
inside_transaction= false;
11381138
break;
11391139
}
11401140
default:
11411141
MTM_ELOG(ERROR,"unknown action of type %c",action);
11421142
}
1143-
break;
1144-
}
1143+
MemoryContextSwitchTo(old_context);
1144+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1145+
}while (inside_transaction);
11451146
}
11461147
PG_CATCH();
11471148
{
1148-
MemoryContextoldcontext;
11491149
MtmReleaseLock();
1150-
oldcontext=MemoryContextSwitchTo(MtmApplyContext);
1150+
old_context=MemoryContextSwitchTo(MtmApplyContext);
11511151
MtmHandleApplyError();
1152-
MemoryContextSwitchTo(oldcontext);
1152+
MemoryContextSwitchTo(old_context);
11531153
EmitErrorReport();
11541154
FlushErrorState();
11551155
MTM_LOG1("%d: REMOTE begin abort transaction %llu",MyProcPid, (long64)MtmGetCurrentTransactionId());
@@ -1159,12 +1159,15 @@ void MtmExecutor(void* work, size_t size)
11591159
MTM_LOG2("%d: REMOTE end abort transaction %llu",MyProcPid, (long64)MtmGetCurrentTransactionId());
11601160
}
11611161
PG_END_TRY();
1162+
if (s.data!=work) {
1163+
pfree(s.data);
1164+
}
11621165
#if0/* spill file is expecrted to be closed by tranaction commit or rollback */
11631166
if (spill_file >=0) {
11641167
MtmCloseSpillFile(spill_file);
11651168
}
11661169
#endif
1170+
MemoryContextSwitchTo(top_context);
11671171
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1168-
MemoryContextSwitchTo(topContext);
11691172
}
11701173

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp