Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit97a3e42

Browse files
knizhnikkelvich
authored andcommitted
Add mtm.get_cluster_info function
1 parentabd79ad commit97a3e42

File tree

5 files changed

+88
-29
lines changed

5 files changed

+88
-29
lines changed

‎bgwpool.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ typedef struct
1616
intid;
1717
}BgwPoolExecutorCtx;
1818

19-
size_tn_snapshots;
20-
size_tn_active;
21-
2219
staticvoidBgwPoolMainLoop(Datumarg)
2320
{
2421
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)arg;
@@ -36,7 +33,8 @@ static void BgwPoolMainLoop(Datum arg)
3633
size=*(int*)&pool->queue[pool->head];
3734
Assert(size<pool->size);
3835
work=malloc(size);
39-
pool->active-=1;
36+
pool->pending-=1;
37+
pool->active+=1;
4038
if (pool->head+size+4>pool->size) {
4139
memcpy(work,pool->queue,size);
4240
pool->head=INTALIGN(size);
@@ -54,6 +52,9 @@ static void BgwPoolMainLoop(Datum arg)
5452
SpinLockRelease(&pool->lock);
5553
pool->executor(id,work,size);
5654
free(work);
55+
SpinLockAcquire(&pool->lock);
56+
pool->active-=1;
57+
SpinLockRelease(&pool->lock);
5758
}
5859
}
5960

@@ -71,6 +72,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7172
pool->tail=0;
7273
pool->size=queueSize;
7374
pool->active=0;
75+
pool->pending=0;
7476
strcpy(pool->dbname,dbname);
7577
}
7678

@@ -126,9 +128,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
126128
PGSemaphoreLock(&pool->overflow);
127129
SpinLockAcquire(&pool->lock);
128130
}else {
129-
pool->active+=1;
130-
n_snapshots+=1;
131-
n_active+=pool->active;
131+
pool->pending+=1;
132132
*(int*)&pool->queue[pool->tail]=size;
133133
if (pool->size-pool->tail >=size+4) {
134134
memcpy(&pool->queue[pool->tail+4],work,size);

‎bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct
2020
size_ttail;
2121
size_tsize;
2222
size_tactive;
23+
size_tpending;
2324
boolproducerBlocked;
2425
chardbname[MAX_DBNAME_LEN];
2526
char*queue;

‎multimaster--1.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATETYPEmtm.node_stateAS (idinteger, disabled bool, disconnected bool, catchUp bool, slotLagbigint, avgTransDelaybigint, lastStatusChangetimestamp, connStrtext);
27+
CREATETYPEmtm.node_stateAS ("id"integer,"disabled" bool,"disconnected" bool,"catchUp" bool,"slotLag"bigint,"avgTransDelay"bigint,"lastStatusChange"timestamp,"connStr"text);
2828

2929
CREATEFUNCTIONmtm.get_nodes_state() RETURNS SETOFmtm.node_state
3030
AS'MODULE_PATHNAME','mtm_get_nodes_state'
3131
LANGUAGE C;
3232

33-
CREATETYPEmtm.cluster_stateAS (statustext, disabledNodeMaskbigint, disconnectedNodeMaskbigint, catchUpNodeMaskbigint, nNodesinteger, nActiveQueriesinteger, queueSizebigint, transCountbigint, timeShiftbigint, recoverySlotinteger);
33+
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"nNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer);
3434

3535
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
3636
AS'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATEFUNCTIONmtm.get_cluster_info() RETURNS SETOFmtm.cluster_state
40+
AS'MODULE_PATHNAME','mtm_get_cluster_info'
41+
LANGUAGE C;
42+
3943
CREATEFUNCTIONmtm.make_table_local(relation regclass) RETURNS void
4044
AS'MODULE_PATHNAME','mtm_make_table_local'
4145
LANGUAGE C;

‎multimaster.c

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
107107
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
109109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110+
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
110111
PG_FUNCTION_INFO_V1(mtm_make_table_local);
111112
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
112113

@@ -1608,7 +1609,7 @@ _PG_init(void)
16081609
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
16091610
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16101611
&Mtm2PCMinTimeout,
1611-
10000,
1612+
100000,/* 100 seconds */
16121613
0,
16131614
INT_MAX,
16141615
PGC_BACKEND,
@@ -1623,7 +1624,7 @@ _PG_init(void)
16231624
"Percent of prepare time for maximal time of second phase of two-pahse commit",
16241625
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16251626
&Mtm2PCPrepareRatio,
1626-
100,
1627+
1000,/* 10 times */
16271628
0,
16281629
INT_MAX,
16291630
PGC_BACKEND,
@@ -2177,10 +2178,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21772178
typedefstruct
21782179
{
21792180
intnodeId;
2180-
char*connStrPtr;
21812181
TupleDescdesc;
2182-
Datumvalues[8];
2183-
boolnulls[8];
2182+
Datumvalues[Natts_mtm_nodes_state];
2183+
boolnulls[Natts_mtm_nodes_state];
21842184
}MtmGetNodeStateCtx;
21852185

21862186
Datum
@@ -2189,7 +2189,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21892189
FuncCallContext*funcctx;
21902190
MtmGetNodeStateCtx*usrfctx;
21912191
MemoryContextoldcontext;
2192-
char*p;
21932192
int64lag;
21942193
boolis_first_call=SRF_IS_FIRSTCALL();
21952194

@@ -2199,7 +2198,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21992198
usrfctx= (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
22002199
get_call_result_type(fcinfo,NULL,&usrfctx->desc);
22012200
usrfctx->nodeId=1;
2202-
usrfctx->connStrPtr=pstrdup(MtmConnStrs);
22032201
memset(usrfctx->nulls, false,sizeof(usrfctx->nulls));
22042202
funcctx->user_fctx=usrfctx;
22052203
MemoryContextSwitchTo(oldcontext);
@@ -2218,23 +2216,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22182216
usrfctx->nulls[4]=lag<0;
22192217
usrfctx->values[5]=Int64GetDatum(Mtm->transCount ?Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount :0);
22202218
usrfctx->values[6]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2221-
p=strchr(usrfctx->connStrPtr,',');
2222-
if (p!=NULL) {
2223-
*p++='\0';
2224-
}
2225-
usrfctx->values[7]=CStringGetTextDatum(usrfctx->connStrPtr);
2226-
usrfctx->connStrPtr=p;
2219+
usrfctx->values[7]=CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
22272220
usrfctx->nodeId+=1;
22282221

22292222
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(heap_form_tuple(usrfctx->desc,usrfctx->values,usrfctx->nulls)));
22302223
}
22312224

2225+
22322226
Datum
22332227
mtm_get_cluster_state(PG_FUNCTION_ARGS)
22342228
{
22352229
TupleDescdesc;
2236-
Datumvalues[10];
2237-
boolnulls[10]= {false};
2230+
Datumvalues[Natts_mtm_cluster_state];
2231+
boolnulls[Natts_mtm_cluster_state]= {false};
22382232
get_call_result_type(fcinfo,NULL,&desc);
22392233

22402234
values[0]=CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
@@ -2243,16 +2237,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22432237
values[3]=Int64GetDatum(Mtm->nodeLockerMask);
22442238
values[4]=Int32GetDatum(Mtm->nNodes);
22452239
values[5]=Int32GetDatum((int)Mtm->pool.active);
2246-
values[6]=Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2247-
values[7]=Int64GetDatum(Mtm->transCount);
2248-
values[8]=Int64GetDatum(Mtm->timeShift);
2249-
values[9]=Int32GetDatum(Mtm->recoverySlot);
2250-
nulls[9]=Mtm->recoverySlot==0;
2240+
values[6]=Int32GetDatum((int)Mtm->pool.pending);
2241+
values[7]=Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2242+
values[8]=Int64GetDatum(Mtm->transCount);
2243+
values[9]=Int64GetDatum(Mtm->timeShift);
2244+
values[10]=Int32GetDatum(Mtm->recoverySlot);
22512245

22522246
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
22532247
}
22542248

22552249

2250+
typedefstruct
2251+
{
2252+
intnodeId;
2253+
}MtmGetClusterInfoCtx;
2254+
2255+
2256+
Datum
2257+
mtm_get_cluster_info(PG_FUNCTION_ARGS)
2258+
{
2259+
2260+
FuncCallContext*funcctx;
2261+
MtmGetClusterInfoCtx*usrfctx;
2262+
MemoryContextoldcontext;
2263+
TupleDescdesc;
2264+
boolis_first_call=SRF_IS_FIRSTCALL();
2265+
inti;
2266+
PGconn*conn;
2267+
PGresult*result;
2268+
char*values[Natts_mtm_cluster_state];
2269+
HeapTupletuple;
2270+
2271+
if (is_first_call) {
2272+
funcctx=SRF_FIRSTCALL_INIT();
2273+
oldcontext=MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
2274+
usrfctx= (MtmGetClusterInfoCtx*)palloc(sizeof(MtmGetNodeStateCtx));
2275+
get_call_result_type(fcinfo,NULL,&desc);
2276+
funcctx->attinmeta=TupleDescGetAttInMetadata(desc);
2277+
usrfctx->nodeId=1;
2278+
funcctx->user_fctx=usrfctx;
2279+
MemoryContextSwitchTo(oldcontext);
2280+
}
2281+
funcctx=SRF_PERCALL_SETUP();
2282+
usrfctx= (MtmGetClusterInfoCtx*)funcctx->user_fctx;
2283+
if (usrfctx->nodeId>MtmNodes) {
2284+
SRF_RETURN_DONE(funcctx);
2285+
}
2286+
conn=PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2287+
if (PQstatus(conn)!=CONNECTION_OK) {
2288+
elog(ERROR,"Failed to establish connection '%s' to node %d",Mtm->nodes[usrfctx->nodeId-1].con.connStr,usrfctx->nodeId);
2289+
}
2290+
result=PQexec(conn,"select * from mtm.get_cluster_state()");
2291+
2292+
if (PQresultStatus(result)!=PGRES_TUPLES_OK||PQntuples(result)!=1) {
2293+
elog(ERROR,"Failed to receive data from %d",usrfctx->nodeId);
2294+
}
2295+
2296+
for (i=0;i<Natts_mtm_cluster_state;i++) {
2297+
values[i]=PQgetvalue(result,0,i);
2298+
}
2299+
tuple=BuildTupleFromCStrings(funcctx->attinmeta,values);
2300+
PQclear(result);
2301+
PQfinish(conn);
2302+
usrfctx->nodeId+=1;
2303+
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(tuple));
2304+
}
2305+
2306+
22562307
Datummtm_make_table_local(PG_FUNCTION_ARGS)
22572308
{
22582309
Oidreloid=PG_GETARG_OID(1);

‎multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
#defineAnum_mtm_local_tables_rel_schema 1
5757
#defineAnum_mtm_local_tables_rel_name 2
5858

59+
#defineNatts_mtm_cluster_state 11
60+
#defineNatts_mtm_nodes_state 8
61+
5962
typedefuint64csn_t;/* commit serial number */
6063
#defineINVALID_CSN ((csn_t)-1)
6164

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp