@@ -3,12 +3,35 @@ package Cluster;
33use strict;
44use warnings;
55
6- use Proc::ProcessTable;
76use PostgresNode;
87use TestLib;
98use Test::More;
109use Cwd;
1110
11+ use Socket;
12+
13+ use IPC::Run;
14+
15+ sub check_port
16+ {
17+ my ($host ,$port ) =@_ ;
18+ my $iaddr = inet_aton($host );
19+ my $paddr = sockaddr_in($port ,$iaddr );
20+ my $proto =getprotobyname (" tcp" );
21+ my $available = 0;
22+
23+ socket (SOCK, PF_INET, SOCK_STREAM,$proto )
24+ or die " socket failed:$! " ;
25+
26+ if (bind (SOCK,$paddr ) &&listen (SOCK, SOMAXCONN))
27+ {
28+ $available = 1;
29+ }
30+
31+ close (SOCK);
32+ return $available ;
33+ }
34+
1235my %allocated_ports = ();
1336sub allocate_ports
1437{
@@ -19,8 +42,7 @@ sub allocate_ports
1942{
2043my $port =int (rand () * 16384) + 49152;
2144next if $allocated_ports {$port };
22- diag(" checking for port$port \n " );
23- if (!TestLib::run_log([' pg_isready' ,' -h' ,$host ,' -p' ,$port ]))
45+ if (check_port($host ,$port ))
2446{
2547$allocated_ports {$port } = 1;
2648push (@allocated_now ,$port );
@@ -44,6 +66,7 @@ sub new
4466my $node = new PostgresNode(" node$i " ,$host ,$pgport );
4567$node -> {id } =$i ;
4668$node -> {arbiter_port } =$arbiter_port ;
69+ $node -> {mmconnstr } =" ${\$ node->connstr('postgres') } arbiter_port=${\$ node->{arbiter_port} }" ;
4770push (@$nodes ,$node );
4871}
4972
@@ -67,48 +90,54 @@ sub init
6790}
6891}
6992
93+ sub all_connstrs
94+ {
95+ my ($self ) =@_ ;
96+ my $nodes =$self -> {nodes };
97+ return join (' ,' ,map {" ${\$ _->connstr('postgres') } arbiter_port=${\$ _->{arbiter_port} }" }@$nodes );
98+ }
99+
100+
70101sub configure
71102{
72103my ($self ) =@_ ;
73104my $nodes =$self -> {nodes };
74- my $nnodes =scalar @{$nodes };
75105
76- my $connstr =join ( ' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} } " } @$nodes );
106+ my $connstr =$self -> all_connstrs( );
77107
78108foreach my $node (@$nodes )
79109{
80110my $id =$node -> {id };
81111my $host =$node -> host;
82112my $pgport =$node -> port;
83113my $arbiter_port =$node -> {arbiter_port };
114+ my $unix_sock_dir =$ENV {PGHOST };
84115
85116$node -> append_conf(" postgresql.conf" ,qq(
86117log_statement = none
87118listen_addresses = '$host '
88- unix_socket_directories = ''
119+ unix_socket_directories = '$unix_sock_dir '
89120port =$pgport
90- max_prepared_transactions =200
91- max_connections =200
121+ max_prepared_transactions =10
122+ max_connections =10
92123max_worker_processes = 100
93124wal_level = logical
94- fsync = off
95- max_wal_senders = 10
125+ max_wal_senders = 6
96126wal_sender_timeout = 0
97127default_transaction_isolation = 'repeatable read'
98- max_replication_slots =10
128+ max_replication_slots =6
99129shared_preload_libraries = 'multimaster'
130+ shared_buffers = 16MB
100131
101132multimaster.arbiter_port =$arbiter_port
102- multimaster.workers = 10
103- multimaster.queue_size = 10485760 # 10mb
133+ multimaster.workers = 1
104134multimaster.node_id =$id
105135multimaster.conn_strings = '$connstr '
106- multimaster.heartbeat_recv_timeout =1000
136+ multimaster.heartbeat_recv_timeout =1050
107137multimaster.heartbeat_send_timeout = 250
108- multimaster.max_nodes =$nnodes
109- multimaster.ignore_tables_without_pk = true
110- multimaster.twopc_min_timeout = 50000
111- multimaster.min_2pc_timeout = 50000
138+ multimaster.max_nodes = 6
139+ multimaster.ignore_tables_without_pk = false
140+ multimaster.queue_size = 4194304
112141log_line_prefix = '%t : '
113142) );
114143
@@ -128,6 +157,7 @@ sub start
128157foreach my $node (@$nodes )
129158{
130159$node -> start();
160+ note(" Starting node with connstr 'dbname=postgres port=@{[$node ->port() ]} host=@{[$node ->host() ]}'" );
131161}
132162}
133163
@@ -137,7 +167,7 @@ sub stopnode
137167return 1unless defined $node -> {_pid };
138168$mode =' fast' unless defined $mode ;
139169my $name =$node -> name;
140- diag (" stopping$name ${mode} ly" );
170+ note (" stopping$name ${mode} ly" );
141171
142172if ($mode eq ' kill' ) {
143173killtree($node -> {_pid });
@@ -147,13 +177,13 @@ sub stopnode
147177my $pgdata =$node -> data_dir;
148178my $ret = TestLib::system_log(' pg_ctl' ,' -D' ,$pgdata ,' -m' ,' fast' ,' stop' );
149179my $pidfile =$node -> data_dir ." /postmaster.pid" ;
150- diag (" unlink$pidfile " );
180+ note (" unlink$pidfile " );
151181unlink $pidfile ;
152182$node -> {_pid } =undef ;
153183$node -> _update_pid;
154184
155185if ($ret != 0) {
156- diag (" $name failed to stop${mode} ly" );
186+ note (" $name failed to stop${mode} ly" );
157187return 0;
158188}
159189
@@ -166,43 +196,22 @@ sub stopid
166196return stopnode($self -> {nodes }-> [$idx ]);
167197}
168198
169- sub killtree
199+ sub dumplogs
170200{
171- my $root =shift ;
172- diag(" killtree$root \n " );
173-
174- my $t = new Proc::ProcessTable;
175-
176- my %parent = ();
177- # my %cmd = ();
178- foreach my $p (@{$t -> table}) {
179- $parent {$p -> pid} =$p -> ppid;
180- # $cmd{$p->pid} = $p->cmndline;
181- }
201+ my ($self ) =@_ ;
202+ my $nodes =$self -> {nodes };
182203
183- if (!defined $root ) {
184- return ;
185- }
186- my @queue = ($root );
187- my @killist = ();
188-
189- while (scalar @queue ) {
190- my $victim =shift @queue ;
191- while (my ($pid ,$ppid ) =each %parent ) {
192- if ($ppid ==$victim ) {
193- push @queue ,$pid ;
194- }
195- }
196- diag(" SIGSTOP to$victim " );
197- kill ' STOP' ,$victim ;
198- unshift @killist ,$victim ;
204+ note(" Dumping logs:" );
205+ foreach my $node (@$nodes ) {
206+ note(" ##################################################################" );
207+ note($node -> {_logfile });
208+ note(" ##################################################################" );
209+ my $filename =$node -> {_logfile };
210+ open my $fh ,' <' ,$filename or die " error opening$filename :$! " ;
211+ my $data =do {local $/ ; <$fh > };
212+ note($data );
213+ note(" ##################################################################\n\n " );
199214}
200-
201- diag(" SIGKILL to" .join (' ' ,@killist ));
202- kill ' KILL' ,@killist ;
203- # foreach my $victim (@killist) {
204- # print("kill $victim " . $cmd{$victim} . "\n");
205- # }
206215}
207216
208217sub stop
@@ -211,34 +220,32 @@ sub stop
211220my $nodes =$self -> {nodes };
212221$mode =' fast' unless defined $mode ;
213222
214- diag(" Dumping logs:" );
215- foreach my $node (@$nodes ) {
216- diag(" ##################################################################" );
217- diag($node -> {_logfile });
218- diag(" ##################################################################" );
219- my $filename =$node -> {_logfile };
220- open my $fh ,' <' ,$filename or die " error opening$filename :$! " ;
221- my $data =do {local $/ ; <$fh > };
222- diag($data );
223- diag(" ##################################################################\n\n " );
224- }
225-
226223my $ok = 1;
227- diag (" stopping cluster${mode} ly" );
224+ note (" stopping cluster${mode} ly" );
228225
229226foreach my $node (@$nodes ) {
230227if (!stopnode($node ,$mode )) {
231228$ok = 0;
232- if (!stopnode($node ,' kill' )) {
233- my $name =$node -> name;
234- BAIL_OUT(" failed to kill$name " );
235- }
229+ # if (!stopnode($node, 'kill')) {
230+ # my $name = $node->name;
231+ # BAIL_OUT("failed to kill $name");
232+ # }
236233}
237234}
238235sleep (2);
236+
237+ $self -> dumplogs();
238+
239239return $ok ;
240240}
241241
242+ sub bail_out_with_logs
243+ {
244+ my ($self ,$msg ) =@_ ;
245+ $self -> dumplogs();
246+ BAIL_OUT($msg );
247+ }
248+
242249sub teardown
243250{
244251my ($self ) =@_ ;
@@ -269,10 +276,127 @@ sub poll
269276return 1;
270277}
271278my $tries_left =$tries -$i - 1;
272- diag (" $poller poll for$pollee failed [$tries_left tries left]" );
279+ note (" $poller poll for$pollee failed [$tries_left tries left]" );
273280sleep ($delay );
274281}
275282return 0;
276283}
277284
285+ sub pgbench ()
286+ {
287+ my ($self ,$node ,@args ) =@_ ;
288+ my $pgbench_handle =$self -> pgbench_async($node ,@args );
289+ $self -> pgbench_await($pgbench_handle );
290+ }
291+
292+ sub pgbench_async ()
293+ {
294+ my ($self ,$node ,@args ) =@_ ;
295+
296+ my ($in ,$out ,$err ,$rc );
297+ $in =' ' ;
298+ $out =' ' ;
299+
300+ my @pgbench_command = (
301+ ' pgbench' ,
302+ @args ,
303+ -h => $self -> {nodes }-> [$node ]-> host(),
304+ -p => $self -> {nodes }-> [$node ]-> port(),
305+ ' postgres' ,
306+ );
307+ note(" running pgbench:" .join (" " ,@pgbench_command ));
308+ my $handle = IPC::Run::start(\@pgbench_command ,$in ,$out );
309+ return $handle ;
310+ }
311+
312+ sub pgbench_await ()
313+ {
314+ my ($self ,$pgbench_handle ) =@_ ;
315+ IPC::Run::finish($pgbench_handle ) || BAIL_OUT(" pgbench exited with$? " );
316+ }
317+
318+ sub is_data_identic ()
319+ {
320+ my ($self ,@nodenums ) =@_ ;
321+ my $checksum =' ' ;
322+
323+ my $sql =" select md5('(' || string_agg(aid::text || ', ' || abalance::text , '),(') || ')')
324+ from (select * from pgbench_accounts order by aid) t;" ;
325+
326+ foreach my $i (@nodenums )
327+ {
328+ my $current_hash =' ' ;
329+ $self -> {nodes }-> [$i ]-> psql(' postgres' ,$sql ,stdout => \$current_hash );
330+ if ($current_hash eq ' ' )
331+ {
332+ note(" got empty hash from node$i " );
333+ return 0;
334+ }
335+ if ($checksum eq ' ' )
336+ {
337+ $checksum =$current_hash ;
338+ }
339+ elsif ($checksum ne $current_hash )
340+ {
341+ note(" got different hashes:$checksum ang$current_hash " );
342+ return 0;
343+ }
344+ }
345+
346+ note($checksum );
347+ return 1;
348+ }
349+
350+ sub add_node ()
351+ {
352+ my ($self ,%params ) =@_ ;
353+
354+ my $pgport ;
355+ my $arbiter_port ;
356+ my $connstrs ;
357+ my $node_id ;
358+
359+ if (defined $params {node_id })
360+ {
361+ $node_id =$params {node_id };
362+ $pgport =$params {port };
363+ $arbiter_port =$params {arbiter_port };
364+ $connstrs =$self -> all_connstrs();
365+ }
366+ else
367+ {
368+ $node_id =scalar (@{$self -> {nodes }}) + 1;
369+ $pgport = (allocate_ports(' 127.0.0.1' , 1))[0];
370+ $arbiter_port = (allocate_ports(' 127.0.0.1' , 1))[0];
371+ $connstrs =$self -> all_connstrs() ." , dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port " ;
372+ }
373+
374+ my $node = PostgresNode-> get_new_node(" node${node_id} x" );
375+
376+ $self -> {nodes }-> [0]-> backup(" backup_for_$node_id " );
377+ # do init from backup before setting host, since init_from_backup() checks
378+ # it default value
379+ $node -> init_from_backup($self -> {nodes }-> [0]," backup_for_$node_id " );
380+
381+ $node -> {_host } =' 127.0.0.1' ;
382+ $node -> {_port } =$pgport ;
383+ $node -> {port } =$pgport ;
384+ $node -> {host } =' 127.0.0.1' ;
385+ $node -> {arbiter_port } =$arbiter_port ;
386+ $node -> {mmconnstr } =" ${\$ node->connstr('postgres') } arbiter_port=${\$ node->{arbiter_port} }" ;
387+ $node -> append_conf(" postgresql.conf" ,qq(
388+ multimaster.arbiter_port =$arbiter_port
389+ multimaster.conn_strings = '$connstrs '
390+ multimaster.node_id =$node_id
391+ port =$pgport
392+ ) );
393+ $node -> append_conf(" pg_hba.conf" ,qq(
394+ local replication all trust
395+ host replication all 127.0.0.1/32 trust
396+ host replication all ::1/128 trust
397+ ) );
398+
399+ push (@{$self -> {nodes }},$node );
400+ }
401+
2784021;