Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5f7a3ec

Browse files
knizhnikkelvich
authored andcommitted
Add mtm.get_cluster_state and mtm.get_nodes_state
1 parentf051d35 commit5f7a3ec

File tree

4 files changed

+116
-2
lines changed

4 files changed

+116
-2
lines changed

‎bgwpool.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
9797
}
9898
}
9999

100+
size_tBgwPoolGetQueueSize(BgwPool*pool)
101+
{
102+
size_tused;
103+
SpinLockAcquire(&pool->lock);
104+
used=pool->head <=pool->tail ?pool->tail-pool->head :pool->size-pool->head+pool->tail;
105+
SpinLockRelease(&pool->lock);
106+
returnused;
107+
}
108+
109+
100110
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize)
101111
{
102112
Assert(size+4 <=pool->size);

‎bgwpool.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbn
3333

3434
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
3535

36+
externsize_tBgwPoolGetQueueSize(BgwPool*pool);
37+
3638
#endif

‎multimaster--1.0.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,17 @@ CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
2323
AS'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

26+
27+
CREATETYPEmtm.node_state(idinteger, disabled bool, disconnected bool, catchUpboolean, slotLagbigint, connStrtext);
28+
29+
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
30+
AS'MODULE_PATHNAME','mtm_get_nodes_state'
31+
LANGUAGE C;
32+
33+
CREATETYPEmtm.cluster_state(statustext, disabledNodeMaskbigint, disconnectedNodeMaskbigint, catchUpNodeMaskbigint, nNodesinteger, nActiveQueriesinteger, queueSizebigint);
34+
35+
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
36+
AS'MODULE_PATHNAME','mtm_get_cluster_state'
37+
LANGUAGE C;
38+
2639
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);

‎multimaster.c

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include<time.h>
1111

1212
#include"postgres.h"
13+
#include"funcapi.h"
1314
#include"fmgr.h"
1415
#include"miscadmin.h"
1516
#include"libpq-fe.h"
@@ -91,6 +92,8 @@ PG_FUNCTION_INFO_V1(mtm_stop_replication);
9192
PG_FUNCTION_INFO_V1(mtm_drop_node);
9293
PG_FUNCTION_INFO_V1(mtm_recover_node);
9394
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
95+
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
96+
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
9497

9598
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
9699
staticvoidMtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
@@ -683,6 +686,22 @@ static void MtmCheckSlots()
683686
}
684687
}
685688

689+
staticint64MtmGetSlotLag(intnodeId)
690+
{
691+
inti;
692+
for (i=0;i<max_replication_slots;i++) {
693+
ReplicationSlot*slot=&ReplicationSlotCtl->replication_slots[i];
694+
intnode;
695+
if (slot->in_use
696+
&&sscanf(slot->data.name.data,MULTIMASTER_SLOT_PATTERN,&node)==1
697+
&&node==nodeId)
698+
{
699+
returnGetXLogInsertRecPtr()-slot->data.restart_lsn;
700+
}
701+
}
702+
return-1;
703+
}
704+
686705
staticvoid
687706
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
688707
{
@@ -932,8 +951,8 @@ _PG_init(void)
932951
"Multimaster queue size",
933952
NULL,
934953
&MtmQueueSize,
935-
1024*1024,
936-
1024,
954+
256*1024*1024,
955+
1024*1024,
937956
INT_MAX,
938957
PGC_BACKEND,
939958
0,
@@ -1258,6 +1277,76 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
12581277
PG_RETURN_INT64(dtmTx.snapshot);
12591278
}
12601279

1280+
typedefstruct
1281+
{
1282+
intnodeId;
1283+
char*connStrPtr;
1284+
TupleDescdesc;
1285+
Datumvalues[6];
1286+
boolnulls[6];
1287+
}MtmGetNodeStateCtx;
1288+
1289+
Datum
1290+
mtm_get_nodes_state(PG_FUNCTION_ARGS)
1291+
{
1292+
FuncCallContext*funcctx;
1293+
MtmGetNodeStateCtx*usrfctx;
1294+
MemoryContextoldcontext;
1295+
char*p;
1296+
boolis_first_call=SRF_IS_FIRSTCALL();
1297+
1298+
if (is_first_call) {
1299+
funcctx=SRF_FIRSTCALL_INIT();
1300+
oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1301+
usrfctx= (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
1302+
get_call_result_type(fcinfo,NULL,&usrfctx->desc);
1303+
usrfctx->nodeId=1;
1304+
usrfctx->connStrPtr=pstrdup(MtmConnStrs);
1305+
memset(usrfctx->nulls, false,sizeof(usrfctx->nulls));
1306+
funcctx->user_fctx=usrfctx;
1307+
MemoryContextSwitchTo(oldcontext);
1308+
}
1309+
funcctx=SRF_PERCALL_SETUP();
1310+
usrfctx= (MtmGetNodeStateCtx*)funcctx->user_fctx;
1311+
if (usrfctx->nodeId>MtmNodes) {
1312+
SRF_RETURN_DONE(funcctx);
1313+
}
1314+
usrfctx->values[0]=Int32GetDatum(usrfctx->nodeId);
1315+
usrfctx->values[1]=BoolGetDatum(BIT_CHECK(dtm->disabledNodeMask,usrfctx->nodeId-1));
1316+
usrfctx->values[2]=BoolGetDatum(BIT_CHECK(dtm->connectivityMask,usrfctx->nodeId-1));
1317+
usrfctx->values[3]=BoolGetDatum(BIT_CHECK(dtm->nodeLockerMask,usrfctx->nodeId-1));
1318+
usrfctx->values[4]=Int64GetDatum(MtmGetSlotLag(usrfctx->nodeId));
1319+
p=strchr(usrfctx->connStrPtr,',');
1320+
if (p!=NULL) {
1321+
*p++='\0';
1322+
}
1323+
usrfctx->values[5]=CStringGetTextDatum(usrfctx->connStrPtr);
1324+
usrfctx->connStrPtr=p;
1325+
usrfctx->nodeId+=1;
1326+
1327+
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls)));
1328+
}
1329+
1330+
Datum
1331+
mtm_get_cluster_state(PG_FUNCTION_ARGS)
1332+
{
1333+
TupleDescdesc;
1334+
Datumvalues[7];
1335+
boolnulls[7]= {false};
1336+
1337+
get_call_result_type(fcinfo,NULL,&desc);
1338+
1339+
values[0]=CStringGetTextDatum(MtmNodeStatusMnem[dtm->status]);
1340+
values[1]=Int64GetDatum(dtm->disabledNodeMask);
1341+
values[2]=Int64GetDatum(dtm->connectivityMask);
1342+
values[3]=Int64GetDatum(dtm->nodeLockerMask);
1343+
values[4]=Int32GetDatum(dtm->nNodes);
1344+
values[5]=Int32GetDatum((int)dtm->pool.active);
1345+
values[6]=Int64GetDatum(BgwPoolGetQueueSize(&dtm->pool));
1346+
1347+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
1348+
}
1349+
12611350
/*
12621351
* Execute statement with specified parameters and check its result
12631352
*/

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp