@@ -3,12 +3,35 @@ package Cluster;
3
3
use strict;
4
4
use warnings;
5
5
6
- use Proc::ProcessTable;
7
6
use PostgresNode;
8
7
use TestLib;
9
8
use Test::More;
10
9
use Cwd;
11
10
11
+ use Socket;
12
+
13
+ use IPC::Run;
14
+
15
+ sub check_port
16
+ {
17
+ my ($host ,$port ) =@_ ;
18
+ my $iaddr = inet_aton($host );
19
+ my $paddr = sockaddr_in($port ,$iaddr );
20
+ my $proto =getprotobyname (" tcp" );
21
+ my $available = 0;
22
+
23
+ socket (SOCK, PF_INET, SOCK_STREAM,$proto )
24
+ or die " socket failed:$! " ;
25
+
26
+ if (bind (SOCK,$paddr ) &&listen (SOCK, SOMAXCONN))
27
+ {
28
+ $available = 1;
29
+ }
30
+
31
+ close (SOCK);
32
+ return $available ;
33
+ }
34
+
12
35
my %allocated_ports = ();
13
36
sub allocate_ports
14
37
{
@@ -19,8 +42,7 @@ sub allocate_ports
19
42
{
20
43
my $port =int (rand () * 16384) + 49152;
21
44
next if $allocated_ports {$port };
22
- diag(" checking for port$port \n " );
23
- if (!TestLib::run_log([' pg_isready' ,' -h' ,$host ,' -p' ,$port ]))
45
+ if (check_port($host ,$port ))
24
46
{
25
47
$allocated_ports {$port } = 1;
26
48
push (@allocated_now ,$port );
@@ -44,6 +66,7 @@ sub new
44
66
my $node = new PostgresNode(" node$i " ,$host ,$pgport );
45
67
$node -> {id } =$i ;
46
68
$node -> {arbiter_port } =$arbiter_port ;
69
+ $node -> {mmconnstr } =" ${\$ node->connstr('postgres') } arbiter_port=${\$ node->{arbiter_port} }" ;
47
70
push (@$nodes ,$node );
48
71
}
49
72
@@ -67,48 +90,54 @@ sub init
67
90
}
68
91
}
69
92
93
+ sub all_connstrs
94
+ {
95
+ my ($self ) =@_ ;
96
+ my $nodes =$self -> {nodes };
97
+ return join (' ,' ,map {" ${\$ _->connstr('postgres') } arbiter_port=${\$ _->{arbiter_port} }" }@$nodes );
98
+ }
99
+
100
+
70
101
sub configure
71
102
{
72
103
my ($self ) =@_ ;
73
104
my $nodes =$self -> {nodes };
74
- my $nnodes =scalar @{$nodes };
75
105
76
- my $connstr =join ( ' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} } " } @$nodes );
106
+ my $connstr =$self -> all_connstrs( );
77
107
78
108
foreach my $node (@$nodes )
79
109
{
80
110
my $id =$node -> {id };
81
111
my $host =$node -> host;
82
112
my $pgport =$node -> port;
83
113
my $arbiter_port =$node -> {arbiter_port };
114
+ my $unix_sock_dir =$ENV {PGHOST };
84
115
85
116
$node -> append_conf(" postgresql.conf" ,qq(
86
117
log_statement = none
87
118
listen_addresses = '$host '
88
- unix_socket_directories = ''
119
+ unix_socket_directories = '$unix_sock_dir '
89
120
port =$pgport
90
- max_prepared_transactions =200
91
- max_connections =200
121
+ max_prepared_transactions =10
122
+ max_connections =10
92
123
max_worker_processes = 100
93
124
wal_level = logical
94
- fsync = off
95
- max_wal_senders = 10
125
+ max_wal_senders = 6
96
126
wal_sender_timeout = 0
97
127
default_transaction_isolation = 'repeatable read'
98
- max_replication_slots =10
128
+ max_replication_slots =6
99
129
shared_preload_libraries = 'multimaster'
130
+ shared_buffers = 16MB
100
131
101
132
multimaster.arbiter_port =$arbiter_port
102
- multimaster.workers = 10
103
- multimaster.queue_size = 10485760 # 10mb
133
+ multimaster.workers = 1
104
134
multimaster.node_id =$id
105
135
multimaster.conn_strings = '$connstr '
106
- multimaster.heartbeat_recv_timeout =1000
136
+ multimaster.heartbeat_recv_timeout =1050
107
137
multimaster.heartbeat_send_timeout = 250
108
- multimaster.max_nodes =$nnodes
109
- multimaster.ignore_tables_without_pk = true
110
- multimaster.twopc_min_timeout = 50000
111
- multimaster.min_2pc_timeout = 50000
138
+ multimaster.max_nodes = 6
139
+ multimaster.ignore_tables_without_pk = false
140
+ multimaster.queue_size = 4194304
112
141
log_line_prefix = '%t : '
113
142
) );
114
143
@@ -128,6 +157,7 @@ sub start
128
157
foreach my $node (@$nodes )
129
158
{
130
159
$node -> start();
160
+ note(" Starting node with connstr 'dbname=postgres port=@{[$node ->port() ]} host=@{[$node ->host() ]}'" );
131
161
}
132
162
}
133
163
@@ -137,7 +167,7 @@ sub stopnode
137
167
return 1unless defined $node -> {_pid };
138
168
$mode =' fast' unless defined $mode ;
139
169
my $name =$node -> name;
140
- diag (" stopping$name ${mode} ly" );
170
+ note (" stopping$name ${mode} ly" );
141
171
142
172
if ($mode eq ' kill' ) {
143
173
killtree($node -> {_pid });
@@ -147,13 +177,13 @@ sub stopnode
147
177
my $pgdata =$node -> data_dir;
148
178
my $ret = TestLib::system_log(' pg_ctl' ,' -D' ,$pgdata ,' -m' ,' fast' ,' stop' );
149
179
my $pidfile =$node -> data_dir ." /postmaster.pid" ;
150
- diag (" unlink$pidfile " );
180
+ note (" unlink$pidfile " );
151
181
unlink $pidfile ;
152
182
$node -> {_pid } =undef ;
153
183
$node -> _update_pid;
154
184
155
185
if ($ret != 0) {
156
- diag (" $name failed to stop${mode} ly" );
186
+ note (" $name failed to stop${mode} ly" );
157
187
return 0;
158
188
}
159
189
@@ -166,43 +196,22 @@ sub stopid
166
196
return stopnode($self -> {nodes }-> [$idx ]);
167
197
}
168
198
169
- sub killtree
199
+ sub dumplogs
170
200
{
171
- my $root =shift ;
172
- diag(" killtree$root \n " );
173
-
174
- my $t = new Proc::ProcessTable;
175
-
176
- my %parent = ();
177
- # my %cmd = ();
178
- foreach my $p (@{$t -> table}) {
179
- $parent {$p -> pid} =$p -> ppid;
180
- # $cmd{$p->pid} = $p->cmndline;
181
- }
201
+ my ($self ) =@_ ;
202
+ my $nodes =$self -> {nodes };
182
203
183
- if (!defined $root ) {
184
- return ;
185
- }
186
- my @queue = ($root );
187
- my @killist = ();
188
-
189
- while (scalar @queue ) {
190
- my $victim =shift @queue ;
191
- while (my ($pid ,$ppid ) =each %parent ) {
192
- if ($ppid ==$victim ) {
193
- push @queue ,$pid ;
194
- }
195
- }
196
- diag(" SIGSTOP to$victim " );
197
- kill ' STOP' ,$victim ;
198
- unshift @killist ,$victim ;
204
+ note(" Dumping logs:" );
205
+ foreach my $node (@$nodes ) {
206
+ note(" ##################################################################" );
207
+ note($node -> {_logfile });
208
+ note(" ##################################################################" );
209
+ my $filename =$node -> {_logfile };
210
+ open my $fh ,' <' ,$filename or die " error opening$filename :$! " ;
211
+ my $data =do {local $/ ; <$fh > };
212
+ note($data );
213
+ note(" ##################################################################\n\n " );
199
214
}
200
-
201
- diag(" SIGKILL to" .join (' ' ,@killist ));
202
- kill ' KILL' ,@killist ;
203
- # foreach my $victim (@killist) {
204
- # print("kill $victim " . $cmd{$victim} . "\n");
205
- # }
206
215
}
207
216
208
217
sub stop
@@ -211,34 +220,32 @@ sub stop
211
220
my $nodes =$self -> {nodes };
212
221
$mode =' fast' unless defined $mode ;
213
222
214
- diag(" Dumping logs:" );
215
- foreach my $node (@$nodes ) {
216
- diag(" ##################################################################" );
217
- diag($node -> {_logfile });
218
- diag(" ##################################################################" );
219
- my $filename =$node -> {_logfile };
220
- open my $fh ,' <' ,$filename or die " error opening$filename :$! " ;
221
- my $data =do {local $/ ; <$fh > };
222
- diag($data );
223
- diag(" ##################################################################\n\n " );
224
- }
225
-
226
223
my $ok = 1;
227
- diag (" stopping cluster${mode} ly" );
224
+ note (" stopping cluster${mode} ly" );
228
225
229
226
foreach my $node (@$nodes ) {
230
227
if (!stopnode($node ,$mode )) {
231
228
$ok = 0;
232
- if (!stopnode($node ,' kill' )) {
233
- my $name =$node -> name;
234
- BAIL_OUT(" failed to kill$name " );
235
- }
229
+ # if (!stopnode($node, 'kill')) {
230
+ # my $name = $node->name;
231
+ # BAIL_OUT("failed to kill $name");
232
+ # }
236
233
}
237
234
}
238
235
sleep (2);
236
+
237
+ $self -> dumplogs();
238
+
239
239
return $ok ;
240
240
}
241
241
242
+ sub bail_out_with_logs
243
+ {
244
+ my ($self ,$msg ) =@_ ;
245
+ $self -> dumplogs();
246
+ BAIL_OUT($msg );
247
+ }
248
+
242
249
sub teardown
243
250
{
244
251
my ($self ) =@_ ;
@@ -269,10 +276,127 @@ sub poll
269
276
return 1;
270
277
}
271
278
my $tries_left =$tries -$i - 1;
272
- diag (" $poller poll for$pollee failed [$tries_left tries left]" );
279
+ note (" $poller poll for$pollee failed [$tries_left tries left]" );
273
280
sleep ($delay );
274
281
}
275
282
return 0;
276
283
}
277
284
285
+ sub pgbench ()
286
+ {
287
+ my ($self ,$node ,@args ) =@_ ;
288
+ my $pgbench_handle =$self -> pgbench_async($node ,@args );
289
+ $self -> pgbench_await($pgbench_handle );
290
+ }
291
+
292
+ sub pgbench_async ()
293
+ {
294
+ my ($self ,$node ,@args ) =@_ ;
295
+
296
+ my ($in ,$out ,$err ,$rc );
297
+ $in =' ' ;
298
+ $out =' ' ;
299
+
300
+ my @pgbench_command = (
301
+ ' pgbench' ,
302
+ @args ,
303
+ -h => $self -> {nodes }-> [$node ]-> host(),
304
+ -p => $self -> {nodes }-> [$node ]-> port(),
305
+ ' postgres' ,
306
+ );
307
+ note(" running pgbench:" .join (" " ,@pgbench_command ));
308
+ my $handle = IPC::Run::start(\@pgbench_command ,$in ,$out );
309
+ return $handle ;
310
+ }
311
+
312
+ sub pgbench_await ()
313
+ {
314
+ my ($self ,$pgbench_handle ) =@_ ;
315
+ IPC::Run::finish($pgbench_handle ) || BAIL_OUT(" pgbench exited with$? " );
316
+ }
317
+
318
+ sub is_data_identic ()
319
+ {
320
+ my ($self ,@nodenums ) =@_ ;
321
+ my $checksum =' ' ;
322
+
323
+ my $sql =" select md5('(' || string_agg(aid::text || ', ' || abalance::text , '),(') || ')')
324
+ from (select * from pgbench_accounts order by aid) t;" ;
325
+
326
+ foreach my $i (@nodenums )
327
+ {
328
+ my $current_hash =' ' ;
329
+ $self -> {nodes }-> [$i ]-> psql(' postgres' ,$sql ,stdout => \$current_hash );
330
+ if ($current_hash eq ' ' )
331
+ {
332
+ note(" got empty hash from node$i " );
333
+ return 0;
334
+ }
335
+ if ($checksum eq ' ' )
336
+ {
337
+ $checksum =$current_hash ;
338
+ }
339
+ elsif ($checksum ne $current_hash )
340
+ {
341
+ note(" got different hashes:$checksum ang$current_hash " );
342
+ return 0;
343
+ }
344
+ }
345
+
346
+ note($checksum );
347
+ return 1;
348
+ }
349
+
350
+ sub add_node ()
351
+ {
352
+ my ($self ,%params ) =@_ ;
353
+
354
+ my $pgport ;
355
+ my $arbiter_port ;
356
+ my $connstrs ;
357
+ my $node_id ;
358
+
359
+ if (defined $params {node_id })
360
+ {
361
+ $node_id =$params {node_id };
362
+ $pgport =$params {port };
363
+ $arbiter_port =$params {arbiter_port };
364
+ $connstrs =$self -> all_connstrs();
365
+ }
366
+ else
367
+ {
368
+ $node_id =scalar (@{$self -> {nodes }}) + 1;
369
+ $pgport = (allocate_ports(' 127.0.0.1' , 1))[0];
370
+ $arbiter_port = (allocate_ports(' 127.0.0.1' , 1))[0];
371
+ $connstrs =$self -> all_connstrs() ." , dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port " ;
372
+ }
373
+
374
+ my $node = PostgresNode-> get_new_node(" node${node_id} x" );
375
+
376
+ $self -> {nodes }-> [0]-> backup(" backup_for_$node_id " );
377
+ # do init from backup before setting host, since init_from_backup() checks
378
+ # it default value
379
+ $node -> init_from_backup($self -> {nodes }-> [0]," backup_for_$node_id " );
380
+
381
+ $node -> {_host } =' 127.0.0.1' ;
382
+ $node -> {_port } =$pgport ;
383
+ $node -> {port } =$pgport ;
384
+ $node -> {host } =' 127.0.0.1' ;
385
+ $node -> {arbiter_port } =$arbiter_port ;
386
+ $node -> {mmconnstr } =" ${\$ node->connstr('postgres') } arbiter_port=${\$ node->{arbiter_port} }" ;
387
+ $node -> append_conf(" postgresql.conf" ,qq(
388
+ multimaster.arbiter_port =$arbiter_port
389
+ multimaster.conn_strings = '$connstrs '
390
+ multimaster.node_id =$node_id
391
+ port =$pgport
392
+ ) );
393
+ $node -> append_conf(" pg_hba.conf" ,qq(
394
+ local replication all trust
395
+ host replication all 127.0.0.1/32 trust
396
+ host replication all ::1/128 trust
397
+ ) );
398
+
399
+ push (@{$self -> {nodes }},$node );
400
+ }
401
+
278
402
1;