22#include < stdio.h>
33#include < stdarg.h>
44#include < stdlib.h>
5+ #include < unistd.h>
56#include < inttypes.h>
67#include < sys/time.h>
78#include < pthread.h>
@@ -34,18 +35,34 @@ class my_unique_ptr
3435public:
3536my_unique_ptr (T* p =NULL ) : ptr(p) {}
3637~my_unique_ptr () {delete ptr; }
37- T&operator *() {return *ptr; }
38- T*operator ->() {return ptr; }
39- void operator =(T* p) { ptr = p; }
38+ T&operator *()const {return *ptr; }
39+ T*operator ->()const {return ptr; }
40+ bool operator ==(Tconst * p)const {return ptr == p; }
41+ bool operator !=(Tconst * p)const {return ptr != p; }
42+ void operator =(T* p) {
43+ delete ptr;
44+ ptr = p;
45+ }
4046void operator =(my_unique_ptr& other) {
47+ delete ptr;
4148 ptr = other.ptr ;
4249 other.ptr =NULL ;
4350 }
4451};
4552
53+ struct config
54+ {
55+ int timeout;
56+ vector<string> connections;
57+
58+ config () {
59+ timeout =1000000 ;// 1 second
60+ }
61+ };
62+
4663int main (int argc,char * argv[])
4764{
48- vector<string> connection_strings ;
65+ config cfg ;
4966
5067if (argc ==1 ){
5168printf (" Use -h to show usage options\n " );
@@ -56,26 +73,27 @@ int main (int argc, char* argv[])
5673if (argv[i][0 ] ==' -' ) {
5774switch (argv[i][1 ]) {
5875case ' t' :
59- cfs .timeout =atoi (argv[++i]);
76+ cfg .timeout =atoi (argv[++i]);
6077continue ;
6178case ' c' :
6279 cfg.connections .push_back (string (argv[++i]));
6380continue ;
6481}
6582 }
6683printf (" Options:\n "
67- " \t -t TIMEOUT\t timeout inseconds of waiting database connection string\n "
84+ " \t -t TIMEOUT\t timeout inmicroseconds of waiting database connection string (default: 1 second) \n "
6885" \t -c STR\t database connection string\n " );
6986return 1 ;
7087 }
7188
72- size_t nConns =connection_strings .size ();
89+ size_t nConns =cfg. connections .size ();
7390 vector< my_unique_ptr<connection> >conns (nConns);
7491for (size_t i =0 ; i < nConns; i++) {
75- conns[i] =new connection (connection_strings [i]);
92+ conns[i] =new connection (cfg. connections [i]);
7693 }
7794nodemask_t disabledMask =0 ;
78- nodemask_t enabledMask =0 ;
95+ nodemask_t newEnabledMask =0 ;
96+ nodemask_t oldEnabledMask =0 ;
7997
8098while (true ) {
8199 vector< my_unique_ptr<nontransaction> >txns (conns.size ());
@@ -84,46 +102,63 @@ int main (int argc, char* argv[])
84102char sql[128 ];
85103sprintf (sql," select mtm.arbitrator_poll(%lld)" , disabledMask);
86104
105+ // Initiate queries to all live nodes
87106for (size_t i =0 ; i < nConns; i++) {
88- if (BIT_CHECK (disabledMask, i)) {
89- if (BIT_CHECK (enabledMask, i)) {
107+ // Some of live node reestablished connection with dead node, so arbitrator should also try to connect to this node
108+ if (conns[i] ==NULL ) {
109+ if (BIT_CHECK (newEnabledMask, i)) {
90110try {
91- delete conns[i];
92- conns[i] =new connection (connection_strings[i]);
111+ conns[i] =new connection (cfg.connections [i]);
93112BIT_CLEAR (disabledMask, i);
113+ fprintf (stdout," Reestablish connection with node %d\n " , (int )i+1 );
94114}catch (pqxx_exceptionconst & x) {
95- conns[i] =NULL ;
96- fprintf (stderr," Failed to connect to node %d: %s\n " , (int )i+1 , x.base ().what ());
115+ if (conns[i] ==NULL ) {
116+ conns[i] =NULL ;
117+ fprintf (stderr," Failed to connect to node %d: %s\n " , (int )i+1 , x.base ().what ());
118+ }
97119}
98120}
99- }
100- if (!BIT_CHECK (disabledMask, i)) {
121+ }else {
101122txns[i] =new nontransaction (*conns[i]);
102123pipes[i] =new pipeline (*txns[i]);
103124queries[i] = pipes[i]->insert (sql);
104125}
105- sleep (cfg.timeout );
106- enabledMask =0 ;
107- for (size_t i =0 ; i < nConns; i++) {
108- if (!BIT_CHECK (didsabledMask, i)) {
109- if (!pipes[i]->is_finished (queries[i]))
110- {
111- fprintf (stderr," Doesn't receive response from node %d within %d seconds\n " , (int )i+1 , cfs.timeout );
112- BIT_SET (disabledMask, i);
113- delete conns[i];
126+ }
127+ // Wait some time
128+ usleep (cfg.timeout );
129+ oldEnabledMask = newEnabledMask;
130+ newEnabledMask = ~0 ;
131+ for (size_t i =0 ; i < nConns; i++) {
132+ if (!BIT_CHECK (disabledMask, i)) {
133+ if (!pipes[i]->is_finished (queries[i])) {
134+ fprintf (stderr," Doesn't receive response from node %d within %d microseconds\n " , (int )i+1 , cfg.timeout );
135+ BIT_SET (disabledMask, i);
136+ conns[i] =NULL ;
137+ }else {
138+ try {
139+ result r = pipes[i]->retrieve (queries[i]);
140+ newEnabledMask &= r[0 ][0 ].as (nodemask_t ());
141+ }catch (pqxx_exceptionconst & x) {
114142conns[i] =NULL ;
115- }else {
116- try {
117- result r = pipes[i]->retrieve (results[i]);
118- enabledMask |= r[0 ][0 ].as (nodemask_t ());
119- }catch (pqxx_exceptionconst & x) {
120- delete conns[i];
121- conns[i] =NULL ;
122- fprintf (stderr," Failed to retrieve result from node %d: %s\n " , (int )i+1 , x.base ().what ());
123- }
124- }
143+ fprintf (stderr," Failed to retrieve result from node %d: %s\n " , (int )i+1 , x.base ().what ());
144+ }
125145}
126146}
127147}
148+ if (newEnabledMask == ~0 ) {
149+ if (oldEnabledMask != ~0 ) {
150+ fprintf (stdout," There are no more live nodes\n " );
151+ }
152+ // No live nodes:
153+ disabledNodeMask =0 ;
154+ }else {
155+ if (newEnabledMask != oldEnabledMask) {
156+ for (size_t i =0 ; i < nConns; i++) {
157+ if (BIT_CHECK (newEnabledMask ^ oldEnabledMask, i)) {
158+ fprintf (stdout," Node %d is %s\n " , (int )i+1 ,BIT_CHECK (newEnabledMask, i) ?" enabled" :" disabled" );
159+ }
160+ }
161+ }
162+ }
128163}
129164}