|
10 | 10 | #include<time.h>
|
11 | 11 |
|
12 | 12 | #include"postgres.h"
|
| 13 | +#include"funcapi.h" |
13 | 14 | #include"fmgr.h"
|
14 | 15 | #include"miscadmin.h"
|
15 | 16 | #include"libpq-fe.h"
|
@@ -91,6 +92,8 @@ PG_FUNCTION_INFO_V1(mtm_stop_replication);
|
91 | 92 | PG_FUNCTION_INFO_V1(mtm_drop_node);
|
92 | 93 | PG_FUNCTION_INFO_V1(mtm_recover_node);
|
93 | 94 | PG_FUNCTION_INFO_V1(mtm_get_snapshot);
|
| 95 | +PG_FUNCTION_INFO_V1(mtm_get_nodes_state); |
| 96 | +PG_FUNCTION_INFO_V1(mtm_get_cluster_state); |
94 | 97 |
|
95 | 98 | staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
|
96 | 99 | staticvoidMtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
|
@@ -683,6 +686,22 @@ static void MtmCheckSlots()
|
683 | 686 | }
|
684 | 687 | }
|
685 | 688 |
|
| 689 | +staticint64MtmGetSlotLag(intnodeId) |
| 690 | +{ |
| 691 | +inti; |
| 692 | +for (i=0;i<max_replication_slots;i++) { |
| 693 | +ReplicationSlot*slot=&ReplicationSlotCtl->replication_slots[i]; |
| 694 | +intnode; |
| 695 | +if (slot->in_use |
| 696 | +&&sscanf(slot->data.name.data,MULTIMASTER_SLOT_PATTERN,&node)==1 |
| 697 | +&&node==nodeId) |
| 698 | +{ |
| 699 | +returnGetXLogInsertRecPtr()-slot->data.restart_lsn; |
| 700 | +} |
| 701 | +} |
| 702 | +return-1; |
| 703 | +} |
| 704 | + |
686 | 705 | staticvoid
|
687 | 706 | MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
|
688 | 707 | {
|
@@ -932,8 +951,8 @@ _PG_init(void)
|
932 | 951 | "Multimaster queue size",
|
933 | 952 | NULL,
|
934 | 953 | &MtmQueueSize,
|
935 |
| -1024*1024, |
936 |
| -1024, |
| 954 | +256*1024*1024, |
| 955 | +1024*1024, |
937 | 956 | INT_MAX,
|
938 | 957 | PGC_BACKEND,
|
939 | 958 | 0,
|
@@ -1258,6 +1277,76 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
|
1258 | 1277 | PG_RETURN_INT64(dtmTx.snapshot);
|
1259 | 1278 | }
|
1260 | 1279 |
|
| 1280 | +typedefstruct |
| 1281 | +{ |
| 1282 | +intnodeId; |
| 1283 | +char*connStrPtr; |
| 1284 | +TupleDescdesc; |
| 1285 | +Datumvalues[6]; |
| 1286 | +boolnulls[6]; |
| 1287 | +}MtmGetNodeStateCtx; |
| 1288 | + |
| 1289 | +Datum |
| 1290 | +mtm_get_nodes_state(PG_FUNCTION_ARGS) |
| 1291 | +{ |
| 1292 | +FuncCallContext*funcctx; |
| 1293 | +MtmGetNodeStateCtx*usrfctx; |
| 1294 | +MemoryContextoldcontext; |
| 1295 | +char*p; |
| 1296 | +boolis_first_call=SRF_IS_FIRSTCALL(); |
| 1297 | + |
| 1298 | +if (is_first_call) { |
| 1299 | +funcctx=SRF_FIRSTCALL_INIT(); |
| 1300 | +oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
| 1301 | +usrfctx= (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx)); |
| 1302 | +get_call_result_type(fcinfo,NULL,&usrfctx->desc); |
| 1303 | +usrfctx->nodeId=1; |
| 1304 | +usrfctx->connStrPtr=pstrdup(MtmConnStrs); |
| 1305 | +memset(usrfctx->nulls, false,sizeof(usrfctx->nulls)); |
| 1306 | +funcctx->user_fctx=usrfctx; |
| 1307 | +MemoryContextSwitchTo(oldcontext); |
| 1308 | + } |
| 1309 | +funcctx=SRF_PERCALL_SETUP(); |
| 1310 | +usrfctx= (MtmGetNodeStateCtx*)funcctx->user_fctx; |
| 1311 | +if (usrfctx->nodeId>MtmNodes) { |
| 1312 | +SRF_RETURN_DONE(funcctx); |
| 1313 | +} |
| 1314 | +usrfctx->values[0]=Int32GetDatum(usrfctx->nodeId); |
| 1315 | +usrfctx->values[1]=BoolGetDatum(BIT_CHECK(dtm->disabledNodeMask,usrfctx->nodeId-1)); |
| 1316 | +usrfctx->values[2]=BoolGetDatum(BIT_CHECK(dtm->connectivityMask,usrfctx->nodeId-1)); |
| 1317 | +usrfctx->values[3]=BoolGetDatum(BIT_CHECK(dtm->nodeLockerMask,usrfctx->nodeId-1)); |
| 1318 | +usrfctx->values[4]=Int64GetDatum(MtmGetSlotLag(usrfctx->nodeId)); |
| 1319 | +p=strchr(usrfctx->connStrPtr,','); |
| 1320 | +if (p!=NULL) { |
| 1321 | +*p++='\0'; |
| 1322 | +} |
| 1323 | +usrfctx->values[5]=CStringGetTextDatum(usrfctx->connStrPtr); |
| 1324 | +usrfctx->connStrPtr=p; |
| 1325 | +usrfctx->nodeId+=1; |
| 1326 | + |
| 1327 | +SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls))); |
| 1328 | +} |
| 1329 | + |
| 1330 | +Datum |
| 1331 | +mtm_get_cluster_state(PG_FUNCTION_ARGS) |
| 1332 | +{ |
| 1333 | +TupleDescdesc; |
| 1334 | +Datumvalues[7]; |
| 1335 | +boolnulls[7]= {false}; |
| 1336 | + |
| 1337 | +get_call_result_type(fcinfo,NULL,&desc); |
| 1338 | + |
| 1339 | +values[0]=CStringGetTextDatum(MtmNodeStatusMnem[dtm->status]); |
| 1340 | +values[1]=Int64GetDatum(dtm->disabledNodeMask); |
| 1341 | +values[2]=Int64GetDatum(dtm->connectivityMask); |
| 1342 | +values[3]=Int64GetDatum(dtm->nodeLockerMask); |
| 1343 | +values[4]=Int32GetDatum(dtm->nNodes); |
| 1344 | +values[5]=Int32GetDatum((int)dtm->pool.active); |
| 1345 | +values[6]=Int64GetDatum(BgwPoolGetQueueSize(&dtm->pool)); |
| 1346 | + |
| 1347 | +PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls))); |
| 1348 | +} |
| 1349 | + |
1261 | 1350 | /*
|
1262 | 1351 | * Execute statement with specified parameters and check its result
|
1263 | 1352 | */
|
|