Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf04783a

Browse files
committed
kinda lock in filter
1 parent7d2f872 commitf04783a

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

‎contrib/test_decoding/sql/prepared.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ DROP TABLE test_prepared1;
4545
DROPTABLE test_prepared2;
4646

4747
-- show results
48-
SELECT dataFROM pg_logical_slot_get_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
48+
SELECT dataFROM pg_logical_slot_peek_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1');
49+
50+
-- same but with twophase decoding
51+
SELECT dataFROM pg_logical_slot_peek_changes('regression_slot',NULL,NULL,'include-xids','0','skip-empty-xacts','1','twophase-decoding','1') ;
4952

5053
SELECT pg_drop_replication_slot('regression_slot');

‎contrib/test_decoding/test_decoding.c

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include"replication/message.h"
2525
#include"replication/origin.h"
2626

27+
#include"storage/procarray.h"
28+
2729
#include"utils/builtins.h"
2830
#include"utils/lsyscache.h"
2931
#include"utils/memutils.h"
@@ -283,15 +285,35 @@ pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
283285
{
284286
TestDecodingData*data=ctx->output_plugin_private;
285287

286-
// has_catalog_changes?
287-
// LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
288+
if (!data->twophase_decoding)
289+
return true;
288290

289-
// OutputPluginPrepareWrite(ctx, true);
291+
if (txn->has_catalog_changes)
292+
{
293+
LWLockAcquire(TwoPhaseStateLock,LW_SHARED);
294+
295+
if (TransactionIdIsInProgress(txn->xid))
296+
{
297+
/*
298+
* XXX
299+
*/
300+
LWLockRelease(TwoPhaseStateLock);
301+
return true;
302+
}
303+
elseif (TransactionIdDidAbort(txn->xid))
304+
{
305+
/*
306+
* Here we know that it is already aborted and should humble
307+
* ourselves.
308+
*/
309+
LWLockRelease(TwoPhaseStateLock);
310+
return true;
311+
}
290312

291-
// appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
313+
LWLockRelease(TwoPhaseStateLock);
314+
}
292315

293-
// OutputPluginWrite(ctx, true);
294-
return true;
316+
return false;
295317
}
296318

297319

@@ -307,7 +329,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
307329

308330
OutputPluginPrepareWrite(ctx, true);
309331

310-
appendStringInfo(ctx->out,"PREPARE! '%s'",txn->gid);
332+
appendStringInfo(ctx->out,"PREPARE '%s'",txn->gid);
311333

312334
if (data->include_xids)
313335
appendStringInfo(ctx->out," %u",txn->xid);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp