2
2
#include < stdio.h>
3
3
#include < stdarg.h>
4
4
#include < stdlib.h>
5
+ #include < unistd.h>
5
6
#include < inttypes.h>
6
7
#include < sys/time.h>
7
8
#include < pthread.h>
@@ -34,18 +35,34 @@ class my_unique_ptr
34
35
public:
35
36
my_unique_ptr (T* p =NULL ) : ptr(p) {}
36
37
~my_unique_ptr () {delete ptr; }
37
- T&operator *() {return *ptr; }
38
- T*operator ->() {return ptr; }
39
- void operator =(T* p) { ptr = p; }
38
+ T&operator *()const {return *ptr; }
39
+ T*operator ->()const {return ptr; }
40
+ bool operator ==(Tconst * p)const {return ptr == p; }
41
+ bool operator !=(Tconst * p)const {return ptr != p; }
42
+ void operator =(T* p) {
43
+ delete ptr;
44
+ ptr = p;
45
+ }
40
46
void operator =(my_unique_ptr& other) {
47
+ delete ptr;
41
48
ptr = other.ptr ;
42
49
other.ptr =NULL ;
43
50
}
44
51
};
45
52
53
+ struct config
54
+ {
55
+ int timeout;
56
+ vector<string> connections;
57
+
58
+ config () {
59
+ timeout =1000000 ;// 1 second
60
+ }
61
+ };
62
+
46
63
int main (int argc,char * argv[])
47
64
{
48
- vector<string> connection_strings ;
65
+ config cfg ;
49
66
50
67
if (argc ==1 ){
51
68
printf (" Use -h to show usage options\n " );
@@ -56,26 +73,27 @@ int main (int argc, char* argv[])
56
73
if (argv[i][0 ] ==' -' ) {
57
74
switch (argv[i][1 ]) {
58
75
case ' t' :
59
- cfs .timeout =atoi (argv[++i]);
76
+ cfg .timeout =atoi (argv[++i]);
60
77
continue ;
61
78
case ' c' :
62
79
cfg.connections .push_back (string (argv[++i]));
63
80
continue ;
64
81
}
65
82
}
66
83
printf (" Options:\n "
67
- " \t -t TIMEOUT\t timeout inseconds of waiting database connection string\n "
84
+ " \t -t TIMEOUT\t timeout inmicroseconds of waiting database connection string (default: 1 second) \n "
68
85
" \t -c STR\t database connection string\n " );
69
86
return 1 ;
70
87
}
71
88
72
- size_t nConns =connection_strings .size ();
89
+ size_t nConns =cfg. connections .size ();
73
90
vector< my_unique_ptr<connection> >conns (nConns);
74
91
for (size_t i =0 ; i < nConns; i++) {
75
- conns[i] =new connection (connection_strings [i]);
92
+ conns[i] =new connection (cfg. connections [i]);
76
93
}
77
94
nodemask_t disabledMask =0 ;
78
- nodemask_t enabledMask =0 ;
95
+ nodemask_t newEnabledMask =0 ;
96
+ nodemask_t oldEnabledMask =0 ;
79
97
80
98
while (true ) {
81
99
vector< my_unique_ptr<nontransaction> >txns (conns.size ());
@@ -84,46 +102,63 @@ int main (int argc, char* argv[])
84
102
char sql[128 ];
85
103
sprintf (sql," select mtm.arbitrator_poll(%lld)" , disabledMask);
86
104
105
+ // Initiate queries to all live nodes
87
106
for (size_t i =0 ; i < nConns; i++) {
88
- if (BIT_CHECK (disabledMask, i)) {
89
- if (BIT_CHECK (enabledMask, i)) {
107
+ // Some of live node reestablished connection with dead node, so arbitrator should also try to connect to this node
108
+ if (conns[i] ==NULL ) {
109
+ if (BIT_CHECK (newEnabledMask, i)) {
90
110
try {
91
- delete conns[i];
92
- conns[i] =new connection (connection_strings[i]);
111
+ conns[i] =new connection (cfg.connections [i]);
93
112
BIT_CLEAR (disabledMask, i);
113
+ fprintf (stdout," Reestablish connection with node %d\n " , (int )i+1 );
94
114
}catch (pqxx_exceptionconst & x) {
95
- conns[i] =NULL ;
96
- fprintf (stderr," Failed to connect to node %d: %s\n " , (int )i+1 , x.base ().what ());
115
+ if (conns[i] ==NULL ) {
116
+ conns[i] =NULL ;
117
+ fprintf (stderr," Failed to connect to node %d: %s\n " , (int )i+1 , x.base ().what ());
118
+ }
97
119
}
98
120
}
99
- }
100
- if (!BIT_CHECK (disabledMask, i)) {
121
+ }else {
101
122
txns[i] =new nontransaction (*conns[i]);
102
123
pipes[i] =new pipeline (*txns[i]);
103
124
queries[i] = pipes[i]->insert (sql);
104
125
}
105
- sleep (cfg.timeout );
106
- enabledMask =0 ;
107
- for (size_t i =0 ; i < nConns; i++) {
108
- if (!BIT_CHECK (didsabledMask, i)) {
109
- if (!pipes[i]->is_finished (queries[i]))
110
- {
111
- fprintf (stderr," Doesn't receive response from node %d within %d seconds\n " , (int )i+1 , cfs.timeout );
112
- BIT_SET (disabledMask, i);
113
- delete conns[i];
126
+ }
127
+ // Wait some time
128
+ usleep (cfg.timeout );
129
+ oldEnabledMask = newEnabledMask;
130
+ newEnabledMask = ~0 ;
131
+ for (size_t i =0 ; i < nConns; i++) {
132
+ if (!BIT_CHECK (disabledMask, i)) {
133
+ if (!pipes[i]->is_finished (queries[i])) {
134
+ fprintf (stderr," Doesn't receive response from node %d within %d microseconds\n " , (int )i+1 , cfg.timeout );
135
+ BIT_SET (disabledMask, i);
136
+ conns[i] =NULL ;
137
+ }else {
138
+ try {
139
+ result r = pipes[i]->retrieve (queries[i]);
140
+ newEnabledMask &= r[0 ][0 ].as (nodemask_t ());
141
+ }catch (pqxx_exceptionconst & x) {
114
142
conns[i] =NULL ;
115
- }else {
116
- try {
117
- result r = pipes[i]->retrieve (results[i]);
118
- enabledMask |= r[0 ][0 ].as (nodemask_t ());
119
- }catch (pqxx_exceptionconst & x) {
120
- delete conns[i];
121
- conns[i] =NULL ;
122
- fprintf (stderr," Failed to retrieve result from node %d: %s\n " , (int )i+1 , x.base ().what ());
123
- }
124
- }
143
+ fprintf (stderr," Failed to retrieve result from node %d: %s\n " , (int )i+1 , x.base ().what ());
144
+ }
125
145
}
126
146
}
127
147
}
148
+ if (newEnabledMask == ~0 ) {
149
+ if (oldEnabledMask != ~0 ) {
150
+ fprintf (stdout," There are no more live nodes\n " );
151
+ }
152
+ // No live nodes:
153
+ disabledNodeMask =0 ;
154
+ }else {
155
+ if (newEnabledMask != oldEnabledMask) {
156
+ for (size_t i =0 ; i < nConns; i++) {
157
+ if (BIT_CHECK (newEnabledMask ^ oldEnabledMask, i)) {
158
+ fprintf (stdout," Node %d is %s\n " , (int )i+1 ,BIT_CHECK (newEnabledMask, i) ?" enabled" :" disabled" );
159
+ }
160
+ }
161
+ }
162
+ }
128
163
}
129
164
}