Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit516814f

Browse files
michaelpqpull[bot]
authored andcommitted
Add flush option to pg_logical_emit_message()
Since its introduction, LogLogicalMessage() (via the SQL interfacepg_logical_emit_message()) has never included a call to XLogFlush(),causing it to potentially lose messages on a crash when used innon-transactional mode. This has come up to me as a problem whileplaying with ideas to design a test suite for what has become039_end_of_wal.pl introduced inbae868c by Thomas Munro, becausethere are no direct ways to force a WAL flush via SQL.The default is false, to not flush messages and influence existinguse-cases where this function could be used. If set to true, themessage emitted is flushed before returning back to the caller, makingthe message durable on crash. This new option has no effect when usingpg_logical_emit_message() in transactional mode, as the record's flushis guaranteed by the WAL record generated by the transaction committed.Two queries of test_decoding are tweaked to cover the new code path forthe flush.Bump catalog version.Author: Michael PaquierReviewed-by: Andres Freund, Amit Kapila, Fujii Masao, Tung Nguyen, TomasVondraDiscussion:https://postgr.es/m/ZNsdThSe2qgsfs7R@paquier.xyz
1 parentb77a9aa commit516814f

File tree

9 files changed

+51
-13
lines changed

9 files changed

+51
-13
lines changed

‎contrib/test_decoding/expected/messages.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
66
init
77
(1 row)
88

9-
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
9+
-- These two cover the path for the flush variant.
10+
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
1011
?column?
1112
----------
1213
msg1
1314
(1 row)
1415

15-
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
16+
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
1617
?column?
1718
----------
1819
msg2

‎contrib/test_decoding/sql/messages.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ SET synchronous_commit = on;
33

44
SELECT'init'FROM pg_create_logical_replication_slot('regression_slot','test_decoding');
55

6-
SELECT'msg1'FROM pg_logical_emit_message(true,'test','msg1');
7-
SELECT'msg2'FROM pg_logical_emit_message(false,'test','msg2');
6+
-- These two cover the path for the flush variant.
7+
SELECT'msg1'FROM pg_logical_emit_message(true,'test','msg1', true);
8+
SELECT'msg2'FROM pg_logical_emit_message(false,'test','msg2', true);
89

910
BEGIN;
1011
SELECT'msg3'FROM pg_logical_emit_message(true,'test','msg3');

‎doc/src/sgml/func.sgml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2774027740
<indexterm>
2774127741
<primary>pg_logical_emit_message</primary>
2774227742
</indexterm>
27743-
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
27743+
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>[, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>])
2774427744
<returnvalue>pg_lsn</returnvalue>
2774527745
</para>
2774627746
<para role="func_signature">
27747-
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
27747+
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type>[, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>])
2774827748
<returnvalue>pg_lsn</returnvalue>
2774927749
</para>
2775027750
<para>
@@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2775827758
recognize messages that are interesting for them.
2775927759
The <parameter>content</parameter> parameter is the content of the
2776027760
message, given either in text or binary form.
27761+
The <parameter>flush</parameter> parameter (default set to
27762+
<literal>false</literal>) controls if the message is immediately
27763+
flushed to WAL or not. <parameter>flush</parameter> has no effect
27764+
with <parameter>transactional</parameter>, as the message's WAL
27765+
record is flushed along with its transaction.
2776127766
</para></entry>
2776227767
</row>
2776327768
</tbody>

‎src/backend/catalog/system_functions.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,26 @@ LANGUAGE INTERNAL
446446
VOLATILE ROWS1000 COST1000
447447
AS'pg_logical_slot_peek_binary_changes';
448448

449+
CREATE OR REPLACEFUNCTIONpg_logical_emit_message(
450+
transactionalboolean,
451+
prefixtext,
452+
messagetext,
453+
flushboolean DEFAULT false)
454+
RETURNS pg_lsn
455+
LANGUAGE INTERNAL
456+
STRICT VOLATILE
457+
AS'pg_logical_emit_message_text';
458+
459+
CREATE OR REPLACEFUNCTIONpg_logical_emit_message(
460+
transactionalboolean,
461+
prefixtext,
462+
messagebytea,
463+
flushboolean DEFAULT false)
464+
RETURNS pg_lsn
465+
LANGUAGE INTERNAL
466+
STRICT VOLATILE
467+
AS'pg_logical_emit_message_bytea';
468+
449469
CREATE OR REPLACEFUNCTIONpg_create_physical_replication_slot(
450470
IN slot_name name,IN immediately_reserveboolean DEFAULT false,
451471
IN temporaryboolean DEFAULT false,

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
362362
booltransactional=PG_GETARG_BOOL(0);
363363
char*prefix=text_to_cstring(PG_GETARG_TEXT_PP(1));
364364
bytea*data=PG_GETARG_BYTEA_PP(2);
365+
boolflush=PG_GETARG_BOOL(3);
365366
XLogRecPtrlsn;
366367

367368
lsn=LogLogicalMessage(prefix,VARDATA_ANY(data),VARSIZE_ANY_EXHDR(data),
368-
transactional);
369+
transactional,flush);
369370
PG_RETURN_LSN(lsn);
370371
}
371372

‎src/backend/replication/logical/message.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@
4444
*/
4545
XLogRecPtr
4646
LogLogicalMessage(constchar*prefix,constchar*message,size_tsize,
47-
booltransactional)
47+
booltransactional,boolflush)
4848
{
4949
xl_logical_messagexlrec;
50+
XLogRecPtrlsn;
5051

5152
/*
5253
* Force xid to be allocated if we're emitting a transactional message.
@@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
7172
/* allow origin filtering */
7273
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
7374

74-
returnXLogInsert(RM_LOGICALMSG_ID,XLOG_LOGICAL_MESSAGE);
75+
lsn=XLogInsert(RM_LOGICALMSG_ID,XLOG_LOGICAL_MESSAGE);
76+
77+
/*
78+
* Make sure that the message hits disk before leaving if emitting a
79+
* non-transactional message when flush is requested.
80+
*/
81+
if (!transactional&&flush)
82+
XLogFlush(lsn);
83+
returnlsn;
7584
}
7685

7786
/*

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/*yyyymmddN */
60-
#defineCATALOG_VERSION_NO202310161
60+
#defineCATALOG_VERSION_NO202310181
6161

6262
#endif

‎src/include/catalog/pg_proc.dat

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11167,11 +11167,11 @@
1116711167
prosrc => 'pg_replication_slot_advance' },
1116811168
{ oid => '3577', descr => 'emit a textual logical decoding message',
1116911169
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
11170-
prorettype => 'pg_lsn', proargtypes => 'bool text text',
11170+
prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
1117111171
prosrc => 'pg_logical_emit_message_text' },
1117211172
{ oid => '3578', descr => 'emit a binary logical decoding message',
1117311173
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
11174-
prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
11174+
prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
1117511175
prosrc => 'pg_logical_emit_message_bytea' },
1117611176

1117711177
# event triggers

‎src/include/replication/message.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ typedef struct xl_logical_message
3030
#defineSizeOfLogicalMessage(offsetof(xl_logical_message, message))
3131

3232
externXLogRecPtrLogLogicalMessage(constchar*prefix,constchar*message,
33-
size_tsize,booltransactional);
33+
size_tsize,booltransactional,
34+
boolflush);
3435

3536
/* RMGR API */
3637
#defineXLOG_LOGICAL_MESSAGE0x00

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp