Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit87cd559

Browse files
committed
Add mtm.get_cluster_state and mtm.get_nodes_state
1 parent00ec3e9 commit87cd559

File tree

4 files changed

+116
-2
lines changed

4 files changed

+116
-2
lines changed

‎contrib/mmts/bgwpool.c‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
9797
}
9898
}
9999

100+
size_tBgwPoolGetQueueSize(BgwPool*pool)
101+
{
102+
size_tused;
103+
SpinLockAcquire(&pool->lock);
104+
used=pool->head <=pool->tail ?pool->tail-pool->head :pool->size-pool->head+pool->tail;
105+
SpinLockRelease(&pool->lock);
106+
returnused;
107+
}
108+
109+
100110
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize)
101111
{
102112
Assert(size+4 <=pool->size);

‎contrib/mmts/bgwpool.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbn
3333

3434
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
3535

36+
externsize_tBgwPoolGetQueueSize(BgwPool*pool);
37+
3638
#endif

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,17 @@ CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
2323
AS'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

26+
27+
CREATETYPEmtm.node_state(idinteger, disabled bool, disconnected bool, catchUpboolean, slotLagbigint, connStrtext);
28+
29+
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
30+
AS'MODULE_PATHNAME','mtm_get_nodes_state'
31+
LANGUAGE C;
32+
33+
CREATETYPEmtm.cluster_state(statustext, disabledNodeMaskbigint, disconnectedNodeMaskbigint, catchUpNodeMaskbigint, nNodesinteger, nActiveQueriesinteger, queueSizebigint);
34+
35+
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
36+
AS'MODULE_PATHNAME','mtm_get_cluster_state'
37+
LANGUAGE C;
38+
2639
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);

‎contrib/mmts/multimaster.c‎

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include<time.h>
1111

1212
#include"postgres.h"
13+
#include"funcapi.h"
1314
#include"fmgr.h"
1415
#include"miscadmin.h"
1516
#include"libpq-fe.h"
@@ -91,6 +92,8 @@ PG_FUNCTION_INFO_V1(mtm_stop_replication);
9192
PG_FUNCTION_INFO_V1(mtm_drop_node);
9293
PG_FUNCTION_INFO_V1(mtm_recover_node);
9394
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
95+
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
96+
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
9497

9598
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
9699
staticvoidMtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
@@ -683,6 +686,22 @@ static void MtmCheckSlots()
683686
}
684687
}
685688

689+
staticint64MtmGetSlotLag(intnodeId)
690+
{
691+
inti;
692+
for (i=0;i<max_replication_slots;i++) {
693+
ReplicationSlot*slot=&ReplicationSlotCtl->replication_slots[i];
694+
intnode;
695+
if (slot->in_use
696+
&&sscanf(slot->data.name.data,MULTIMASTER_SLOT_PATTERN,&node)==1
697+
&&node==nodeId)
698+
{
699+
returnGetXLogInsertRecPtr()-slot->data.restart_lsn;
700+
}
701+
}
702+
return-1;
703+
}
704+
686705
staticvoid
687706
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
688707
{
@@ -932,8 +951,8 @@ _PG_init(void)
932951
"Multimaster queue size",
933952
NULL,
934953
&MtmQueueSize,
935-
1024*1024,
936-
1024,
954+
256*1024*1024,
955+
1024*1024,
937956
INT_MAX,
938957
PGC_BACKEND,
939958
0,
@@ -1258,6 +1277,76 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
12581277
PG_RETURN_INT64(dtmTx.snapshot);
12591278
}
12601279

1280+
typedefstruct
1281+
{
1282+
intnodeId;
1283+
char*connStrPtr;
1284+
TupleDescdesc;
1285+
Datumvalues[6];
1286+
boolnulls[6];
1287+
}MtmGetNodeStateCtx;
1288+
1289+
Datum
1290+
mtm_get_nodes_state(PG_FUNCTION_ARGS)
1291+
{
1292+
FuncCallContext*funcctx;
1293+
MtmGetNodeStateCtx*usrfctx;
1294+
MemoryContextoldcontext;
1295+
char*p;
1296+
boolis_first_call=SRF_IS_FIRSTCALL();
1297+
1298+
if (is_first_call) {
1299+
funcctx=SRF_FIRSTCALL_INIT();
1300+
oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1301+
usrfctx= (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
1302+
get_call_result_type(fcinfo,NULL,&usrfctx->desc);
1303+
usrfctx->nodeId=1;
1304+
usrfctx->connStrPtr=pstrdup(MtmConnStrs);
1305+
memset(usrfctx->nulls, false,sizeof(usrfctx->nulls));
1306+
funcctx->user_fctx=usrfctx;
1307+
MemoryContextSwitchTo(oldcontext);
1308+
}
1309+
funcctx=SRF_PERCALL_SETUP();
1310+
usrfctx= (MtmGetNodeStateCtx*)funcctx->user_fctx;
1311+
if (usrfctx->nodeId>MtmNodes) {
1312+
SRF_RETURN_DONE(funcctx);
1313+
}
1314+
usrfctx->values[0]=Int32GetDatum(usrfctx->nodeId);
1315+
usrfctx->values[1]=BoolGetDatum(BIT_CHECK(dtm->disabledNodeMask,usrfctx->nodeId-1));
1316+
usrfctx->values[2]=BoolGetDatum(BIT_CHECK(dtm->connectivityMask,usrfctx->nodeId-1));
1317+
usrfctx->values[3]=BoolGetDatum(BIT_CHECK(dtm->nodeLockerMask,usrfctx->nodeId-1));
1318+
usrfctx->values[4]=Int64GetDatum(MtmGetSlotLag(usrfctx->nodeId));
1319+
p=strchr(usrfctx->connStrPtr,',');
1320+
if (p!=NULL) {
1321+
*p++='\0';
1322+
}
1323+
usrfctx->values[5]=CStringGetTextDatum(usrfctx->connStrPtr);
1324+
usrfctx->connStrPtr=p;
1325+
usrfctx->nodeId+=1;
1326+
1327+
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls)));
1328+
}
1329+
1330+
Datum
1331+
mtm_get_cluster_state(PG_FUNCTION_ARGS)
1332+
{
1333+
TupleDescdesc;
1334+
Datumvalues[7];
1335+
boolnulls[7]= {false};
1336+
1337+
get_call_result_type(fcinfo,NULL,&desc);
1338+
1339+
values[0]=CStringGetTextDatum(MtmNodeStatusMnem[dtm->status]);
1340+
values[1]=Int64GetDatum(dtm->disabledNodeMask);
1341+
values[2]=Int64GetDatum(dtm->connectivityMask);
1342+
values[3]=Int64GetDatum(dtm->nodeLockerMask);
1343+
values[4]=Int32GetDatum(dtm->nNodes);
1344+
values[5]=Int32GetDatum((int)dtm->pool.active);
1345+
values[6]=Int64GetDatum(BgwPoolGetQueueSize(&dtm->pool));
1346+
1347+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
1348+
}
1349+
12611350
/*
12621351
* Execute statement with specified parameters and check its result
12631352
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp