Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc22de58

Browse files
knizhnikkelvich
authored andcommitted
Fix MtmAdjustOldestXid
1 parent990ccfc commitc22de58

File tree

3 files changed

+52
-30
lines changed

3 files changed

+52
-30
lines changed

‎multimaster.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -517,9 +517,16 @@ MtmAdjustOldestXid(TransactionId xid)
517517
MTM_LOG2("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d",MyProcPid,xid,ts!=NULL ?ts->snapshot :0,ts!=NULL ?ts->csn :0,ts!=NULL ?ts->status :-1);
518518
Mtm->gcCount=0;
519519

520+
//return FirstNormalTransactionId;
521+
520522
if (ts!=NULL) {
521523
oldestSnapshot=ts->snapshot;
522-
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
524+
Assert(oldestSnapshot!=INVALID_CSN);
525+
if (Mtm->nodes[MtmNodeId-1].oldestSnapshot<oldestSnapshot) {
526+
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
527+
}else {
528+
oldestSnapshot=Mtm->nodes[MtmNodeId-1].oldestSnapshot;
529+
}
523530
for (i=0;i<Mtm->nAllNodes;i++) {
524531
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
525532
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
@@ -532,9 +539,11 @@ MtmAdjustOldestXid(TransactionId xid)
532539
for (ts=Mtm->transListHead;
533540
ts!=NULL
534541
&&ts->csn<oldestSnapshot
535-
&&TransactionIdPrecedes(ts->xid,xid)
542+
&&TransactionIdPrecedes(ts->xid,xid);
543+
/*
536544
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
537545
ts->status == TRANSACTION_STATUS_ABORTED);
546+
*/
538547
prev=ts,ts=ts->next)
539548
{
540549
if (prev!=NULL) {
@@ -547,9 +556,10 @@ MtmAdjustOldestXid(TransactionId xid)
547556
if (MtmUseDtm)
548557
{
549558
if (prev!=NULL) {
559+
MTM_LOG1("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
560+
MyProcPid,xid,prev->xid,prev->status,prev->snapshot, (ts ?ts->xid :0), (ts ?ts->status :-1), (ts ?ts->snapshot :-1),oldestSnapshot);
550561
Mtm->transListHead=prev;
551562
Mtm->oldestXid=xid=prev->xid;
552-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, oldestSnapshot=%ld",MyProcPid,xid,oldestSnapshot);
553563
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
554564
xid=Mtm->oldestXid;
555565
}

‎pglogical_output.c

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -384,30 +384,31 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
384384
send_replication_origin &=txn->origin_id!=InvalidRepOriginId;
385385

386386
OutputPluginPrepareWrite(ctx, !send_replication_origin);
387-
data->api->write_begin(ctx->out,data,txn);
387+
if (data->api) {
388+
data->api->write_begin(ctx->out,data,txn);
388389

389-
if (send_replication_origin)
390-
{
391-
char*origin;
392-
393-
/* Message boundary */
394-
OutputPluginWrite(ctx, false);
395-
OutputPluginPrepareWrite(ctx, true);
396-
397-
/*
398-
* XXX: which behaviour we want here?
399-
*
400-
* Alternatives:
401-
* - don't send origin message if origin name not found
402-
* (that's what we do now)
403-
* - throw error - that will break replication, not good
404-
* - send some special "unknown" origin
405-
*/
406-
if (data->api->write_origin&&
407-
replorigin_by_oid(txn->origin_id, true,&origin))
390+
if (send_replication_origin)
391+
{
392+
char*origin;
393+
394+
/* Message boundary */
395+
OutputPluginWrite(ctx, false);
396+
OutputPluginPrepareWrite(ctx, true);
397+
398+
/*
399+
* XXX: which behaviour we want here?
400+
*
401+
* Alternatives:
402+
* - don't send origin message if origin name not found
403+
* (that's what we do now)
404+
* - throw error - that will break replication, not good
405+
* - send some special "unknown" origin
406+
*/
407+
if (data->api->write_origin&&
408+
replorigin_by_oid(txn->origin_id, true,&origin))
408409
data->api->write_origin(ctx->out,origin,txn->origin_lsn);
410+
}
409411
}
410-
411412
OutputPluginWrite(ctx, true);
412413
}
413414

@@ -421,7 +422,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421422
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
422423

423424
OutputPluginPrepareWrite(ctx, true);
424-
data->api->write_commit(ctx->out,data,txn,commit_lsn);
425+
if (data->api) {
426+
data->api->write_commit(ctx->out,data,txn,commit_lsn);
427+
}
425428
OutputPluginWrite(ctx, true);
426429
}
427430

@@ -433,7 +436,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
433436
MemoryContextold;
434437

435438
/* First check the table filter */
436-
if (!call_row_filter_hook(data,txn,relation,change))
439+
if (!call_row_filter_hook(data,txn,relation,change)||data->api==NULL)
437440
return;
438441

439442
/* Avoid leaking memory by using and resetting our own context */
@@ -539,7 +542,9 @@ send_startup_message(LogicalDecodingContext *ctx,
539542
*/
540543

541544
OutputPluginPrepareWrite(ctx,last_message);
542-
data->api->write_startup_message(ctx->out,msg);
545+
if (data->api) {
546+
data->api->write_startup_message(ctx->out,msg);
547+
}
543548
OutputPluginWrite(ctx,last_message);
544549

545550
pfree(msg);

‎tests2/lib/bank_client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,13 @@ def check_total(self):
153153
deftx(conn,cur):
154154
cur.execute('select sum(amount) from bank_test')
155155
res=cur.fetchone()
156+
total=res[0]
157+
iftotal!=0:
158+
cur.execute('select mtm.get_snapshot()')
159+
res=cur.fetchone()
160+
print("Isolation error, total = %d, node = %d, snapshot = %d"% (total,self.node_id,res[0]))
161+
#raise BaseException
156162
conn.commit()
157-
ifres[0]!=0:
158-
print("Isolation error, total = %d, node = %d"% (res[0],self.node_id))
159-
raiseBaseException
160163

161164
self.exec_tx('total',tx)
162165

@@ -175,10 +178,14 @@ def tx(conn, cur):
175178
set amount = amount - %s
176179
where uid = %s''',
177180
(amount,from_uid))
181+
if (cur.rowcount!=1):
182+
raiseBaseException
178183
cur.execute('''update bank_test
179184
set amount = amount + %s
180185
where uid = %s''',
181186
(amount,to_uid))
187+
if (cur.rowcount!=1):
188+
raiseBaseException
182189
conn.commit()
183190

184191
self.exec_tx('transfer',tx)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp