Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3edfdff

Browse files
knizhnikkelvich
authored andcommitted
Add information about BGW to node status
1 parent5b20bea commit3edfdff

File tree

5 files changed

+23
-5
lines changed

5 files changed

+23
-5
lines changed

‎multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"connStr"text);
39+
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"SenderPid"integer,"SenderStartTime"timestamp,"ReceiverPid"integer,"ReceiverStartTime"timestamp,"connStr"text);
4040

4141
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
4242
AS'MODULE_PATHNAME','mtm_get_nodes_state'

‎multimaster.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
20732073
staticvoid
20742074
MtmOnProcExit(intcode,Datumarg)
20752075
{
2076-
if (MtmReplicationNodeId >=0) {
2076+
if (MtmReplicationNodeId>0) {
2077+
Mtm->nodes[MtmReplicationNodeId-1].senderPid=-1;
20772078
MTM_LOG1("WAL-sender to %d is terminated",MtmReplicationNodeId);
20782079
MtmOnNodeDisconnect(MtmReplicationNodeId);
20792080
}
@@ -2085,6 +2086,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20852086
ListCell*param;
20862087
boolrecoveryCompleted= false;
20872088
MtmIsRecoverySession= false;
2089+
Mtm->nodes[MtmReplicationNodeId-1].senderPid=MyProcPid;
2090+
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime=MtmGetSystemTime();
20882091
foreach(param,args->in_params)
20892092
{
20902093
DefElem*elem=lfirst(param);
@@ -2377,7 +2380,11 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
23772380
usrfctx->values[5]=Int64GetDatum(Mtm->transCount ?Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount :0);
23782381
usrfctx->values[6]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USECS_PER_SEC));
23792382
usrfctx->values[7]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].oldestSnapshot);
2380-
usrfctx->values[8]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2383+
usrfctx->values[8]=Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderPid);
2384+
usrfctx->values[9]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderStartTime);
2385+
usrfctx->values[10]=Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverPid);
2386+
usrfctx->values[11]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverStartTime);
2387+
usrfctx->values[12]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23812388
usrfctx->nodeId+=1;
23822389

23832390
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls)));

‎multimaster.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
#defineAnum_mtm_local_tables_rel_name 2
6161

6262
#defineNatts_mtm_cluster_state 16
63-
#defineNatts_mtm_nodes_state9
63+
#defineNatts_mtm_nodes_state13
6464

6565
typedefuint64csn_t;/* commit serial number */
6666
#defineINVALID_CSN ((csn_t)-1)
@@ -125,8 +125,12 @@ typedef struct
125125
MtmConnectionInfocon;
126126
timestamp_ttransDelay;
127127
timestamp_tlastStatusChangeTime;
128+
timestamp_treceiverStartTime;
129+
timestamp_tsenderStartTime;
130+
intsenderPid;
131+
intreceiverPid;
128132
XLogRecPtrflushPos;
129-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
133+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
130134
}MtmNodeInfo;
131135

132136
typedefstructMtmTransState

‎pglogical_proto.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
}else {
147147
csn_tcsn=MtmTransactionSnapshot(txn->xid);
148148
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
149+
/*
150+
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151+
* We do not need to send such transactions unless we perform recovery
152+
*/
149153
if (csn==INVALID_CSN&& !isRecovery) {
150154
return;
151155
}

‎pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ pglogical_receiver_main(Datum main_arg)
226226

227227
MtmCreateSpillDirectory(nodeId);
228228

229+
Mtm->nodes[nodeId-1].senderPid=MyProcPid;
230+
Mtm->nodes[nodeId-1].senderStartTime=MtmGetSystemTime();
231+
229232
sprintf(worker_proc,"mtm_pglogical_receiver_%d_%d",MtmNodeId,nodeId);
230233

231234
/* We're now ready to receive signals */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp