Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0621b7b

Browse files
knizhnikkelvich
authored andcommitted
Spill applied transactions to the disk
1 parentb48c191 commit0621b7b

File tree

7 files changed

+217
-72
lines changed

7 files changed

+217
-72
lines changed

‎Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
33

44
overrideCPPFLAGS += -I../raftable
55

@@ -15,7 +15,7 @@ all: multimaster.so
1515
tests/dtmbench:
1616
make -C tests
1717

18-
PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
18+
PG_CPPFLAGS = -I$(libpq_srcdir)
1919
SHLIB_LINK =$(libpq)
2020

2121
ifdefUSE_PGXS

‎multimaster.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ int MtmConnectTimeout;
184184
intMtmKeepaliveTimeout;
185185
intMtmReconnectAttempts;
186186
intMtmNodeDisableDelay;
187+
intMtmTransSpillThreshold;
187188
boolMtmUseRaftable;
188189
boolMtmUseDtm;
189190
MtmConnectionInfo*MtmConnections;
@@ -1247,6 +1248,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
12471248
matrix[i] |= ((matrix[j] >>i)&1) <<j;
12481249
matrix[j] |= ((matrix[i] >>j)&1) <<i;
12491250
}
1251+
matrix[i] &= ~((nodemask_t)1 <<i);
12501252
}
12511253
return true;
12521254
}
@@ -1630,6 +1632,21 @@ _PG_init(void)
16301632
if (!process_shared_preload_libraries_in_progress)
16311633
return;
16321634

1635+
DefineCustomIntVariable(
1636+
"multimaster.trans_spill_threshold",
1637+
"Maximal size (Mb) of transaction after which transaction is written to the disk",
1638+
NULL,
1639+
&MtmTransSpillThreshold,
1640+
1000,/* 1Gb */
1641+
0,
1642+
INT_MAX,
1643+
PGC_BACKEND,
1644+
0,
1645+
NULL,
1646+
NULL,
1647+
NULL
1648+
);
1649+
16331650
DefineCustomIntVariable(
16341651
"multimaster.twopc_min_timeout",
16351652
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ extern int MtmConnectTimeout;
205205
externintMtmReconnectAttempts;
206206
externintMtmKeepaliveTimeout;
207207
externintMtmNodeDisableDelay;
208+
externintMtmTransSpillThreshold;
208209
externboolMtmUseDtm;
209210
externHTAB*MtmXid2State;
210211

‎pglogical_apply.c

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
#include"multimaster.h"
5252
#include"pglogical_relid_map.h"
53+
#include"spill.h"
5354

5455
typedefstructTupleData
5556
{
@@ -901,6 +902,9 @@ void MtmExecutor(int id, void* work, size_t size)
901902
{
902903
StringInfoDatas;
903904
Relationrel=NULL;
905+
intspill_file=-1;
906+
intsave_cursor;
907+
intsave_len;
904908
s.data=work;
905909
s.len=size;
906910
s.maxlen=-1;
@@ -944,6 +948,33 @@ void MtmExecutor(int id, void* work, size_t size)
944948
case'R':
945949
rel=read_rel(&s,RowExclusiveLock);
946950
continue;
951+
case'F':
952+
{
953+
intnode_id=pq_getmsgint(&s,4);
954+
intfile_id=pq_getmsgint(&s,4);
955+
Assert(spill_file<0);
956+
spill_file=MtmOpenSpillFile(node_id,file_id);
957+
continue;
958+
}
959+
case'(':
960+
{
961+
int64size=pq_getmsgint(&s,4);
962+
s.data=palloc(size);
963+
save_cursor=s.cursor;
964+
save_len=s.len;
965+
s.cursor=0;
966+
s.len=size;
967+
MtmReadSpillFile(spill_file,s.data,size);
968+
continue;
969+
}
970+
case')':
971+
{
972+
pfree(s.data);
973+
s.data=work;
974+
s.cursor=save_cursor;
975+
s.len=save_len;
976+
continue;
977+
}
947978
default:
948979
elog(ERROR,"unknown action of type %c",action);
949980
}
@@ -963,7 +994,9 @@ void MtmExecutor(int id, void* work, size_t size)
963994
MTM_LOG2("%d: REMOTE end abort transaction %d",MyProcPid,MtmGetCurrentTransactionId());
964995
}
965996
PG_END_TRY();
966-
997+
if (spill_file >=0) {
998+
MtmCloseSpillFile(spill_file);
999+
}
9671000
MemoryContextResetAndDeleteChildren(ApplyContext);
9681001
}
9691002

‎pglogical_receiver.c

Lines changed: 38 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include"access/clog.h"
2525
#include"access/transam.h"
2626
#include"lib/stringinfo.h"
27+
#include"libpq/pqformat.h"
2728
#include"pgstat.h"
2829
#include"postmaster/bgworker.h"
2930
#include"storage/ipc.h"
@@ -35,6 +36,7 @@
3536
#include"replication/origin.h"
3637

3738
#include"multimaster.h"
39+
#include"spill.h"
3840

3941
/* Allow load of this module in shared libs */
4042

@@ -213,20 +215,23 @@ pglogical_receiver_main(Datum main_arg)
213215
PGresult*res;
214216
MtmSlotModemode;
215217

216-
#ifndefUSE_PGLOGICAL_OUTPUT
217-
boolinsideTrans= false;
218-
#endif
219218
ByteBufferbuf;
220219
XLogRecPtroriginStartPos=0;
221220
RepOriginIdoriginId;
222221
char*originName;
223222
/* Buffer for COPY data */
224223
char*copybuf=NULL;
224+
intspill_file=-1;
225+
StringInfoDataspill_info;
226+
227+
initStringInfo(&spill_info);
225228

226229
/* Register functions for SIGTERM/SIGHUP management */
227230
pqsignal(SIGHUP,receiver_raw_sighup);
228231
pqsignal(SIGTERM,receiver_raw_sigterm);
229232

233+
MtmCreateSpillDirectory(args->remote_node);
234+
230235
sprintf(worker_proc,"mtm_pglogical_receiver_%d_%d",args->local_node,args->remote_node);
231236

232237
/* We're now ready to receive signals */
@@ -449,34 +454,38 @@ pglogical_receiver_main(Datum main_arg)
449454
if (rc>hdr_len)
450455
{
451456
stmt=copybuf+hdr_len;
452-
453-
#ifdefUSE_PGLOGICAL_OUTPUT
457+
458+
if (buf.used >=MtmTransSpillThreshold) {
459+
if (spill_file<0) {
460+
intfile_id;
461+
spill_file=MtmCreateSpillFile(args->remote_node,&file_id);
462+
pq_sendbyte(&spill_info,'F');
463+
pq_sendint(&spill_info,args->remote_node,4);
464+
pq_sendint(&spill_info,file_id,4);
465+
}
466+
ByteBufferAppend(&buf,")",1);
467+
pq_sendbyte(&spill_info,'(');
468+
pq_sendint(&spill_info,buf.used,4);
469+
MtmSpillToFile(spill_file,buf.data,buf.used);
470+
ByteBufferReset(&buf);
471+
}
454472
ByteBufferAppend(&buf,stmt,rc-hdr_len);
455473
if (stmt[0]=='C')/* commit */
456-
{
457-
MtmExecute(buf.data,buf.used);
474+
{
475+
if (spill_file >=0) {
476+
ByteBufferAppend(&buf,")",1);
477+
pq_sendbyte(&spill_info,'(');
478+
pq_sendint(&spill_info,buf.used,4);
479+
MtmSpillToFile(spill_file,buf.data,buf.used);
480+
MtmCloseSpillFile(spill_file);
481+
MtmExecute(spill_info.data,spill_info.len);
482+
spill_file=-1;
483+
resetStringInfo(&spill_info);
484+
}else {
485+
MtmExecute(buf.data,buf.used);
486+
}
458487
ByteBufferReset(&buf);
459488
}
460-
#else
461-
if (strncmp(stmt,"BEGIN ",6)==0) {
462-
TransactionIdxid;
463-
intrc=sscanf(stmt+6,"%u",&xid);
464-
Assert(rc==1);
465-
ByteBufferAppendInt32(&buf,xid);
466-
Assert(!insideTrans);
467-
insideTrans= true;
468-
}elseif (strncmp(stmt,"COMMIT;",7)==0) {
469-
Assert(insideTrans);
470-
Assert(buf.used>4);
471-
buf.data[buf.used-1]='\0';/* replace last ';' with '\0' to make string zero terminated */
472-
MMExecute(buf.data,buf.used);
473-
ByteBufferReset(&buf);
474-
insideTrans= false;
475-
}else {
476-
Assert(insideTrans);
477-
ByteBufferAppend(&buf,stmt,rc-hdr_len/*strlen(stmt)*/);
478-
}
479-
#endif
480489
}
481490
/* Update written position */
482491
output_written_lsn=Max(walEnd,output_written_lsn);
@@ -575,6 +584,7 @@ void MtmStartReceivers(void)
575584
{
576585
inti;
577586
BackgroundWorkerworker;
587+
578588
MemSet(&worker,0,sizeof(BackgroundWorker));
579589
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
580590
worker.bgw_start_time=BgWorkerStart_ConsistentState;
@@ -586,6 +596,7 @@ void MtmStartReceivers(void)
586596
ReceiverArgs*ctx= (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
587597
ctx->receiver_conn_string=psprintf("replication=database %s",MtmConnections[i].connStr);
588598
sprintf(ctx->receiver_slot,MULTIMASTER_SLOT_PATTERN,MtmNodeId);
599+
589600
ctx->local_node=MtmNodeId;
590601
ctx->remote_node=i+1;
591602

@@ -598,45 +609,3 @@ void MtmStartReceivers(void)
598609
}
599610
}
600611

601-
#ifndefUSE_PGLOGICAL_OUTPUT
602-
voidMMExecutor(intid,void*work,size_tsize)
603-
{
604-
TransactionIdxid=*(TransactionId*)work;
605-
char*stmts= (char*)work+4;
606-
boolfinished= false;
607-
608-
MMJoinTransaction(xid);
609-
610-
SetCurrentStatementStartTimestamp();
611-
StartTransactionCommand();
612-
SPI_connect();
613-
PushActiveSnapshot(GetTransactionSnapshot());
614-
615-
PG_TRY();
616-
{
617-
intrc=SPI_execute(stmts, false,0);
618-
SPI_finish();
619-
PopActiveSnapshot();
620-
finished= true;
621-
if (rc!=SPI_OK_INSERT&&rc!=SPI_OK_UPDATE&&rc!=SPI_OK_DELETE) {
622-
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
623-
id,xid)));
624-
AbortCurrentTransaction();
625-
}else {
626-
CommitTransactionCommand();
627-
}
628-
}
629-
PG_CATCH();
630-
{
631-
FlushErrorState();
632-
if (!finished) {
633-
SPI_finish();
634-
if (ActiveSnapshotSet()) {
635-
PopActiveSnapshot();
636-
}
637-
}
638-
AbortCurrentTransaction();
639-
}
640-
PG_END_TRY();
641-
}
642-
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp