Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e90ec5

Browse files
author
Amit Kapila
committed
Track statistics for streaming of changes from ReorderBuffer.
This adds the statistics about transactions streamed to the decodingoutput plugin from ReorderBuffer. Users can query thepg_stat_replication_slots view to check these stats and callpg_stat_reset_replication_slot to reset the stats of a particular slot.Users can pass NULL in pg_stat_reset_replication_slot to reset stats ofall the slots.Commit9868167 has added the basic infrastructure to capture the statsof slot and this commit extends the statistics collector to trackadditional information about slots.Bump the catversion as we have added new columns in the catalog entry.Author: Ajin Cherian and Amit KapilaReviewed-by: Sawada Masahiko and Dilip KumarDiscussion:https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com
1 parent94bc27b commit8e90ec5

File tree

12 files changed

+111
-17
lines changed

12 files changed

+111
-17
lines changed

‎doc/src/sgml/monitoring.sgml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2632,6 +2632,44 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
26322632
</para></entry>
26332633
</row>
26342634

2635+
<row>
2636+
<entry role="catalog_table_entry"><para role="column_definition">
2637+
<structfield>stream_txns</structfield> <type>bigint</type>
2638+
</para>
2639+
<para>
2640+
Number of in-progress transactions streamed to the decoding output plugin
2641+
after the memory used by logical decoding of changes from WAL for this
2642+
slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
2643+
works with toplevel transactions (subtransactions can't be streamed
2644+
independently), so the counter does not get incremented for subtransactions.
2645+
</para></entry>
2646+
</row>
2647+
2648+
<row>
2649+
<entry role="catalog_table_entry"><para role="column_definition">
2650+
<structfield>stream_count</structfield><type>bigint</type>
2651+
</para>
2652+
<para>
2653+
Number of times in-progress transactions were streamed to the decoding
2654+
output plugin while decoding changes from WAL for this slot. Transactions
2655+
may get streamed repeatedly, and this counter gets incremented on every
2656+
such invocation.
2657+
</para></entry>
2658+
</row>
2659+
2660+
<row>
2661+
<entry role="catalog_table_entry"><para role="column_definition">
2662+
<structfield>stream_bytes</structfield><type>bigint</type>
2663+
</para>
2664+
<para>
2665+
Amount of decoded in-progress transaction data streamed to the decoding
2666+
output plugin while decoding changes from WAL for this slot. This and other
2667+
streaming counters for this slot can be used to gauge the network I/O which
2668+
occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
2669+
</para>
2670+
</entry>
2671+
</row>
2672+
26352673
<row>
26362674
<entry role="catalog_table_entry"><para role="column_definition">
26372675
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>

‎src/backend/catalog/system_views.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
802802
s.spill_txns,
803803
s.spill_count,
804804
s.spill_bytes,
805+
s.stream_txns,
806+
s.stream_count,
807+
s.stream_bytes,
805808
s.stats_reset
806809
FROM pg_stat_get_replication_slots()AS s;
807810

‎src/backend/postmaster/pgstat.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
17081708
*/
17091709
void
17101710
pgstat_report_replslot(constchar*slotname,intspilltxns,intspillcount,
1711-
intspillbytes)
1711+
intspillbytes,intstreamtxns,intstreamcount,intstreambytes)
17121712
{
17131713
PgStat_MsgReplSlotmsg;
17141714

@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
17211721
msg.m_spill_txns=spilltxns;
17221722
msg.m_spill_count=spillcount;
17231723
msg.m_spill_bytes=spillbytes;
1724+
msg.m_stream_txns=streamtxns;
1725+
msg.m_stream_count=streamcount;
1726+
msg.m_stream_bytes=streambytes;
17241727
pgstat_send(&msg,sizeof(PgStat_MsgReplSlot));
17251728
}
17261729

@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
68926895
replSlotStats[idx].spill_txns+=msg->m_spill_txns;
68936896
replSlotStats[idx].spill_count+=msg->m_spill_count;
68946897
replSlotStats[idx].spill_bytes+=msg->m_spill_bytes;
6898+
replSlotStats[idx].stream_txns+=msg->m_stream_txns;
6899+
replSlotStats[idx].stream_count+=msg->m_stream_count;
6900+
replSlotStats[idx].stream_bytes+=msg->m_stream_bytes;
68956901
}
68966902
}
68976903

@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
71257131
replSlotStats[i].spill_txns=0;
71267132
replSlotStats[i].spill_count=0;
71277133
replSlotStats[i].spill_bytes=0;
7134+
replSlotStats[i].stream_txns=0;
7135+
replSlotStats[i].stream_count=0;
7136+
replSlotStats[i].stream_bytes=0;
71287137
replSlotStats[i].stat_reset_timestamp=ts;
71297138
}
71307139

‎src/backend/replication/logical/logical.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
14711471
ReorderBuffer*rb=ctx->reorder;
14721472

14731473
/*
1474-
* Nothing to do if we haven't spilled anything since the last time the
1475-
* stats has been sent.
1474+
* Nothing to do if we haven't spilledor streamedanything since the last
1475+
*time thestats has been sent.
14761476
*/
1477-
if (rb->spillBytes <=0)
1477+
if (rb->spillBytes <=0&&rb->streamBytes <=0)
14781478
return;
14791479

1480-
elog(DEBUG2,"UpdateDecodingStats: updating stats %p %lld %lld %lld",
1480+
elog(DEBUG2,"UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
14811481
rb,
14821482
(long long)rb->spillTxns,
14831483
(long long)rb->spillCount,
1484-
(long long)rb->spillBytes);
1484+
(long long)rb->spillBytes,
1485+
(long long)rb->streamTxns,
1486+
(long long)rb->streamCount,
1487+
(long long)rb->streamBytes);
14851488

14861489
pgstat_report_replslot(NameStr(ctx->slot->data.name),
1487-
rb->spillTxns,rb->spillCount,rb->spillBytes);
1490+
rb->spillTxns,rb->spillCount,rb->spillBytes,
1491+
rb->streamTxns,rb->streamCount,rb->streamBytes);
14881492
rb->spillTxns=0;
14891493
rb->spillCount=0;
14901494
rb->spillBytes=0;
1495+
rb->streamTxns=0;
1496+
rb->streamCount=0;
1497+
rb->streamBytes=0;
14911498
}

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
346346
buffer->spillTxns=0;
347347
buffer->spillCount=0;
348348
buffer->spillBytes=0;
349+
buffer->streamTxns=0;
350+
buffer->streamCount=0;
351+
buffer->streamBytes=0;
349352

350353
buffer->current_restart_decoding_lsn=InvalidXLogRecPtr;
351354

@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
34823485
{
34833486
Snapshotsnapshot_now;
34843487
CommandIdcommand_id;
3488+
Sizestream_bytes;
3489+
booltxn_is_streamed;
34853490

34863491
/* We can never reach here for a subtransaction. */
34873492
Assert(txn->toptxn==NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
35623567
txn->snapshot_now=NULL;
35633568
}
35643569

3570+
/*
3571+
* Remember this information to be used later to update stats. We can't
3572+
* update the stats here as an error while processing the changes would
3573+
* lead to the accumulation of stats even though we haven't streamed all
3574+
* the changes.
3575+
*/
3576+
txn_is_streamed=rbtxn_is_streamed(txn);
3577+
stream_bytes=txn->total_size;
3578+
35653579
/* Process and send the changes to output plugin. */
35663580
ReorderBufferProcessTXN(rb,txn,InvalidXLogRecPtr,snapshot_now,
35673581
command_id, true);
35683582

3583+
rb->streamCount+=1;
3584+
rb->streamBytes+=stream_bytes;
3585+
3586+
/* Don't consider already streamed transaction. */
3587+
rb->streamTxns+= (txn_is_streamed) ?0 :1;
3588+
35693589
Assert(dlist_is_empty(&txn->changes));
35703590
Assert(txn->nentries==0);
35713591
Assert(txn->nentries_mem==0);

‎src/backend/replication/slot.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
320320
* ReplicationSlotAllocationLock.
321321
*/
322322
if (SlotIsLogical(slot))
323-
pgstat_report_replslot(NameStr(slot->data.name),0,0,0);
323+
pgstat_report_replslot(NameStr(slot->data.name),0,0,0,0,0,0);
324324

325325
/*
326326
* Now that the slot has been marked as in_use and active, it's safe to

‎src/backend/utils/adt/pgstatfuncs.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
21532153
Datum
21542154
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
21552155
{
2156-
#definePG_STAT_GET_REPLICATION_SLOT_COLS5
2156+
#definePG_STAT_GET_REPLICATION_SLOT_COLS8
21572157
ReturnSetInfo*rsinfo= (ReturnSetInfo*)fcinfo->resultinfo;
21582158
TupleDesctupdesc;
21592159
Tuplestorestate*tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
22012201
values[1]=Int64GetDatum(s->spill_txns);
22022202
values[2]=Int64GetDatum(s->spill_count);
22032203
values[3]=Int64GetDatum(s->spill_bytes);
2204+
values[4]=Int64GetDatum(s->stream_txns);
2205+
values[5]=Int64GetDatum(s->stream_count);
2206+
values[6]=Int64GetDatum(s->stream_bytes);
22042207

22052208
if (s->stat_reset_timestamp==0)
2206-
nulls[4]= true;
2209+
nulls[7]= true;
22072210
else
2208-
values[4]=TimestampTzGetDatum(s->stat_reset_timestamp);
2211+
values[7]=TimestampTzGetDatum(s->stat_reset_timestamp);
22092212

22102213
tuplestore_putvalues(tupstore,tupdesc,values,nulls);
22112214
}

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/*yyyymmddN */
56-
#defineCATALOG_VERSION_NO202010281
56+
#defineCATALOG_VERSION_NO202010291
5757

5858
#endif

‎src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5260,9 +5260,9 @@
52605260
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
52615261
proretset => 't', provolatile => 's', proparallel => 'r',
52625262
prorettype => 'record', proargtypes => '',
5263-
proallargtypes => '{text,int8,int8,int8,timestamptz}',
5264-
proargmodes => '{o,o,o,o,o}',
5265-
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
5263+
proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
5264+
proargmodes => '{o,o,o,o,o,o,o,o}',
5265+
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
52665266
prosrc => 'pg_stat_get_replication_slots' },
52675267
{ oid => '6118', descr => 'statistics: information about subscription',
52685268
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',

‎src/include/pgstat.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
492492
PgStat_Counterm_spill_txns;
493493
PgStat_Counterm_spill_count;
494494
PgStat_Counterm_spill_bytes;
495+
PgStat_Counterm_stream_txns;
496+
PgStat_Counterm_stream_count;
497+
PgStat_Counterm_stream_bytes;
495498
}PgStat_MsgReplSlot;
496499

497500

@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
823826
PgStat_Counterspill_txns;
824827
PgStat_Counterspill_count;
825828
PgStat_Counterspill_bytes;
829+
PgStat_Counterstream_txns;
830+
PgStat_Counterstream_count;
831+
PgStat_Counterstream_bytes;
826832
TimestampTzstat_reset_timestamp;
827833
}PgStat_ReplSlotStats;
828834

@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
13871393
externvoidpgstat_report_checksum_failures_in_db(Oiddboid,intfailurecount);
13881394
externvoidpgstat_report_checksum_failure(void);
13891395
externvoidpgstat_report_replslot(constchar*slotname,intspilltxns,intspillcount,
1390-
intspillbytes);
1396+
intspillbytes,intstreamtxns,intstreamcount,intstreambytes);
13911397
externvoidpgstat_report_replslot_drop(constchar*slotname);
13921398

13931399
externvoidpgstat_initialize(void);

‎src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ struct ReorderBuffer
551551
int64spillTxns;/* number of transactions spilled to disk */
552552
int64spillCount;/* spill-to-disk invocation counter */
553553
int64spillBytes;/* amount of data spilled to disk */
554+
555+
/* Statistics about transactions streamed to the decoding output plugin */
556+
int64streamTxns;/* number of transactions streamed */
557+
int64streamCount;/* streaming invocation counter */
558+
int64streamBytes;/* amount of data streamed */
554559
};
555560

556561

‎src/test/regress/expected/rules.out

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
20222022
s.spill_txns,
20232023
s.spill_count,
20242024
s.spill_bytes,
2025+
s.stream_txns,
2026+
s.stream_count,
2027+
s.stream_bytes,
20252028
s.stats_reset
2026-
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
2029+
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes,stream_txns, stream_count, stream_bytes,stats_reset);
20272030
pg_stat_slru| SELECT s.name,
20282031
s.blks_zeroed,
20292032
s.blks_hit,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp