Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit294238c

Browse files
committed
Add information about BGW to node status
1 parentad75913 commit294238c

File tree

5 files changed

+23
-5
lines changed

5 files changed

+23
-5
lines changed

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"connStr"text);
39+
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"oldestSnapshot"bigint,"SenderPid"integer,"SenderStartTime"timestamp,"ReceiverPid"integer,"ReceiverStartTime"timestamp,"connStr"text);
4040

4141
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
4242
AS'MODULE_PATHNAME','mtm_get_nodes_state'

‎contrib/mmts/multimaster.c‎

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2074,7 +2074,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
20742074
staticvoid
20752075
MtmOnProcExit(intcode,Datumarg)
20762076
{
2077-
if (MtmReplicationNodeId >=0) {
2077+
if (MtmReplicationNodeId>0) {
2078+
Mtm->nodes[MtmReplicationNodeId-1].senderPid=-1;
20782079
MTM_LOG1("WAL-sender to %d is terminated",MtmReplicationNodeId);
20792080
MtmOnNodeDisconnect(MtmReplicationNodeId);
20802081
}
@@ -2086,6 +2087,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20862087
ListCell*param;
20872088
boolrecoveryCompleted= false;
20882089
MtmIsRecoverySession= false;
2090+
Mtm->nodes[MtmReplicationNodeId-1].senderPid=MyProcPid;
2091+
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime=MtmGetSystemTime();
20892092
foreach(param,args->in_params)
20902093
{
20912094
DefElem*elem=lfirst(param);
@@ -2378,7 +2381,11 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
23782381
usrfctx->values[5]=Int64GetDatum(Mtm->transCount ?Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount :0);
23792382
usrfctx->values[6]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USECS_PER_SEC));
23802383
usrfctx->values[7]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].oldestSnapshot);
2381-
usrfctx->values[8]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2384+
usrfctx->values[8]=Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderPid);
2385+
usrfctx->values[9]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderStartTime);
2386+
usrfctx->values[10]=Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverPid);
2387+
usrfctx->values[11]=Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverStartTime);
2388+
usrfctx->values[12]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23822389
usrfctx->nodeId+=1;
23832390

23842391
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls)));

‎contrib/mmts/multimaster.h‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
#defineAnum_mtm_local_tables_rel_name 2
6161

6262
#defineNatts_mtm_cluster_state 16
63-
#defineNatts_mtm_nodes_state9
63+
#defineNatts_mtm_nodes_state13
6464

6565
typedefuint64csn_t;/* commit serial number */
6666
#defineINVALID_CSN ((csn_t)-1)
@@ -125,8 +125,12 @@ typedef struct
125125
MtmConnectionInfocon;
126126
timestamp_ttransDelay;
127127
timestamp_tlastStatusChangeTime;
128+
timestamp_treceiverStartTime;
129+
timestamp_tsenderStartTime;
130+
intsenderPid;
131+
intreceiverPid;
128132
XLogRecPtrflushPos;
129-
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
133+
csn_toldestSnapshot;/* Oldest snapshot used by active transactions at this node */
130134
}MtmNodeInfo;
131135

132136
typedefstructMtmTransState

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
}else {
147147
csn_tcsn=MtmTransactionSnapshot(txn->xid);
148148
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
149+
/*
150+
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151+
* We do not need to send such transactions unless we perform recovery
152+
*/
149153
if (csn==INVALID_CSN&& !isRecovery) {
150154
return;
151155
}

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ pglogical_receiver_main(Datum main_arg)
226226

227227
MtmCreateSpillDirectory(nodeId);
228228

229+
Mtm->nodes[nodeId-1].senderPid=MyProcPid;
230+
Mtm->nodes[nodeId-1].senderStartTime=MtmGetSystemTime();
231+
229232
sprintf(worker_proc,"mtm_pglogical_receiver_%d_%d",MtmNodeId,nodeId);
230233

231234
/* We're now ready to receive signals */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp