Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitac4645c

Browse files
author
Amit Kapila
committed
Allow pgoutput to send logical decoding messages.
The output plugin accepts a new parameter (messages) that controls iflogical decoding messages are written into the replication stream. It isuseful for those clients that use pgoutput as an output plugin and needsto process messages that were written by pg_logical_emit_message().Although logical streaming replication protocol supports logicaldecoding messages now, logical replication does not use this feature yet.Author: David Pirotte, Euler TaveiraReviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit KapilaDiscussion:https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
1 parent531737d commitac4645c

File tree

7 files changed

+312
-0
lines changed

7 files changed

+312
-0
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6433,6 +6433,82 @@ Begin
64336433
</listitem>
64346434
</varlistentry>
64356435

6436+
<varlistentry>
6437+
<term>
6438+
Message
6439+
</term>
6440+
<listitem>
6441+
<para>
6442+
6443+
<variablelist>
6444+
<varlistentry>
6445+
<term>
6446+
Byte1('M')
6447+
</term>
6448+
<listitem>
6449+
<para>
6450+
Identifies the message as a logical decoding message.
6451+
</para>
6452+
</listitem>
6453+
</varlistentry>
6454+
<varlistentry>
6455+
<term>
6456+
Int32
6457+
</term>
6458+
<listitem>
6459+
<para>
6460+
Xid of the transaction. The XID is sent only when user has
6461+
requested streaming of in-progress transactions.
6462+
</para>
6463+
</listitem>
6464+
</varlistentry>
6465+
<varlistentry>
6466+
<term>
6467+
Int8
6468+
</term>
6469+
<listitem>
6470+
<para>
6471+
Flags; Either 0 for no flags or 1 if the logical decoding
6472+
message is transactional.
6473+
</para>
6474+
</listitem>
6475+
</varlistentry>
6476+
<varlistentry>
6477+
<term>
6478+
Int64
6479+
</term>
6480+
<listitem>
6481+
<para>
6482+
The LSN of the logical decoding message.
6483+
</para>
6484+
</listitem>
6485+
</varlistentry>
6486+
<varlistentry>
6487+
<term>
6488+
String
6489+
</term>
6490+
<listitem>
6491+
<para>
6492+
The prefix of the logical decoding message.
6493+
</para>
6494+
</listitem>
6495+
</varlistentry>
6496+
<varlistentry>
6497+
<term>
6498+
Byte<replaceable>n</replaceable>
6499+
</term>
6500+
<listitem>
6501+
<para>
6502+
The content of the logical decoding message.
6503+
</para>
6504+
</listitem>
6505+
</varlistentry>
6506+
6507+
</variablelist>
6508+
</para>
6509+
</listitem>
6510+
</varlistentry>
6511+
64366512
<varlistentry>
64376513
<term>
64386514
Commit

‎src/backend/replication/logical/proto.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
*/
2626
#defineLOGICALREP_IS_REPLICA_IDENTITY 1
2727

28+
#defineMESSAGE_TRANSACTIONAL (1<<0)
2829
#defineTRUNCATE_CASCADE(1<<0)
2930
#defineTRUNCATE_RESTART_SEQS(1<<1)
3031

@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
361362
returnrelids;
362363
}
363364

365+
/*
366+
* Write MESSAGE to stream
367+
*/
368+
void
369+
logicalrep_write_message(StringInfoout,TransactionIdxid,XLogRecPtrlsn,
370+
booltransactional,constchar*prefix,Sizesz,
371+
constchar*message)
372+
{
373+
uint8flags=0;
374+
375+
pq_sendbyte(out,LOGICAL_REP_MSG_MESSAGE);
376+
377+
/* encode and send message flags */
378+
if (transactional)
379+
flags |=MESSAGE_TRANSACTIONAL;
380+
381+
/* transaction ID (if not valid, we're not streaming) */
382+
if (TransactionIdIsValid(xid))
383+
pq_sendint32(out,xid);
384+
385+
pq_sendint8(out,flags);
386+
pq_sendint64(out,lsn);
387+
pq_sendstring(out,prefix);
388+
pq_sendint32(out,sz);
389+
pq_sendbytes(out,message,sz);
390+
}
391+
364392
/*
365393
* Write relation description to the output stream.
366394
*/

‎src/backend/replication/logical/worker.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
19391939
apply_handle_origin(s);
19401940
return;
19411941

1942+
caseLOGICAL_REP_MSG_MESSAGE:
1943+
1944+
/*
1945+
* Logical replication does not use generic logical messages yet.
1946+
* Although, it could be used by other applications that use this
1947+
* output plugin.
1948+
*/
1949+
return;
1950+
19421951
caseLOGICAL_REP_MSG_STREAM_START:
19431952
apply_handle_stream_start(s);
19441953
return;

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
4545
staticvoidpgoutput_truncate(LogicalDecodingContext*ctx,
4646
ReorderBufferTXN*txn,intnrelations,Relationrelations[],
4747
ReorderBufferChange*change);
48+
staticvoidpgoutput_message(LogicalDecodingContext*ctx,
49+
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
50+
booltransactional,constchar*prefix,
51+
Sizesz,constchar*message);
4852
staticboolpgoutput_origin_filter(LogicalDecodingContext*ctx,
4953
RepOriginIdorigin_id);
5054
staticvoidpgoutput_stream_start(structLogicalDecodingContext*ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
142146
cb->begin_cb=pgoutput_begin_txn;
143147
cb->change_cb=pgoutput_change;
144148
cb->truncate_cb=pgoutput_truncate;
149+
cb->message_cb=pgoutput_message;
145150
cb->commit_cb=pgoutput_commit_txn;
146151
cb->filter_by_origin_cb=pgoutput_origin_filter;
147152
cb->shutdown_cb=pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
152157
cb->stream_abort_cb=pgoutput_stream_abort;
153158
cb->stream_commit_cb=pgoutput_stream_commit;
154159
cb->stream_change_cb=pgoutput_change;
160+
cb->stream_message_cb=pgoutput_message;
155161
cb->stream_truncate_cb=pgoutput_truncate;
156162
}
157163

@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
162168
boolprotocol_version_given= false;
163169
boolpublication_names_given= false;
164170
boolbinary_option_given= false;
171+
boolmessages_option_given= false;
165172
boolstreaming_given= false;
166173

167174
data->binary= false;
168175
data->streaming= false;
176+
data->messages= false;
169177

170178
foreach(lc,options)
171179
{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
221229

222230
data->binary=defGetBoolean(defel);
223231
}
232+
elseif (strcmp(defel->defname,"messages")==0)
233+
{
234+
if (messages_option_given)
235+
ereport(ERROR,
236+
(errcode(ERRCODE_SYNTAX_ERROR),
237+
errmsg("conflicting or redundant options")));
238+
messages_option_given= true;
239+
240+
data->messages=defGetBoolean(defel);
241+
}
224242
elseif (strcmp(defel->defname,"streaming")==0)
225243
{
226244
if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
689707
MemoryContextReset(data->context);
690708
}
691709

710+
staticvoid
711+
pgoutput_message(LogicalDecodingContext*ctx,ReorderBufferTXN*txn,
712+
XLogRecPtrmessage_lsn,booltransactional,constchar*prefix,Sizesz,
713+
constchar*message)
714+
{
715+
PGOutputData*data= (PGOutputData*)ctx->output_plugin_private;
716+
TransactionIdxid=InvalidTransactionId;
717+
718+
if (!data->messages)
719+
return;
720+
721+
/*
722+
* Remember the xid for the message in streaming mode. See
723+
* pgoutput_change.
724+
*/
725+
if (in_streaming)
726+
xid=txn->xid;
727+
728+
OutputPluginPrepareWrite(ctx, true);
729+
logicalrep_write_message(ctx->out,
730+
xid,
731+
message_lsn,
732+
transactional,
733+
prefix,
734+
sz,
735+
message);
736+
OutputPluginWrite(ctx, true);
737+
}
738+
692739
/*
693740
* Currently we always forward.
694741
*/

‎src/include/replication/logicalproto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
5454
LOGICAL_REP_MSG_TRUNCATE='T',
5555
LOGICAL_REP_MSG_RELATION='R',
5656
LOGICAL_REP_MSG_TYPE='Y',
57+
LOGICAL_REP_MSG_MESSAGE='M',
5758
LOGICAL_REP_MSG_STREAM_START='S',
5859
LOGICAL_REP_MSG_STREAM_END='E',
5960
LOGICAL_REP_MSG_STREAM_COMMIT='c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
151152
boolcascade,boolrestart_seqs);
152153
externList*logicalrep_read_truncate(StringInfoin,
153154
bool*cascade,bool*restart_seqs);
155+
externvoidlogicalrep_write_message(StringInfoout,TransactionIdxid,XLogRecPtrlsn,
156+
booltransactional,constchar*prefix,Sizesz,constchar*message);
154157
externvoidlogicalrep_write_rel(StringInfoout,TransactionIdxid,
155158
Relationrel);
156159
externLogicalRepRelation*logicalrep_read_rel(StringInfoin);

‎src/include/replication/pgoutput.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ typedef struct PGOutputData
2626
List*publications;
2727
boolbinary;
2828
boolstreaming;
29+
boolmessages;
2930
}PGOutputData;
3031

3132
#endif/* PGOUTPUT_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp