Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab48faa

Browse files
committed
One pipeline run now starts exactly one pipeline process. Added scheduler tests. Introduced an interface for pipeline which makes testing easier. Removed single job log view.
1 parent22375b5 commitab48faa

File tree

8 files changed

+334
-261
lines changed

8 files changed

+334
-261
lines changed

‎cmd/gaia/main.go‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/gaia-pipeline/gaia"
1414
"github.com/gaia-pipeline/gaia/handlers"
1515
"github.com/gaia-pipeline/gaia/pipeline"
16+
"github.com/gaia-pipeline/gaia/plugin"
1617
scheduler"github.com/gaia-pipeline/gaia/scheduler"
1718
"github.com/gaia-pipeline/gaia/store"
1819
hclog"github.com/hashicorp/go-hclog"
@@ -132,8 +133,11 @@ func main() {
132133
os.Exit(1)
133134
}
134135

136+
// Create new plugin system
137+
pS:=&plugin.Plugin{}
138+
135139
// Initialize scheduler
136-
scheduler:=scheduler.NewScheduler(store)
140+
scheduler:=scheduler.NewScheduler(store,pS)
137141
err=scheduler.Init()
138142
iferr!=nil {
139143
gaia.Cfg.Logger.Error("cannot initialize scheduler:","error",err.Error())

‎frontend/client/views/pipeline/detail.vue‎

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ export default {
9090
}
9191
],
9292
runsRows: [],
93-
job:null,
9493
pipelineViewOptions: {
9594
physics: { stabilization:true },
9695
layout: {
@@ -333,11 +332,6 @@ export default {
333332
// Create vis network
334333
// We have to move out the instance out of vue because of https://github.com/almende/vis/issues/2567
335334
window.pipelineView=newVis.Network(container, data,this.pipelineViewOptions)
336-
337-
// Create an selectNode event
338-
window.pipelineView.on('selectNode',function (params) {
339-
this.job=this.nodes.get(params.nodes[0])
340-
}.bind(this))
341335
}
342336
},
343337
@@ -358,13 +352,8 @@ export default {
358352
},
359353
360354
jobLog () {
361-
var jobid=null
362-
if (this.job) {
363-
jobid=this.job.internalID
364-
}
365-
366355
// Route
367-
this.$router.push({path:'/pipeline/log', query: { pipelineid:this.pipelineID, runid:this.runID, jobid: jobid }})
356+
this.$router.push({path:'/pipeline/log', query: { pipelineid:this.pipelineID, runid:this.runID }})
368357
},
369358
370359
startPipeline (pipelineid) {

‎frontend/client/views/pipeline/log.vue‎

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ export default {
2222
logText:'',
2323
jobRunning:true,
2424
runID:null,
25-
pipelineID:null,
26-
jobID:null,
27-
currentPath:''
25+
pipelineID:null
2826
}
2927
},
3028
@@ -36,64 +34,47 @@ export default {
3634
this.fetchData()
3735
3836
// periodically update dashboard
39-
this.intervalID=setInterval(function () {
37+
varintervalID=setInterval(function () {
4038
this.fetchData()
4139
}.bind(this),3000)
42-
this.currentPath=this.$route.path
40+
41+
// Append interval id to store
42+
this.$store.commit('appendInterval', intervalID)
4343
},
4444
4545
watch: {
4646
'$route':'fetchData'
4747
},
4848
49+
destroyed () {
50+
this.$store.commit('clearIntervals')
51+
},
52+
4953
components: {
5054
Message
5155
},
5256
5357
methods: {
5458
fetchData () {
55-
if (this.$route.path!==this.currentPath) {
56-
this.$store.commit('clearIntervals')
57-
}
58-
5959
// look up required url parameters
6060
this.pipelineID=this.$route.query.pipelineid
6161
this.runID=this.$route.query.runid
6262
if (!this.runID||!this.pipelineID) {
6363
return
6464
}
6565
66-
// job id is optional. If ommitted, all logs from all jobs
67-
// are displayed.
68-
this.jobID=this.$route.query.jobid
69-
7066
this.$http
71-
.get('/api/v1/pipelinerun/'+this.pipelineID+'/'+this.runID+'/log', {
72-
showProgressBar:false,
73-
params: {
74-
jobid:this.jobID
75-
}
76-
})
67+
.get('/api/v1/pipelinerun/'+this.pipelineID+'/'+this.runID+'/log', { showProgressBar:false })
7768
.then(response=> {
7869
if (response.data) {
79-
// Check if we got multiple objects
80-
var finished=true
81-
this.logText=''
82-
for (let i=0, l=response.data.length; i< l; i++) {
83-
// We add the received log
84-
this.logText+=response.data[i].log
85-
86-
// LF does not work for HTML. Replace with <br />
87-
this.logText=this.logText.replace(/\n/g,'<br />')
88-
89-
// Job not finished?
90-
if (!response.data[i].finished) {
91-
finished=false
92-
}
93-
}
70+
// We add the received log
71+
this.logText=response.data.log
72+
73+
// LF does not work for HTML. Replace with <br />
74+
this.logText=this.logText.replace(/\n/g,'<br />')
9475
9576
// All jobs finished. Stop interval.
96-
if (finished&&response.data.length>0) {
77+
if (response.data.finished) {
9778
this.jobRunning=false
9879
clearInterval(this.intervalID)
9980
}

‎gaia.go‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ const (
6666

6767
// LogsFolderName represents the Name of the logs folder in pipeline run folder
6868
LogsFolderName="logs"
69+
70+
// LogsFileName represents the file name of the logs output
71+
LogsFileName="output.log"
6972
)
7073

7174
// User is the user object

‎handlers/pipeline_run.go‎

Lines changed: 14 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"net/http"
66
"os"
77
"path/filepath"
8-
"sort"
98
"strconv"
109

1110
"github.com/gaia-pipeline/gaia"
@@ -84,20 +83,15 @@ func PipelineGetLatestRun(c echo.Context) error {
8483
returnc.JSON(http.StatusOK,run)
8584
}
8685

87-
// GetJobLogs returns jobs for a given job.
88-
// If no jobID is given, a collection of all jobs logs will be returned.
86+
// GetJobLogs returns logs from a pipeline run.
8987
//
9088
// Required parameters:
9189
// pipelineid - Related pipeline id
9290
// pipelinerunid - Related pipeline run id
93-
//
94-
// Optional parameters:
95-
// jobid - Job id
9691
funcGetJobLogs(c echo.Context)error {
9792
// Get parameters and validate
9893
pipelineID:=c.Param("pipelineid")
9994
pipelineRunID:=c.Param("runid")
100-
jobID:=c.QueryParam("jobid")
10195

10296
// Transform pipelineid to int
10397
p,err:=strconv.Atoi(pipelineID)
@@ -111,92 +105,31 @@ func GetJobLogs(c echo.Context) error {
111105
returnc.String(http.StatusBadRequest,"invalid pipeline run id given")
112106
}
113107

114-
// Get pipeline run from store
115108
run,err:=storeService.PipelineGetRunByPipelineIDAndID(p,r)
116109
iferr!=nil {
117110
returnc.String(http.StatusBadRequest,"cannot find pipeline run with given pipeline id and pipeline run id")
118111
}
119112

120-
// jobID is not empty, just return the logs from this job
121-
ifjobID!="" {
122-
for_,job:=rangerun.Jobs {
123-
ifstrconv.FormatUint(uint64(job.ID),10)==jobID {
124-
// Get logs
125-
jL,err:=getLogs(pipelineID,pipelineRunID,jobID,false)
126-
iferr!=nil {
127-
returnc.String(http.StatusBadRequest,err.Error())
128-
}
129-
130-
// Check if job is finished
131-
ifjob.Status==gaia.JobSuccess||job.Status==gaia.JobFailed {
132-
jL.Finished=true
133-
}
134-
135-
// We always return an array.
136-
// It makes a bit easier in the frontend.
137-
jobLogsList:= []jobLogs{}
138-
jobLogsList=append(jobLogsList,*jL)
139-
returnc.JSON(http.StatusOK,jobLogsList)
140-
}
141-
}
113+
// Create return object
114+
jL:=jobLogs{}
142115

143-
// Logs for given job id not found
144-
returnc.String(http.StatusBadRequest,"cannot find job with given job id")
116+
// Determine if job has been finished
117+
ifrun.Status==gaia.RunFailed||run.Status==gaia.RunSuccess {
118+
jL.Finished=true
145119
}
146120

147-
// Sort the slice. This is important for the order of the returned logs.
148-
sort.Slice(run.Jobs,func(i,jint)bool {
149-
returnrun.Jobs[i].Priority<run.Jobs[j].Priority
150-
})
151-
152-
// Return a collection of all logs
153-
jobs:= []jobLogs{}
154-
for_,job:=rangerun.Jobs {
155-
// Get logs
156-
jL,err:=getLogs(pipelineID,pipelineRunID,strconv.FormatUint(uint64(job.ID),10),true)
121+
// Check if log file exists
122+
logFilePath:=filepath.Join(gaia.Cfg.WorkspacePath,pipelineID,pipelineRunID,gaia.LogsFolderName,gaia.LogsFileName)
123+
if_,err:=os.Stat(logFilePath);err==nil {
124+
content,err:=ioutil.ReadFile(logFilePath)
157125
iferr!=nil {
158-
returnc.String(http.StatusBadRequest,err.Error())
159-
}
160-
161-
// No error but also no job logs. Job must be in the queue.
162-
// We skip it so no error will break things.
163-
ifjL==nil {
164-
continue
165-
}
166-
167-
// Check if job is finished
168-
ifjob.Status==gaia.JobSuccess||job.Status==gaia.JobFailed {
169-
jL.Finished=true
126+
returnc.String(http.StatusInternalServerError,"cannot read pipeline run log file")
170127
}
171128

172-
jobs=append(jobs,*jL)
129+
// Convert logs
130+
jL.Log=string(content)
173131
}
174132

175133
// Return logs
176-
returnc.JSON(http.StatusOK,jobs)
177-
}
178-
179-
funcgetLogs(pipelineID,pipelineRunID,jobIDstring,getAllJobLogsbool) (*jobLogs,error) {
180-
// Lookup log file
181-
logFilePath:=filepath.Join(gaia.Cfg.WorkspacePath,pipelineID,pipelineRunID,gaia.LogsFolderName,jobID)
182-
183-
// We only check if logs exist when a specific job log was requested.
184-
// If we don't do this, get all job logs will fail during a pipeline run.
185-
if_,err:=os.Stat(logFilePath);os.IsNotExist(err) {
186-
if!getAllJobLogs {
187-
returnnil,err
188-
}
189-
returnnil,nil
190-
}
191-
192-
// Read file
193-
content,err:=ioutil.ReadFile(logFilePath)
194-
iferr!=nil {
195-
returnnil,err
196-
}
197-
198-
// Create return struct
199-
return&jobLogs{
200-
Log:string(content),
201-
},nil
134+
returnc.JSON(http.StatusOK,jL)
202135
}

‎plugin/plugin.go‎

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os/exec"
99

1010
"github.com/gaia-pipeline/gaia"
11+
"github.com/gaia-pipeline/gaia/scheduler"
1112
"github.com/gaia-pipeline/protobuf"
1213
plugin"github.com/hashicorp/go-plugin"
1314
)
@@ -45,23 +46,29 @@ type Plugin struct {
4546

4647
// NewPlugin creates a new instance of Plugin.
4748
// One Plugin instance represents one connection to a plugin.
48-
//
49-
// It expects the start command to start the plugin and the log path (including file)
50-
// where the output should be logged to.
51-
funcNewPlugin(command*exec.Cmd,logPath*string) (p*Plugin,errerror) {
52-
// Allocate
53-
p=&Plugin{}
49+
func (p*Plugin)NewPlugin() scheduler.Plugin {
50+
return&Plugin{}
51+
}
5452

53+
// Connect prepares the log path, starts the plugin, initiates the
54+
// gRPC connection and looks up the plugin.
55+
// It's up to the caller to call plugin.Close to shutdown the plugin
56+
// and close the gRPC connection.
57+
//
58+
// It expects the start command for the plugin and the path where
59+
// the log file should be stored.
60+
func (p*Plugin)Connect(command*exec.Cmd,logPath*string)error {
5561
// Create log file and open it.
5662
// We will close this file in the close method.
5763
iflogPath!=nil {
64+
varerrerror
5865
p.logFile,err=os.OpenFile(
5966
*logPath,
6067
os.O_CREATE|os.O_WRONLY,
6168
0666,
6269
)
6370
iferr!=nil {
64-
returnnil,err
71+
returnerr
6572
}
6673
}
6774

@@ -77,13 +84,6 @@ func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
7784
Stderr:p.writer,
7885
})
7986

80-
returnp,nil
81-
}
82-
83-
// Connect starts the plugin, initiates the gRPC connection and looks up the plugin.
84-
// It's up to the caller to call plugin.Close to shutdown the plugin
85-
// and close the gRPC connection.
86-
func (p*Plugin)Connect()error {
8787
// Connect via gRPC
8888
gRPCClient,err:=p.client.Client()
8989
iferr!=nil {
@@ -116,6 +116,10 @@ func (p *Plugin) Execute(j *gaia.Job) error {
116116

117117
// Execute the job
118118
_,err:=p.pluginConn.ExecuteJob(job)
119+
120+
// Flush logs
121+
p.writer.Flush()
122+
119123
returnerr
120124
}
121125

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp