Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita4bddc7

Browse files
knizhnikkelvich
authored andcommitted
Limit maximal size of MtmTransSpillThreshold
1 parentd307417 commita4bddc7

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

‎multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2848,7 +2848,7 @@ _PG_init(void)
28482848
&MtmTransSpillThreshold,
28492849
1000,/* 1Gb */
28502850
0,
2851-
(MaxAllocSize-1)/MB,
2851+
MaxAllocSize/MB,
28522852
PGC_BACKEND,
28532853
0,
28542854
NULL,

‎pglogical_receiver.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -522,16 +522,18 @@ pglogical_receiver_main(Datum main_arg)
522522

523523
if (rc>hdr_len)
524524
{
525+
intmsg_len=rc-hdr_len;
525526
stmt=copybuf+hdr_len;
526527
if (mode==REPLMODE_RECOVERED) {
528+
/* Ingore all incompleted transactions from recovered node */
527529
if (stmt[0]!='B') {
528530
output_written_lsn=Max(walEnd,output_written_lsn);
529531
continue;
530532
}
531533
mode=REPLMODE_OPEN_EXISTED;
532534
}
533535
MTM_LOG3("Receive message %c from node %d",stmt[0],nodeId);
534-
if (buf.used >=MtmTransSpillThreshold*MB) {
536+
if (buf.used+msg_len+1>=MtmTransSpillThreshold*MB) {
535537
if (spill_file<0) {
536538
intfile_id;
537539
spill_file=MtmCreateSpillFile(nodeId,&file_id);
@@ -548,15 +550,15 @@ pglogical_receiver_main(Datum main_arg)
548550
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C'))) {
549551
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
550552
if (stmt[0]=='M'&&stmt[1]=='C') {/* concurrent DDL should be executed by parallel workers */
551-
MtmExecute(stmt,rc-hdr_len);
553+
MtmExecute(stmt,msg_len);
552554
}else {
553-
MtmExecutor(stmt,rc-hdr_len);/* all other messages can be processed by receiver itself */
555+
MtmExecutor(stmt,msg_len);/* all other messages can be processed by receiver itself */
554556
}
555557
}else {
556-
ByteBufferAppend(&buf,stmt,rc-hdr_len);
558+
ByteBufferAppend(&buf,stmt,msg_len);
557559
if (stmt[0]=='C')/* commit */
558560
{
559-
if (!MtmFilterTransaction(stmt,rc-hdr_len))
561+
if (!MtmFilterTransaction(stmt,msg_len))
560562
{
561563
if (spill_file >=0) {
562564
ByteBufferAppend(&buf,")",1);
@@ -568,7 +570,7 @@ pglogical_receiver_main(Datum main_arg)
568570
spill_file=-1;
569571
resetStringInfo(&spill_info);
570572
}else {
571-
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
573+
if (MtmPreserveCommitOrder&&buf.used==msg_len) {
572574
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
573575
timestamp_tstop,start=MtmGetSystemTime();
574576
MtmExecutor(buf.data,buf.used);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp