Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8091c2c

Browse files
knizhnikkelvich
authored andcommitted
Add arbitrator
1 parent75acab9 commit8091c2c

File tree

5 files changed

+1194
-1046
lines changed

5 files changed

+1194
-1046
lines changed

‎arbitrator/arbitrator.cpp

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#include<time.h>
2+
#include<stdio.h>
3+
#include<stdarg.h>
4+
#include<stdlib.h>
5+
#include<inttypes.h>
6+
#include<sys/time.h>
7+
#include<pthread.h>
8+
9+
#include<string>
10+
#include<vector>
11+
12+
#include<pqxx/connection>
13+
#include<pqxx/transaction>
14+
#include<pqxx/subtransaction.hxx>
15+
#include<pqxx/nontransaction>
16+
#include<pqxx/pipeline>
17+
18+
typedefunsignedlonglong ulong64;/* we are not using uint64 here because we want to use %lld format for this type*/
19+
20+
typedef ulong64nodemask_t;
21+
22+
#defineBIT_CHECK(mask, bit) (((mask) & ((nodemask_t)1 << (bit))) !=0)
23+
#defineBIT_CLEAR(mask, bit) (mask &= ~((nodemask_t)1 << (bit)))
24+
#defineBIT_SET(mask, bit) (mask |= ((nodemask_t)1 << (bit)))
25+
26+
usingnamespacestd;
27+
usingnamespacepqxx;
28+
29+
template<classT>
30+
classmy_unique_ptr
31+
{
32+
T* ptr;
33+
34+
public:
35+
my_unique_ptr(T* p =NULL) : ptr(p) {}
36+
~my_unique_ptr() {delete ptr; }
37+
T&operator*() {return *ptr; }
38+
T*operator->() {return ptr; }
39+
voidoperator=(T* p) { ptr = p; }
40+
voidoperator=(my_unique_ptr& other) {
41+
ptr = other.ptr;
42+
other.ptr =NULL;
43+
}
44+
};
45+
46+
intmain (int argc,char* argv[])
47+
{
48+
vector<string> connection_strings;
49+
50+
if (argc ==1){
51+
printf("Use -h to show usage options\n");
52+
return1;
53+
}
54+
55+
for (int i =1; i < argc; i++) {
56+
if (argv[i][0] =='-') {
57+
switch (argv[i][1]) {
58+
case't':
59+
cfs.timeout =atoi(argv[++i]);
60+
continue;
61+
case'c':
62+
cfg.connections.push_back(string(argv[++i]));
63+
continue;
64+
}
65+
}
66+
printf("Options:\n"
67+
"\t-t TIMEOUT\ttimeout in seconds of waiting database connection string\n"
68+
"\t-c STR\tdatabase connection string\n");
69+
return1;
70+
}
71+
72+
size_t nConns = connection_strings.size();
73+
vector< my_unique_ptr<connection> >conns(nConns);
74+
for (size_t i =0; i < nConns; i++) {
75+
conns[i] =newconnection(connection_strings[i]);
76+
}
77+
nodemask_t disabledMask =0;
78+
nodemask_t enabledMask =0;
79+
80+
while (true) {
81+
vector< my_unique_ptr<nontransaction> >txns(conns.size());
82+
vector< my_unique_ptr<pipeline> >pipes(nConns);
83+
vector<pipeline::query_id>queries(nConns);
84+
char sql[128];
85+
sprintf(sql,"select mtm.arbitrator_poll(%lld)", disabledMask);
86+
87+
for (size_t i =0; i < nConns; i++) {
88+
if (BIT_CHECK(disabledMask, i)) {
89+
if (BIT_CHECK(enabledMask, i)) {
90+
try {
91+
delete conns[i];
92+
conns[i] =newconnection(connection_strings[i]);
93+
BIT_CLEAR(disabledMask, i);
94+
}catch (pqxx_exceptionconst& x) {
95+
conns[i] =NULL;
96+
fprintf(stderr,"Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
97+
}
98+
}
99+
}
100+
if (!BIT_CHECK(disabledMask, i)) {
101+
txns[i] =newnontransaction(*conns[i]);
102+
pipes[i] =newpipeline(*txns[i]);
103+
queries[i] = pipes[i]->insert(sql);
104+
}
105+
sleep(cfg.timeout);
106+
enabledMask = disabledMask;
107+
for (size_t i =0; i < nConns; i++) {
108+
if (!BIT_CHECK(didsabledMask, i)) {
109+
if (!pipes[i]->is_finished(queries[i]))
110+
{
111+
fprintf(stderr,"Doesn't receive response from node %d within %d seconds\n", (int)i+1, cfs.timeout);
112+
BIT_SET(disabledMask, i);
113+
delete conns[i];
114+
conns[i] =NULL;
115+
}else {
116+
result r = pipes[i]->retrieve(results[i]);
117+
enabledMask &= ~r[0][0].as(nodemask_t());
118+
}
119+
}
120+
}
121+
}
122+
}
123+
}

‎multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ CREATE FUNCTION mtm.check_deadlock(xid bigint) RETURNS boolean
9191
AS'MODULE_PATHNAME','mtm_check_deadlock'
9292
LANGUAGE C;
9393

94+
CREATEFUNCTIONmtm.arbitraror_poll_status(xidbigint) RETURNSbigint
95+
AS'MODULE_PATHNAME','mtm_arbitrator_poll'
96+
LANGUAGE C;
97+
9498
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key(rel_schema, rel_name));
9599

96100
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp