Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitabaab48

Browse files
knizhnikkelvich
authored andcommitted
Reset memory context after each iteration of pglogical apply
1 parentd3f6b2d commitabaab48

File tree

4 files changed

+52
-48
lines changed

4 files changed

+52
-48
lines changed

‎arbiter.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,9 @@ static void MtmSender(Datum arg)
724724
MTM_ELOG(LOG,"Start arbiter sender %d",MyProcPid);
725725
InitializeTimeouts();
726726

727-
signal(SIGINT,SetStop);
728-
signal(SIGQUIT,SetStop);
729-
signal(SIGTERM,SetStop);
727+
pqsignal(SIGINT,SetStop);
728+
pqsignal(SIGQUIT,SetStop);
729+
pqsignal(SIGTERM,SetStop);
730730

731731
/* We're now ready to receive signals */
732732
BackgroundWorkerUnblockSignals();
@@ -803,9 +803,9 @@ static bool MtmRecovery()
803803

804804
staticvoidMtmMonitor(Datumarg)
805805
{
806-
signal(SIGINT,SetStop);
807-
signal(SIGQUIT,SetStop);
808-
signal(SIGTERM,SetStop);
806+
pqsignal(SIGINT,SetStop);
807+
pqsignal(SIGQUIT,SetStop);
808+
pqsignal(SIGTERM,SetStop);
809809

810810
/* We're now ready to receive signals */
811811
BackgroundWorkerUnblockSignals();
@@ -840,9 +840,9 @@ static void MtmReceiver(Datum arg)
840840
max_fd=0;
841841
#endif
842842

843-
signal(SIGINT,SetStop);
844-
signal(SIGQUIT,SetStop);
845-
signal(SIGTERM,SetStop);
843+
pqsignal(SIGINT,SetStop);
844+
pqsignal(SIGQUIT,SetStop);
845+
pqsignal(SIGTERM,SetStop);
846846

847847
/* We're now ready to receive signals */
848848
BackgroundWorkerUnblockSignals();

‎bgwpool.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ static void BgwPoolMainLoop(BgwPool* pool)
3535
MtmIsLogicalReceiver= true;
3636
MtmPool=pool;
3737

38-
signal(SIGINT,BgwShutdownWorker);
39-
signal(SIGQUIT,BgwShutdownWorker);
40-
signal(SIGTERM,BgwShutdownWorker);
38+
pqsignal(SIGINT,BgwShutdownWorker);
39+
pqsignal(SIGQUIT,BgwShutdownWorker);
40+
pqsignal(SIGTERM,BgwShutdownWorker);
4141

4242
BackgroundWorkerUnblockSignals();
4343
BackgroundWorkerInitializeConnection(pool->dbname,pool->dbuser);

‎multimaster.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,8 +528,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
528528
if (ts!=NULL/*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
529529
{
530530
if (ts->csn>MtmTx.snapshot) {
531-
MTM_LOG4("%d: tuple with xid=%d(csn=%lld) is invisible in snapshot %lld",
532-
MyProcPid,xid,ts->csn,MtmTx.snapshot);
531+
MTM_LOG4("%d: tuple with xid=%lld(csn=%lld) is invisible in snapshot %lld",
532+
MyProcPid,(long64)xid,ts->csn,MtmTx.snapshot);
533533
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
534534
MTM_ELOG(WARNING,"Backend %d waits for transaction %s (%llu) status %lld usecs",MyProcPid,ts->gid, (long64)xid,MtmGetSystemTime()-start);
535535
}
@@ -569,8 +569,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
569569
else
570570
{
571571
boolinvisible=ts->status!=TRANSACTION_STATUS_COMMITTED;
572-
MTM_LOG4("%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld",
573-
MyProcPid,xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
572+
MTM_LOG4("%d: tuple with xid=%lld(csn= %lld) is %s in snapshot %lld",
573+
MyProcPid,(long64)xid,ts->csn,invisible ?"rollbacked" :"committed",MtmTx.snapshot);
574574
MtmUnlock();
575575
if (MtmGetSystemTime()-start>USECS_PER_SEC) {
576576
MTM_ELOG(WARNING,"Backend %d waits for %s transaction %s (%llu) %lld usecs",MyProcPid,invisible ?"rollbacked" :"committed",
@@ -581,7 +581,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
581581
}
582582
else
583583
{
584-
MTM_LOG4("%d: visibility check is skipped for transaction %u in snapshot %llu",MyProcPid,xid,MtmTx.snapshot);
584+
MTM_LOG4("%d: visibility check is skipped for transaction %llu in snapshot %llu",MyProcPid,(long64)xid,MtmTx.snapshot);
585585
MtmUnlock();
586586
returnPgXidInMVCCSnapshot(xid,snapshot);
587587
}
@@ -4945,6 +4945,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
49454945
break;
49464946

49474947
caseT_DropStmt:
4948+
caseT_TruncateStmt:
49484949
{
49494950
DropStmt*stmt= (DropStmt*)parsetree;
49504951
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent)

‎pglogical_apply.c

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -408,16 +408,13 @@ process_remote_message(StringInfo s)
408408
if (MtmVacuumStmt!=NULL) {
409409
ExecVacuum(MtmVacuumStmt,1);
410410
}elseif (MtmIndexStmt!=NULL) {
411-
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
412411
Oidrelid=RangeVarGetRelidExtended(MtmIndexStmt->relation,ShareUpdateExclusiveLock,
413412
false, false,
414413
NULL,
415414
NULL);
416415
/* Run parse analysis ... */
417416
MtmIndexStmt=transformIndexStmt(relid,MtmIndexStmt,messageBody);
418417

419-
MemoryContextSwitchTo(oldContext);
420-
421418
DefineIndex(relid,/* OID of heap relation */
422419
MtmIndexStmt,
423420
InvalidOid,/* no predefined OID */
@@ -618,6 +615,7 @@ read_rel(StringInfo s, LOCKMODE mode)
618615
RangeVar*rv;
619616
Oidremote_relid=pq_getmsgint(s,4);
620617
Oidlocal_relid;
618+
MemoryContextold_context;
621619

622620
local_relid=pglogical_relid_map_get(remote_relid);
623621
if (local_relid==InvalidOid) {
@@ -630,7 +628,9 @@ read_rel(StringInfo s, LOCKMODE mode)
630628
rv->relname= (char*)pq_getmsgbytes(s,relnamelen);
631629

632630
local_relid=RangeVarGetRelidExtended(rv,mode, false, false,NULL,NULL);
631+
old_context=MemoryContextSwitchTo(TopMemoryContext);
633632
pglogical_relid_map_put(remote_relid,local_relid);
633+
MemoryContextSwitchTo(old_context);
634634
}else {
635635
nspnamelen=pq_getmsgbyte(s);
636636
s->cursor+=nspnamelen;
@@ -1060,7 +1060,8 @@ void MtmExecutor(void* work, size_t size)
10601060
intspill_file=-1;
10611061
intsave_cursor=0;
10621062
intsave_len=0;
1063-
MemoryContexttopContext;
1063+
MemoryContextold_context;
1064+
MemoryContexttop_context;
10641065

10651066
s.data=work;
10661067
s.len=size;
@@ -1074,13 +1075,15 @@ void MtmExecutor(void* work, size_t size)
10741075
ALLOCSET_DEFAULT_INITSIZE,
10751076
ALLOCSET_DEFAULT_MAXSIZE);
10761077
}
1077-
topContext=MemoryContextSwitchTo(MtmApplyContext);
1078-
1078+
top_context=MemoryContextSwitchTo(MtmApplyContext);
10791079
replorigin_session_origin=InvalidRepOriginId;
10801080
PG_TRY();
10811081
{
1082-
while (true) {
1082+
boolinside_transaction= true;
1083+
do {
10831084
charaction=pq_getmsgbyte(&s);
1085+
old_context=MemoryContextSwitchTo(MtmApplyContext);
1086+
10841087
MTM_LOG2("%d: REMOTE process action %c",MyProcPid,action);
10851088
#if0
10861089
if (Mtm->status==MTM_RECOVERY) {
@@ -1091,84 +1094,81 @@ void MtmExecutor(void* work, size_t size)
10911094
switch (action) {
10921095
/* BEGIN */
10931096
case'B':
1094-
if (process_remote_begin(&s)) {
1095-
continue;
1096-
}else {
1097-
break;
1098-
}
1097+
inside_transaction=process_remote_begin(&s);
1098+
break;
10991099
/* COMMIT */
11001100
case'C':
11011101
close_rel(rel);
11021102
process_remote_commit(&s);
1103+
inside_transaction= false;
11031104
break;
11041105
/* INSERT */
11051106
case'I':
1106-
process_remote_insert(&s,rel);
1107-
continue;
1107+
process_remote_insert(&s,rel);
1108+
break;
11081109
/* UPDATE */
11091110
case'U':
11101111
process_remote_update(&s,rel);
1111-
continue;
1112+
break;
11121113
/* DELETE */
11131114
case'D':
11141115
process_remote_delete(&s,rel);
1115-
continue;
1116+
break;
11161117
case'R':
11171118
close_rel(rel);
11181119
rel=read_rel(&s,RowExclusiveLock);
1119-
continue;
1120+
break;
11201121
case'F':
11211122
{
11221123
intnode_id=pq_getmsgint(&s,4);
11231124
intfile_id=pq_getmsgint(&s,4);
11241125
Assert(spill_file<0);
11251126
spill_file=MtmOpenSpillFile(node_id,file_id);
1126-
continue;
1127+
break;
11271128
}
11281129
case'(':
11291130
{
11301131
size_tsize=pq_getmsgint(&s,4);
1131-
s.data=palloc(size);
1132+
s.data=MemoryContextAlloc(TopMemoryContext,size);
11321133
save_cursor=s.cursor;
11331134
save_len=s.len;
11341135
s.cursor=0;
11351136
s.len=size;
11361137
MtmReadSpillFile(spill_file,s.data,size);
1137-
continue;
1138+
break;
11381139
}
11391140
case')':
11401141
{
11411142
pfree(s.data);
11421143
s.data=work;
11431144
s.cursor=save_cursor;
11441145
s.len=save_len;
1145-
continue;
1146+
break;
11461147
}
11471148
case'M':
11481149
{
1149-
if (process_remote_message(&s)) {
1150-
break;
1151-
}
1152-
continue;
1150+
inside_transaction= !process_remote_message(&s);
1151+
break;
11531152
}
11541153
case'Z':
11551154
{
11561155
MtmRecoveryCompleted();
1156+
inside_transaction= false;
11571157
break;
11581158
}
11591159
default:
11601160
MTM_ELOG(ERROR,"unknown action of type %c",action);
11611161
}
1162-
break;
1163-
}
1162+
MemoryContextSwitchTo(old_context);
1163+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1164+
}while (inside_transaction);
11641165
}
11651166
PG_CATCH();
11661167
{
1167-
MemoryContextoldcontext;
11681168
MtmReleaseLock();
1169-
oldcontext=MemoryContextSwitchTo(MtmApplyContext);
1169+
old_context=MemoryContextSwitchTo(MtmApplyContext);
11701170
MtmHandleApplyError();
1171-
MemoryContextSwitchTo(oldcontext);
1171+
MemoryContextSwitchTo(old_context);
11721172
EmitErrorReport();
11731173
FlushErrorState();
11741174
MTM_LOG1("%d: REMOTE begin abort transaction %llu",MyProcPid, (long64)MtmGetCurrentTransactionId());
@@ -1178,12 +1178,15 @@ void MtmExecutor(void* work, size_t size)
11781178
MTM_LOG2("%d: REMOTE end abort transaction %llu",MyProcPid, (long64)MtmGetCurrentTransactionId());
11791179
}
11801180
PG_END_TRY();
1181+
if (s.data!=work) {
1182+
pfree(s.data);
1183+
}
11811184
#if0/* spill file is expecrted to be closed by tranaction commit or rollback */
11821185
if (spill_file >=0) {
11831186
MtmCloseSpillFile(spill_file);
11841187
}
11851188
#endif
1189+
MemoryContextSwitchTo(top_context);
11861190
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1187-
MemoryContextSwitchTo(topContext);
11881191
}
11891192

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp