Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitced1d58

Browse files
author
Vladimir Ershov
committed
proper termination
1 parent1ab15cf commitced1d58

File tree

5 files changed

+184
-3
lines changed

5 files changed

+184
-3
lines changed

‎src/scheduler_executor.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ handle_sigterm(SIGNAL_ARGS)
6565
}
6666

6767
errno=save_errno;
68-
proc_exit(0);
68+
/*
69+
* Do not need to exit at once
70+
* CHECK_FOR_INTERRUPTS will do cleanup and exits
71+
*/
6972
}
7073

7174
intread_worker_job_limit(void)
@@ -133,7 +136,9 @@ void executor_worker_main(Datum arg)
133136
ProcessConfigFile(PGC_SIGHUP);
134137
worker_jobs_limit=read_worker_job_limit();
135138
}
139+
CHECK_FOR_INTERRUPTS();
136140
result=do_one_job(shared,&status);
141+
CHECK_FOR_INTERRUPTS();
137142
if(result>0)
138143
{
139144
if(++jobs_done >=worker_jobs_limit)
@@ -207,6 +212,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207212

208213
pgstat_report_activity(STATE_RUNNING,"initialize job");
209214
job=initializeExecutorJob(shared);
215+
CHECK_FOR_INTERRUPTS();
210216
if(!job)
211217
{
212218
if(shared->message[0]==0)
@@ -332,6 +338,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
332338

333339
SetConfigOption("schedule.transaction_state","success",PGC_INTERNAL,PGC_S_SESSION);
334340
}
341+
CHECK_FOR_INTERRUPTS();
335342
if(job->next_time_statement)
336343
{
337344
shared->next_time=get_next_excution_time(job->next_time_statement,&EE);

‎src/scheduler_manager.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,8 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10761076
TimestampTznext_time;
10771077
char*next_time_str;
10781078
char*error;
1079+
schd_remove_reason_tdie_reason=0;
1080+
BgwHandleStatusstatus;
10791081

10801082
if(p->free==p->len)return0;
10811083
busy=p->len-p->free;
@@ -1084,6 +1086,7 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10841086
for(i=0;i<busy;i++)
10851087
{
10861088
item=p->slots[i];
1089+
10871090
if(item->wait_worker_to_die)
10881091
{
10891092
toremove[nremove].pos=i;
@@ -1122,6 +1125,27 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11221125
toremove[nremove].vanish_item= true;
11231126
nremove++;
11241127
}
1128+
else
1129+
{
1130+
die_reason=0;
1131+
status=GetBackgroundWorkerPid(item->handler,&tmppid);
1132+
if(status==BGWH_STOPPED)
1133+
{
1134+
die_reason=RmExited;
1135+
}
1136+
elseif(status==BGWH_POSTMASTER_DIED)
1137+
{
1138+
die_reason=RmDied;
1139+
}
1140+
1141+
if(die_reason)
1142+
{
1143+
toremove[nremove].pos=i;
1144+
toremove[nremove].reason=die_reason;
1145+
toremove[nremove].vanish_item= true;
1146+
nremove++;
1147+
}
1148+
}
11251149
}
11261150
}
11271151
if(nremove)
@@ -1180,6 +1204,10 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11801204
set_job_error(ctx->mem_ctx,item->job,"unknown error occured" );
11811205
}
11821206
}
1207+
elseif(toremove[i].reason==RmExited||toremove[i].reason==RmDied)
1208+
{
1209+
set_job_error(ctx->mem_ctx,item->job,"Executor died unexpectedly (%d)",toremove[i].reason);
1210+
}
11831211
elseif(toremove[i].reason==RmFreeSlot)
11841212
{
11851213
/* Just free slot - worker exited cause it achived max job

‎src/scheduler_manager.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ typedef enum {
3333
RmWaitWorker,
3434
RmError,
3535
RmDoneResubmit,
36-
RmDone
36+
RmDone,
37+
RmExited,
38+
RmDied
3739
}schd_remove_reason_t;
3840

3941
typedefstruct {

‎test/perl/runtest.pl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
"DROP ROLE IF EXISTS tester",
9292
"CREATE ROLE tester",
9393
"GRANT INSERT ON test_results TO tester",
94+
"CREATE TABLE task_info (pid integer, name text, vanished timestamp, finished boolean default false)",
95+
"GRANT ALL ON task_info TO tester",
9496
);
9597
map { __do_sql($dbh,$_) }@sql2;
9698
$dbh->disconnect();
@@ -109,7 +111,7 @@
109111
);
110112
my$harness = TAP::Harness->new( \%args );
111113
my@tests =glob('t/*.t' );
112-
#@tests = ('t/jobMaxRunTime.t');
114+
#@tests = ('t/terminateBackend.t');
113115
$harness->runtests(@tests);
114116

115117

‎test/perl/t/terminateBackend.t

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/perl
2+
use strict;
3+
no warnings;
4+
use Test::Moretests=> 15;
5+
use DBI;
6+
use Getopt::Long;
7+
use Data::Dumper;
8+
9+
my$dbh =require't/_connect.pl';
10+
ok($dbh->err == 0,'connect')or BAIL_OUT($DBI::errstr);
11+
12+
my$query ="DELETE FROM test_results;";
13+
$dbh->do($query);
14+
ok($dbh->err == 0,'clean up test_results')or BAIL_OUT($DBI::errstr);
15+
16+
my$query ="DELETE FROM task_info;";
17+
$dbh->do($query);
18+
ok($dbh->err == 0,'clean up task_info')or BAIL_OUT($DBI::errstr);
19+
20+
one_task_do('sleeper','select pg_sleep(200)');
21+
one_task_do('writer',
22+
'DO
23+
$do$
24+
BEGIN
25+
WHILE true LOOP
26+
INSERT INTO test_results (commentary) SELECT md5(random()::text) from generate_series(1, 1000000) s(i);
27+
DELETE FROM test_results;
28+
END LOOP;
29+
END
30+
$do$',qq[, 'max_run_time', '120 seconds'] );
31+
32+
33+
$dbh->disconnect();
34+
done_testing();
35+
36+
subone_task_do
37+
{
38+
my$name =shift;
39+
my$sql_part =shift;
40+
my$add_to_task =shift;
41+
42+
$add_to_task ||='';
43+
44+
$query ="SELECT schedule.create_job(
45+
jsonb_build_object(
46+
'name', '$name' ,
47+
'cron', '* * * * *',
48+
'commands', jsonb_build_array(
49+
'insert into task_info values ( pg_backend_pid(), ''$name'')',
50+
'$sql_part',
51+
'update task_info set finished = true where pid = pg_backend_pid()'
52+
),
53+
'max_instances', 1$add_to_task
54+
)
55+
)";
56+
my$sth =$dbh->prepare($query);
57+
ok($sth->execute(),"create$name task")or BAIL_OUT($DBI::errstr);
58+
59+
my$job_id =$sth->fetchrow_array();
60+
$sth->finish();
61+
62+
my$pid = wait_for_task_to_begin($dbh,$name, 120);
63+
64+
ok($pid > 0,'find'.$name.' task started')or BAIL_OUT("failed to await task '$name' for 120s");
65+
66+
$query ="SELECT pg_terminate_backend(?)";
67+
$sth =$dbh->prepare($query);
68+
$sth->bind_param(1,$pid);
69+
ok($sth->execute(),"terminate$name job")or BAIL_OUT($DBI::errstr);
70+
$sth->finish();
71+
72+
$sth =$dbh->prepare('UPDATE task_info SET vanished = now() where pid = ?');
73+
ok($sth->execute($pid),"set$name task vanished")or BAIL_OUT(print$DBI::errstr);
74+
$sth->finish;
75+
76+
ok(find_job_exited($dbh,$job_id),"find$name exit job")or BAIL_OUT("Cannot find job$job_id exited");
77+
78+
$sth =$dbh->prepare('SELECT schedule.deactivate_job(?)');
79+
ok($sth->execute($job_id),"deactivate$name job")or BAIL_OUT("Cannot deactivate$name job");
80+
81+
}
82+
83+
subwait_for_task_to_begin
84+
{
85+
my$db =shift;
86+
my$name =shift;
87+
my$how_long =shift;
88+
89+
my$iter =$how_long;
90+
my$sth1 =$db->prepare('SELECT pid from task_info where name = ? and vanished is null and finished = false limit 1');
91+
92+
while($iter-- > 0)
93+
{
94+
if($sth1->execute($name))
95+
{
96+
my$pid =$sth1->fetchrow_array()and$sth1->finish();
97+
return$pidif$pid;
98+
}
99+
else
100+
{
101+
die$DBI::errstr;
102+
}
103+
sleep(1);
104+
}
105+
return 0;
106+
}
107+
108+
subfind_job_exited
109+
{
110+
my$d =shift;
111+
my$id =shift;
112+
113+
my$n = 20;
114+
115+
while($n-- > 0)
116+
{
117+
my$s =$d->prepare('SELECT message from schedule.log where cron = ? and status = false');
118+
if($s->execute($id))
119+
{
120+
my@data =$s->fetchrow_array()and$s->finish();
121+
if(scalar(@data))
122+
{
123+
if($data[0] =~/^Executor died unexpectedly/)
124+
{
125+
return 1;
126+
}
127+
else
128+
{
129+
return 0;
130+
}
131+
}
132+
}
133+
else
134+
{
135+
die$DBI::errstr;
136+
}
137+
sleep(1);
138+
}
139+
140+
return 0;
141+
}
142+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp