Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit45fdc97

Browse files
author
Amit Kapila
committed
Extend the logical decoding output plugin API with stream methods.
This adds seven methods to the output plugin API, adding support forstreaming changes of large in-progress transactions.* stream_start* stream_stop* stream_abort* stream_commit* stream_change* stream_message* stream_truncateMost of this is a simple extension of the existing methods, withthe semantic difference that the transaction (or subtransaction)is incomplete and may be aborted later (which is something theregular API does not really need to deal with).This also extends the 'test_decoding' plugin, implementing thesenew stream methods.The stream_start/start_stop are used to demarcate a chunk of changesstreamed for a particular toplevel transaction.This commit simply adds these new APIs and the upcoming patch to "allowthe streaming mode in ReorderBuffer" will use these APIs.Author: Tomas Vondra, Dilip Kumar, Amit KapilaReviewed-by: Amit KapilaTested-by: Neha Sharma and Mahendra Singh ThalorDiscussion:https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent1383874 commit45fdc97

File tree

6 files changed

+878
-0
lines changed

6 files changed

+878
-0
lines changed

‎contrib/test_decoding/test_decoding.c

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
6262
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
6363
booltransactional,constchar*prefix,
6464
Sizesz,constchar*message);
65+
staticvoidpg_decode_stream_start(LogicalDecodingContext*ctx,
66+
ReorderBufferTXN*txn);
67+
staticvoidpg_decode_stream_stop(LogicalDecodingContext*ctx,
68+
ReorderBufferTXN*txn);
69+
staticvoidpg_decode_stream_abort(LogicalDecodingContext*ctx,
70+
ReorderBufferTXN*txn,
71+
XLogRecPtrabort_lsn);
72+
staticvoidpg_decode_stream_commit(LogicalDecodingContext*ctx,
73+
ReorderBufferTXN*txn,
74+
XLogRecPtrcommit_lsn);
75+
staticvoidpg_decode_stream_change(LogicalDecodingContext*ctx,
76+
ReorderBufferTXN*txn,
77+
Relationrelation,
78+
ReorderBufferChange*change);
79+
staticvoidpg_decode_stream_message(LogicalDecodingContext*ctx,
80+
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
81+
booltransactional,constchar*prefix,
82+
Sizesz,constchar*message);
83+
staticvoidpg_decode_stream_truncate(LogicalDecodingContext*ctx,
84+
ReorderBufferTXN*txn,
85+
intnrelations,Relationrelations[],
86+
ReorderBufferChange*change);
6587

6688
void
6789
_PG_init(void)
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
83105
cb->filter_by_origin_cb=pg_decode_filter;
84106
cb->shutdown_cb=pg_decode_shutdown;
85107
cb->message_cb=pg_decode_message;
108+
cb->stream_start_cb=pg_decode_stream_start;
109+
cb->stream_stop_cb=pg_decode_stream_stop;
110+
cb->stream_abort_cb=pg_decode_stream_abort;
111+
cb->stream_commit_cb=pg_decode_stream_commit;
112+
cb->stream_change_cb=pg_decode_stream_change;
113+
cb->stream_message_cb=pg_decode_stream_message;
114+
cb->stream_truncate_cb=pg_decode_stream_truncate;
86115
}
87116

88117

@@ -540,3 +569,150 @@ pg_decode_message(LogicalDecodingContext *ctx,
540569
appendBinaryStringInfo(ctx->out,message,sz);
541570
OutputPluginWrite(ctx, true);
542571
}
572+
573+
/*
574+
* We never try to stream any empty xact so we don't need any special handling
575+
* for skip_empty_xacts in streaming mode APIs.
576+
*/
577+
staticvoid
578+
pg_decode_stream_start(LogicalDecodingContext*ctx,
579+
ReorderBufferTXN*txn)
580+
{
581+
TestDecodingData*data=ctx->output_plugin_private;
582+
583+
OutputPluginPrepareWrite(ctx, true);
584+
if (data->include_xids)
585+
appendStringInfo(ctx->out,"opening a streamed block for transaction TXN %u",txn->xid);
586+
else
587+
appendStringInfo(ctx->out,"opening a streamed block for transaction");
588+
OutputPluginWrite(ctx, true);
589+
}
590+
591+
/*
592+
* We never try to stream any empty xact so we don't need any special handling
593+
* for skip_empty_xacts in streaming mode APIs.
594+
*/
595+
staticvoid
596+
pg_decode_stream_stop(LogicalDecodingContext*ctx,
597+
ReorderBufferTXN*txn)
598+
{
599+
TestDecodingData*data=ctx->output_plugin_private;
600+
601+
OutputPluginPrepareWrite(ctx, true);
602+
if (data->include_xids)
603+
appendStringInfo(ctx->out,"closing a streamed block for transaction TXN %u",txn->xid);
604+
else
605+
appendStringInfo(ctx->out,"closing a streamed block for transaction");
606+
OutputPluginWrite(ctx, true);
607+
}
608+
609+
/*
610+
* We never try to stream any empty xact so we don't need any special handling
611+
* for skip_empty_xacts in streaming mode APIs.
612+
*/
613+
staticvoid
614+
pg_decode_stream_abort(LogicalDecodingContext*ctx,
615+
ReorderBufferTXN*txn,
616+
XLogRecPtrabort_lsn)
617+
{
618+
TestDecodingData*data=ctx->output_plugin_private;
619+
620+
OutputPluginPrepareWrite(ctx, true);
621+
if (data->include_xids)
622+
appendStringInfo(ctx->out,"aborting streamed (sub)transaction TXN %u",txn->xid);
623+
else
624+
appendStringInfo(ctx->out,"aborting streamed (sub)transaction");
625+
OutputPluginWrite(ctx, true);
626+
}
627+
628+
/*
629+
* We never try to stream any empty xact so we don't need any special handling
630+
* for skip_empty_xacts in streaming mode APIs.
631+
*/
632+
staticvoid
633+
pg_decode_stream_commit(LogicalDecodingContext*ctx,
634+
ReorderBufferTXN*txn,
635+
XLogRecPtrcommit_lsn)
636+
{
637+
TestDecodingData*data=ctx->output_plugin_private;
638+
639+
OutputPluginPrepareWrite(ctx, true);
640+
641+
if (data->include_xids)
642+
appendStringInfo(ctx->out,"committing streamed transaction TXN %u",txn->xid);
643+
else
644+
appendStringInfo(ctx->out,"committing streamed transaction");
645+
646+
if (data->include_timestamp)
647+
appendStringInfo(ctx->out," (at %s)",
648+
timestamptz_to_str(txn->commit_time));
649+
650+
OutputPluginWrite(ctx, true);
651+
}
652+
653+
/*
654+
* In streaming mode, we don't display the changes as the transaction can abort
655+
* at a later point in time. We don't want users to see the changes until the
656+
* transaction is committed.
657+
*/
658+
staticvoid
659+
pg_decode_stream_change(LogicalDecodingContext*ctx,
660+
ReorderBufferTXN*txn,
661+
Relationrelation,
662+
ReorderBufferChange*change)
663+
{
664+
TestDecodingData*data=ctx->output_plugin_private;
665+
666+
OutputPluginPrepareWrite(ctx, true);
667+
if (data->include_xids)
668+
appendStringInfo(ctx->out,"streaming change for TXN %u",txn->xid);
669+
else
670+
appendStringInfo(ctx->out,"streaming change for transaction");
671+
OutputPluginWrite(ctx, true);
672+
}
673+
674+
/*
675+
* In streaming mode, we don't display the contents for transactional messages
676+
* as the transaction can abort at a later point in time. We don't want users to
677+
* see the message contents until the transaction is committed.
678+
*/
679+
staticvoid
680+
pg_decode_stream_message(LogicalDecodingContext*ctx,
681+
ReorderBufferTXN*txn,XLogRecPtrlsn,booltransactional,
682+
constchar*prefix,Sizesz,constchar*message)
683+
{
684+
OutputPluginPrepareWrite(ctx, true);
685+
686+
if (transactional)
687+
{
688+
appendStringInfo(ctx->out,"streaming message: transactional: %d prefix: %s, sz: %zu",
689+
transactional,prefix,sz);
690+
}
691+
else
692+
{
693+
appendStringInfo(ctx->out,"streaming message: transactional: %d prefix: %s, sz: %zu content:",
694+
transactional,prefix,sz);
695+
appendBinaryStringInfo(ctx->out,message,sz);
696+
}
697+
698+
OutputPluginWrite(ctx, true);
699+
}
700+
701+
/*
702+
* In streaming mode, we don't display the detailed information of Truncate.
703+
* See pg_decode_stream_change.
704+
*/
705+
staticvoid
706+
pg_decode_stream_truncate(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
707+
intnrelations,Relationrelations[],
708+
ReorderBufferChange*change)
709+
{
710+
TestDecodingData*data=ctx->output_plugin_private;
711+
712+
OutputPluginPrepareWrite(ctx, true);
713+
if (data->include_xids)
714+
appendStringInfo(ctx->out,"streaming truncate for TXN %u",txn->xid);
715+
else
716+
appendStringInfo(ctx->out,"streaming truncate for transaction");
717+
OutputPluginWrite(ctx, true);
718+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp