Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita564a72

Browse files
knizhnikkelvich
authored andcommitted
Buffer pglogical write requests
1 parent08d32c6 commita564a72

File tree

2 files changed

+44
-24
lines changed

2 files changed

+44
-24
lines changed

‎multimaster.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,7 +1127,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
11271127
CHECK_FOR_INTERRUPTS();
11281128
now=MtmGetSystemTime();
11291129
MtmLock(LW_EXCLUSIVE);
1130-
if (now>deadline) {
1130+
if (MtmMin2PCTimeout!=0&&now>deadline) {
11311131
if (ts->isPrepared) {
11321132
MTM_ELOG(LOG,"Distributed transaction %s (%llu) is not committed in %lld msec",ts->gid, (long64)ts->xid,USEC_TO_MSEC(now-start));
11331133
}else {
@@ -2846,7 +2846,7 @@ _PG_init(void)
28462846
"Maximal size (Mb) of transaction after which transaction is written to the disk",
28472847
NULL,
28482848
&MtmTransSpillThreshold,
2849-
1000,/*1Gb */
2849+
100,/*100Mb */
28502850
0,
28512851
MaxAllocSize/MB,
28522852
PGC_BACKEND,
@@ -3030,8 +3030,8 @@ _PG_init(void)
30303030
"Minimal timeout between receiving PREPARED message from nodes participated in transaction to coordinator (milliseconds)",
30313031
NULL,
30323032
&MtmMin2PCTimeout,
3033-
2000,/*2 seconds */
3034-
1,
3033+
0,/*disabled */
3034+
0,
30353035
INT_MAX,
30363036
PGC_BACKEND,
30373037
0,

‎pglogical_output.c

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,26 @@ static void send_startup_message(LogicalDecodingContext *ctx,
7979

8080
staticboolstartup_message_sent= false;
8181

82+
#defineOUTPUT_BUFFER_SIZE (16*1024*1024)
83+
84+
staticvoidMtmOutputPluginWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
85+
{
86+
if (flush) {
87+
OutputPluginWrite(ctx,last_write);
88+
}
89+
}
90+
91+
staticvoidMtmOutputPluginPrepareWrite(LogicalDecodingContext*ctx,boollast_write,boolflush)
92+
{
93+
if (!ctx->prepared_write) {
94+
OutputPluginPrepareWrite(ctx,last_write);
95+
}elseif (flush||ctx->out->len>OUTPUT_BUFFER_SIZE) {
96+
OutputPluginWrite(ctx, false);
97+
OutputPluginPrepareWrite(ctx,last_write);
98+
}
99+
}
100+
101+
82102
/* specify output plugin callbacks */
83103
void
84104
_PG_output_plugin_init(OutputPluginCallbacks*cb)
@@ -388,16 +408,16 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
388408
send_replication_origin &=txn->origin_id!=InvalidRepOriginId;
389409

390410
if (data->api) {
391-
OutputPluginPrepareWrite(ctx, !send_replication_origin);
411+
MtmOutputPluginPrepareWrite(ctx, !send_replication_origin, true);
392412
data->api->write_begin(ctx->out,data,txn);
393413

394414
if (send_replication_origin)
395415
{
396416
char*origin;
397417

398418
/* Message boundary */
399-
OutputPluginWrite(ctx, false);
400-
OutputPluginPrepareWrite(ctx, true);
419+
MtmOutputPluginWrite(ctx, false, false);
420+
MtmOutputPluginPrepareWrite(ctx, true, false);
401421

402422
/*
403423
* XXX: which behaviour we want here?
@@ -412,7 +432,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
412432
replorigin_by_oid(txn->origin_id, true,&origin))
413433
data->api->write_origin(ctx->out,origin,txn->origin_lsn);
414434
}
415-
OutputPluginWrite(ctx, true);
435+
MtmOutputPluginWrite(ctx, true, false);
416436
}
417437
}
418438

@@ -422,9 +442,9 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
422442
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
423443

424444
if (data->api) {
425-
OutputPluginPrepareWrite(ctx, true);
445+
MtmOutputPluginPrepareWrite(ctx, true, true);
426446
data->api->write_caughtup(ctx->out,data,ctx->reader->EndRecPtr);
427-
OutputPluginWrite(ctx, true);
447+
MtmOutputPluginWrite(ctx, true, true);
428448
}
429449
}
430450

@@ -439,9 +459,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
439459
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
440460

441461
if (data->api) {
442-
OutputPluginPrepareWrite(ctx, true);
462+
MtmOutputPluginPrepareWrite(ctx, true, true);
443463
data->api->write_commit(ctx->out,data,txn,commit_lsn);
444-
OutputPluginWrite(ctx, true);
464+
MtmOutputPluginWrite(ctx, true, true);
445465
}
446466
}
447467

@@ -462,38 +482,38 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
462482
/* TODO: add caching (send only if changed) */
463483
if (data->api->write_rel)
464484
{
465-
OutputPluginPrepareWrite(ctx, false);
485+
MtmOutputPluginPrepareWrite(ctx, false, false);
466486
data->api->write_rel(ctx->out,data,relation);
467-
OutputPluginWrite(ctx, false);
487+
MtmOutputPluginWrite(ctx, false, false);
468488
}
469489

470490
/* Send the data */
471491
switch (change->action)
472492
{
473493
caseREORDER_BUFFER_CHANGE_INSERT:
474-
OutputPluginPrepareWrite(ctx, true);
494+
MtmOutputPluginPrepareWrite(ctx, true, false);
475495
data->api->write_insert(ctx->out,data,relation,
476496
&change->data.tp.newtuple->tuple);
477-
OutputPluginWrite(ctx, true);
497+
MtmOutputPluginWrite(ctx, true, false);
478498
break;
479499
caseREORDER_BUFFER_CHANGE_UPDATE:
480500
{
481501
HeapTupleoldtuple=change->data.tp.oldtuple ?
482502
&change->data.tp.oldtuple->tuple :NULL;
483503

484-
OutputPluginPrepareWrite(ctx, true);
504+
MtmOutputPluginPrepareWrite(ctx, true, false);
485505
data->api->write_update(ctx->out,data,relation,oldtuple,
486506
&change->data.tp.newtuple->tuple);
487-
OutputPluginWrite(ctx, true);
507+
MtmOutputPluginWrite(ctx, true, false);
488508
break;
489509
}
490510
caseREORDER_BUFFER_CHANGE_DELETE:
491511
if (change->data.tp.oldtuple)
492512
{
493-
OutputPluginPrepareWrite(ctx, true);
513+
MtmOutputPluginPrepareWrite(ctx, true, false);
494514
data->api->write_delete(ctx->out,data,relation,
495515
&change->data.tp.oldtuple->tuple);
496-
OutputPluginWrite(ctx, true);
516+
MtmOutputPluginWrite(ctx, true, false);
497517
}
498518
else
499519
elog(DEBUG1,"didn't send DELETE change because of missing oldtuple");
@@ -536,9 +556,9 @@ pg_decode_message(LogicalDecodingContext *ctx,
536556
{
537557
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
538558

539-
OutputPluginPrepareWrite(ctx, true);
559+
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
540560
data->api->write_message(ctx->out,prefix,sz,message);
541-
OutputPluginWrite(ctx, true);
561+
MtmOutputPluginWrite(ctx, true, !transactional);
542562
}
543563

544564
staticvoid
@@ -559,9 +579,9 @@ send_startup_message(LogicalDecodingContext *ctx,
559579
*/
560580

561581
if (data->api) {
562-
OutputPluginPrepareWrite(ctx,last_message);
582+
MtmOutputPluginPrepareWrite(ctx,last_message, true);
563583
data->api->write_startup_message(ctx->out,msg);
564-
OutputPluginWrite(ctx,last_message);
584+
MtmOutputPluginWrite(ctx,last_message, true);
565585
}
566586

567587
pfree(msg);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp