Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf510c8f

Browse files
committed
Add mm_disable_node function
1 parent63663e5 commitf510c8f

File tree

2 files changed

+116
-61
lines changed

2 files changed

+116
-61
lines changed

‎contrib/multimaster/multimaster--1.0.sql‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ CREATE FUNCTION mm_stop_replication() RETURNS void
99
AS'MODULE_PATHNAME','mm_stop_replication'
1010
LANGUAGE C;
1111

12+
CREATEFUNCTIONmm_disable_node(nodeinteger) RETURNS void
13+
AS'MODULE_PATHNAME','mm_disable_node'
14+
LANGUAGE C;
15+

‎contrib/multimaster/multimaster.c‎

Lines changed: 112 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef struct
5959
TransactionIdminXid;/* XID of oldest transaction visible by any active transaction (local or global) */
6060
TransactionIdnextXid;/* next XID for local transaction */
6161
size_tnReservedXids;/* number of XIDs reserved for local transactions */
62+
int64disabledNodeMask;
6263
intnNodes;
6364
pg_atomic_uint32nReceivers;
6465
boolinitialized;
@@ -74,13 +75,16 @@ typedef struct
7475
#defineDTM_SHMEM_SIZE (64*1024*1024)
7576
#defineDTM_HASH_SIZE 1003
7677

78+
#defineBIT_SET(mask,bit) ((mask) & ((int64)1 << (bit)))
79+
7780
void_PG_init(void);
7881
void_PG_fini(void);
7982

8083
PG_MODULE_MAGIC;
8184

8285
PG_FUNCTION_INFO_V1(mm_start_replication);
8386
PG_FUNCTION_INFO_V1(mm_stop_replication);
87+
PG_FUNCTION_INFO_V1(mm_disable_node);
8488

8589
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
8690
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
@@ -108,6 +112,7 @@ static void DtmBackgroundWorker(Datum arg);
108112
staticvoidMMMarkTransAsLocal(TransactionIdxid);
109113
staticBgwPool*MMPoolConstructor(void);
110114
staticboolMMRunUtilityStmt(PGconn*conn,charconst*sql);
115+
staticvoidMMBroadcastUtilityStmt(charconst*sql,boolignoreError);
111116

112117
staticHTAB*xid_in_doubt;
113118
staticHTAB*local_trans;
@@ -737,6 +742,7 @@ static void DtmInitialize()
737742
dtm->nReservedXids=0;
738743
dtm->minXid=InvalidTransactionId;
739744
dtm->nNodes=MMNodes;
745+
dtm->disabledNodeMask=0;
740746
pg_atomic_write_u32(&dtm->nReceivers,0);
741747
dtm->initialized= false;
742748
BgwPoolInit(&dtm->pool,MMExecutor,MMDatabaseName,MMQueueSize);
@@ -1209,6 +1215,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
12091215
PG_RETURN_VOID();
12101216
}
12111217

1218+
Datum
1219+
mm_disable_node(PG_FUNCTION_ARGS)
1220+
{
1221+
intnodeId=PG_GETARG_INT32(0);
1222+
if (!BIT_SET(dtm->disabledNodeMask,nodeId))
1223+
{
1224+
dtm->disabledNodeMask |= ((int64)1 <<nodeId);
1225+
dtm->nNodes-=1;
1226+
if (!IsTransactionBlock())
1227+
{
1228+
MMBroadcastUtilityStmt(psprintf("select mm_disable_node(%d)",nodeId), true);
1229+
}
1230+
}
1231+
PG_RETURN_VOID();
1232+
}
1233+
12121234
/*
12131235
* Execute statement with specified parameters and check its result
12141236
*/
@@ -1224,6 +1246,95 @@ static bool MMRunUtilityStmt(PGconn* conn, char const* sql)
12241246
returnret;
12251247
}
12261248

1249+
staticvoidMMBroadcastUtilityStmt(charconst*sql,boolignoreError)
1250+
{
1251+
char*conn_str=pstrdup(MMConnStrs);
1252+
char*conn_str_end=conn_str+strlen(conn_str);
1253+
inti=0;
1254+
int64disabledNodeMask=dtm->disabledNodeMask;
1255+
intfailedNode=-1;
1256+
charconst*errorMsg=NULL;
1257+
PGconn**conns=palloc0(sizeof(PGconn*)*MMNodes);
1258+
1259+
while (conn_str<conn_str_end)
1260+
{
1261+
char*p=strchr(conn_str,',');
1262+
if (p==NULL) {
1263+
p=conn_str_end;
1264+
}
1265+
*p='\0';
1266+
if (!BIT_SET(disabledNodeMask,i))
1267+
{
1268+
conns[i]=PQconnectdb(conn_str);
1269+
if (PQstatus(conns[i])!=CONNECTION_OK)
1270+
{
1271+
if (ignoreError)
1272+
{
1273+
PQfinish(conns[i]);
1274+
conns[i]=NULL;
1275+
}else {
1276+
failedNode=i;
1277+
do {
1278+
PQfinish(conns[i]);
1279+
}while (--i >=0);
1280+
elog(ERROR,"Failed to establish connection '%s' to node %d",conn_str,failedNode);
1281+
}
1282+
}
1283+
}
1284+
conn_str=p+1;
1285+
i+=1;
1286+
}
1287+
Assert(i==MMNodes);
1288+
1289+
for (i=0;i<MMNodes;i++)
1290+
{
1291+
if (conns[i])
1292+
{
1293+
if (!MMRunUtilityStmt(conns[i],"BEGIN TRANSACTION")&& !ignoreError)
1294+
{
1295+
errorMsg="Failed to start transaction at node %d";
1296+
failedNode=i;
1297+
break;
1298+
}
1299+
if (!MMRunUtilityStmt(conns[i],sql)&& !ignoreError)
1300+
{
1301+
errorMsg="Failed to run command at node %d";
1302+
failedNode=i;
1303+
break;
1304+
}
1305+
}
1306+
}
1307+
if (failedNode >=0&& !ignoreError)
1308+
{
1309+
for (i=0;i<MMNodes;i++)
1310+
{
1311+
if (conns[i])
1312+
{
1313+
MMRunUtilityStmt(conns[i],"ROLLBACK TRANSACTION");
1314+
}
1315+
}
1316+
}else {
1317+
for (i=0;i<MMNodes;i++)
1318+
{
1319+
if (conns[i]&& !MMRunUtilityStmt(conns[i],"COMMIT TRANSACTION")&& !ignoreError)
1320+
{
1321+
errorMsg="Commit failed at node %d";
1322+
failedNode=i;
1323+
}
1324+
}
1325+
}
1326+
for (i=0;i<MMNodes;i++)
1327+
{
1328+
if (conns[i])
1329+
{
1330+
PQfinish(conns[i]);
1331+
}
1332+
}
1333+
if (!ignoreError&&failedNode >=0)
1334+
{
1335+
elog(ERROR,errorMsg,failedNode+1);
1336+
}
1337+
}
12271338

12281339
staticvoidMMProcessUtility(Node*parsetree,constchar*queryString,
12291340
ProcessUtilityContextcontext,ParamListInfoparams,
@@ -1267,67 +1378,7 @@ static void MMProcessUtility(Node *parsetree, const char *queryString,
12671378
MMIsDistributedTrans= false;
12681379
}
12691380
}else {
1270-
char*conn_str=pstrdup(MMConnStrs);
1271-
char*conn_str_end=conn_str+strlen(conn_str);
1272-
inti=0;
1273-
intfailedNode=-1;
1274-
charconst*errorMsg=NULL;
1275-
PGconn**conns;
1276-
conns=palloc(sizeof(PGconn*)*MMNodes);
1277-
1278-
while (conn_str<conn_str_end) {
1279-
char*p=strchr(conn_str,',');
1280-
if (p==NULL) {
1281-
p=conn_str_end;
1282-
}
1283-
*p='\0';
1284-
conns[i]=PQconnectdb(conn_str);
1285-
if (PQstatus(conns[i])!=CONNECTION_OK)
1286-
{
1287-
failedNode=i;
1288-
do {
1289-
PQfinish(conns[i]);
1290-
}while (--i >=0);
1291-
elog(ERROR,"Failed to establish connection '%s' to node %d",conn_str,failedNode);
1292-
}
1293-
conn_str=p+1;
1294-
i+=1;
1295-
}
1296-
Assert(i==MMNodes);
1297-
1298-
for (i=0;i<MMNodes;i++) {
1299-
if (!MMRunUtilityStmt(conns[i],"BEGIN TRANSACTION"))
1300-
{
1301-
errorMsg="Failed to start transaction at node %d";
1302-
failedNode=i;
1303-
break;
1304-
}
1305-
if (!MMRunUtilityStmt(conns[i],queryString))
1306-
{
1307-
errorMsg="Failed to run command at node %d";
1308-
failedNode=i;
1309-
break;
1310-
}
1311-
}
1312-
if (failedNode >=0)
1313-
{
1314-
for (i=0;i<MMNodes;i++) {
1315-
MMRunUtilityStmt(conns[i],"ROLLBACK TRANSACTION");
1316-
}
1317-
}else {
1318-
for (i=0;i<MMNodes;i++) {
1319-
if (!MMRunUtilityStmt(conns[i],"COMMIT TRANSACTION")) {
1320-
errorMsg="Commit failed at node %d";
1321-
failedNode=i;
1322-
}
1323-
}
1324-
}
1325-
for (i=0;i<MMNodes;i++) {
1326-
PQfinish(conns[i]);
1327-
}
1328-
if (failedNode >=0) {
1329-
elog(ERROR,errorMsg,failedNode+1);
1330-
}
1381+
MMBroadcastUtilityStmt(queryString, false);
13311382
}
13321383
}
13331384
staticvoid

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp