Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0ad5f47

Browse files
authored
Merge pull request#667 from slingdata-io/v1.4.25
V1.4.25
2 parentsa27410e +5cc0ea2 commit0ad5f47

File tree

53 files changed

+3596
-1954
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3596
-1954
lines changed

‎cmd/sling/resource/llm_API_SPEC.md‎

Lines changed: 182 additions & 842 deletions
Large diffs are not rendered by default.

‎cmd/sling/sling_cli_test.go‎

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
packagemain_test
1+
packagemain
22

33
import (
44
"bytes"
@@ -132,15 +132,19 @@ func TestCLI(t *testing.T) {
132132
if!assert.NotEmpty(t,tt.Run,"Command is empty") {
133133
break
134134
}
135-
t.Run(g.F("%d/%s",tt.ID,tt.Name),func(t*testing.T) {
135+
136+
testID:=g.F("%d/%s",tt.ID,tt.Name)
137+
t.Run(testID,func(t*testing.T) {
136138
env.Println(env.GreenString(g.F("%02d | ",tt.ID)+tt.Run))
137139

138140
p,err:=process.NewProc("bash")
139141
if!g.AssertNoError(t,err) {
140142
return
141143
}
142144
p.Capture=true
143-
p.Print=true
145+
ifos.Getenv("DEBUG")!="" {
146+
p.Print=true
147+
}
144148
p.WorkDir="../.."
145149

146150
// set new env
@@ -216,7 +220,14 @@ func TestCLI(t *testing.T) {
216220
}
217221
})
218222
ift.Failed() {
219-
break
223+
// Track failure
224+
testFailuresMux.Lock()
225+
testFailures=append(testFailures,testFailure{
226+
connType:"CLI",
227+
testID:testID,
228+
})
229+
testFailuresMux.Unlock()
230+
// Don't break - let all tests complete
220231
}
221232
}
222233
}

‎cmd/sling/sling_run.go‎

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"gopkg.in/yaml.v2"
1515

16+
"github.com/samber/lo"
1617
"github.com/shirou/gopsutil/v3/mem"
1718
"github.com/slingdata-io/sling-cli/core"
1819
"github.com/slingdata-io/sling-cli/core/dbio/connection"
@@ -200,9 +201,13 @@ func processRun(c *g.CliSC) (ok bool, err error) {
200201
os.Setenv("SLING_CLI","TRUE")
201202
os.Setenv("SLING_CLI_ARGS",g.Marshal(os.Args[1:]))
202203

203-
// check for update, and print note
204-
gocheckUpdate(false)
205-
deferprintUpdateAvailable()
204+
// set run mode
205+
ifos.Getenv("SLING_RUN_MODE")=="" {
206+
os.Setenv(
207+
"SLING_RUN_MODE",
208+
lo.Ternary(pipelineCfgPath!="","pipeline","replication"),
209+
)
210+
}
206211

207212
runReplication:
208213
deferconnection.CloseAll()
@@ -214,6 +219,10 @@ runReplication:
214219
}else {
215220
g.Info(env.CyanString(text))
216221
}
222+
223+
// check for update, and print note
224+
gocheckUpdate()
225+
deferprintUpdateAvailable()
217226
}
218227

219228
ifpipelineCfgPath!="" {
@@ -393,13 +402,15 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
393402
task.Err=g.Error(replication.FailErr)
394403
}
395404

396-
// set log sink
397-
env.LogSink=func(ll*g.LogLine) {
398-
ll.Group=g.F("%s,%s",task.ExecID,task.Config.StreamID())
399-
task.AppendOutput(ll)
405+
// set log sink if not pipeline mode
406+
ifsling.IsReplicationRunMode() {
407+
env.LogSink=func(ll*g.LogLine) {
408+
ll.Group=g.F("%s,%s",task.ExecID,task.Config.StreamID())
409+
task.AppendOutput(ll)
410+
}
400411
}
401412

402-
sling.StateSet(task)// set into store
413+
task.StateSet()// set into store
403414

404415
iftask.Err!=nil {
405416
err=g.Error(task.Err)
@@ -410,7 +421,7 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
410421
task.Context=ctx
411422

412423
// set into store after
413-
defersling.StateSet(task)
424+
defertask.StateSet()
414425

415426
// run task
416427
setTM()
@@ -529,7 +540,9 @@ func replicationRun(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
529540
cleanedForChunkLoad[cfg.Target.Object]=true
530541
}
531542

532-
env.LogSink=nil// clear log sink
543+
ifsling.IsReplicationRunMode() {
544+
env.LogSink=nil// clear log sink if replication mode
545+
}
533546

534547
ifcfg.ReplicationStream.Disabled {
535548
println()
@@ -604,7 +617,7 @@ func runPipeline(pipelineCfgPath string) (err error) {
604617
// track usage
605618
deferfunc() {
606619
steps:= []map[string]any{}
607-
for_,s:=rangepipeline.Steps {
620+
for_,s:=rangepipeline.GetSteps() {
608621
steps=append(steps,s.PayloadMap())
609622
}
610623

@@ -623,6 +636,7 @@ func runPipeline(pipelineCfgPath string) (err error) {
623636
// set function here due to scoping
624637
sling.HookRunReplication=runReplication
625638

639+
pipeline.Context=ctx
626640
err=pipeline.Execute()
627641

628642
return
@@ -735,7 +749,17 @@ func setTimeout(values ...string) (deadline time.Time) {
735749
_=cancel
736750

737751
ctx=g.NewContext(parent)// overwrite global context
738-
time.AfterFunc(duration,func() {g.Warn("SLING_TIMEOUT = %s mins reached!",timeout) })
752+
time.AfterFunc(duration,func() {
753+
ifcast.ToBool(os.Getenv("SLING_TIMEOUT_STACK")) {
754+
// Print all goroutine stacks before panicking
755+
buf:=make([]byte,1<<20)// 1MB buffer
756+
stackLen:=runtime.Stack(buf,true)
757+
env.Println(string(buf[:stackLen]))
758+
panic(g.F("SLING_TIMEOUT = %s mins reached!",timeout))
759+
}else {
760+
g.Warn("SLING_TIMEOUT = %s mins reached!",timeout)
761+
}
762+
})
739763

740764
// set deadline for status setting later
741765
g.Debug("setting timeout for %s minutes",timeout)

‎cmd/sling/sling_test.go‎

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"os/exec"
77
"path/filepath"
88
"runtime"
9+
"runtime/debug"
910
"strings"
1011
"sync"
1112
"testing"
1213
"time"
14+
"unicode"
1315

1416
"github.com/jmespath/go-jmespath"
1517
"github.com/samber/lo"
@@ -37,6 +39,18 @@ var testContext = g.NewContext(context.Background())
3739

3840
varconns=connection.GetLocalConns()
3941

42+
// Track test failures
43+
typetestFailurestruct {
44+
connType dbio.Type
45+
testIDstring
46+
}
47+
48+
var (
49+
testFailuresMux sync.Mutex
50+
testFailures []testFailure
51+
suiteFailuresMap=make(map[dbio.Type]string)// connType -> last failed testID
52+
)
53+
4054
typetestDBstruct {
4155
namestring
4256
conn d.Connection
@@ -102,6 +116,93 @@ func init() {
102116
core.Version="test"
103117
}
104118

119+
funcTestMain(m*testing.M) {
120+
// Init args
121+
args:=os.Args
122+
for_,arg:=rangeargs {
123+
ifarg=="-d"||arg=="--debug" {
124+
os.Setenv("DEBUG","true")
125+
env.InitLogger()
126+
}
127+
ifarg=="-a"||arg=="--all" {
128+
os.Setenv("RUN_ALL","true")// runs all test, don't fail early
129+
}
130+
ifarg=="-t"||arg=="--trace" {
131+
os.Setenv("DEBUG","TRACE")
132+
env.InitLogger()
133+
}
134+
ifarg!=""&&unicode.IsDigit(rune(arg[0])) {
135+
os.Setenv("TESTS",arg)
136+
}
137+
}
138+
139+
// Run all tests
140+
exitCode:=m.Run()
141+
142+
// Print summary of failures
143+
testFailuresMux.Lock()
144+
145+
hasDatabaseFailures:=len(suiteFailuresMap)>0
146+
cliFailures:= []testFailure{}
147+
for_,failure:=rangetestFailures {
148+
iffailure.connType=="CLI" {
149+
cliFailures=append(cliFailures,failure)
150+
}
151+
}
152+
hasCLIFailures:=len(cliFailures)>0
153+
154+
ifhasDatabaseFailures||hasCLIFailures {
155+
println()
156+
println("================================================================================")
157+
println(" TEST FAILURE SUMMARY")
158+
println("================================================================================")
159+
println()
160+
161+
// Database test failures
162+
ifhasDatabaseFailures {
163+
println("DATABASE TEST SUITES:")
164+
println()
165+
forconnType,lastTestID:=rangesuiteFailuresMap {
166+
println(g.F(" ❌ FAILED: %s",connType))
167+
println(g.F(" Last Failed Test: %s",lastTestID))
168+
println()
169+
}
170+
println(g.F(" Total Failed DB Test Suites: %d",len(suiteFailuresMap)))
171+
dbFailureCount:=0
172+
for_,failure:=rangetestFailures {
173+
iffailure.connType!="CLI" {
174+
dbFailureCount++
175+
}
176+
}
177+
println(g.F(" Total Failed DB Tests: %d",dbFailureCount))
178+
println()
179+
}
180+
181+
// CLI test failures
182+
ifhasCLIFailures {
183+
println("CLI TESTS:")
184+
println()
185+
for_,failure:=rangecliFailures {
186+
println(g.F(" ❌ FAILED: %s",failure.testID))
187+
println()
188+
}
189+
println(g.F(" Total Failed CLI Tests: %d",len(cliFailures)))
190+
println()
191+
}
192+
193+
println("================================================================================")
194+
totalFailures:=len(testFailures)
195+
println(g.F("TOTAL FAILED TESTS: %d",totalFailures))
196+
println("================================================================================")
197+
}else {
198+
println()
199+
println("✅ All tests passed!")
200+
}
201+
testFailuresMux.Unlock()
202+
203+
os.Exit(exitCode)
204+
}
205+
105206
funcTestOptions(t*testing.T) {
106207
testCases:= []struct {
107208
inputstring
@@ -187,15 +288,15 @@ func TestCfgPath(t *testing.T) {
187288
}
188289

189290
funcTestExtract(t*testing.T) {
190-
core.Version="v1.0.43"
291+
//core.Version = "v1.0.43"
191292

192-
checkUpdate(true)
293+
checkUpdate()
193294
assert.NotEmpty(t,updateVersion)
194295

195296
printUpdateAvailable()
196297

197-
err:=g.ExtractTarGz(g.UserHomeDir()+"/Downloads/sling/sling_1.0.44_darwin_all.tar.gz",g.UserHomeDir()+"/Downloads/sling")
198-
g.AssertNoError(t,err)
298+
//err := g.ExtractTarGz(g.UserHomeDir()+"/Downloads/sling/sling_1.0.44_darwin_all.tar.gz", g.UserHomeDir()+"/Downloads/sling")
299+
//g.AssertNoError(t, err)
199300
}
200301

201302
functestSuite(t*testing.T,connType dbio.Type,testSelect...string) {
@@ -367,18 +468,39 @@ func testSuite(t *testing.T, connType dbio.Type, testSelect ...string) {
367468
continue
368469
}
369470
}
370-
t.Run(g.F("%s/%s",connType,file.RelPath),func(t*testing.T) {
471+
472+
testID:=g.F("%s/%s",connType,file.RelPath)
473+
t.Run(testID,func(t*testing.T) {
371474
runOneTask(t,file,connType)
372475
})
373476
ift.Failed() {
374477
g.LogError(g.Error("Test `%s` Failed for => %s",file.Name,connType))
375-
testContext.Cancel()
376-
return
478+
// Track failure
479+
testFailuresMux.Lock()
480+
testFailures=append(testFailures,testFailure{
481+
connType:connType,
482+
testID:testID,
483+
})
484+
suiteFailuresMap[connType]=testID
485+
testFailuresMux.Unlock()
486+
487+
// cancel early if not specified
488+
if!cast.ToBool(os.Getenv("RUN_ALL")) {
489+
testContext.Cancel()
490+
}
377491
}
378492
}
379493
}
380494

381495
funcrunOneTask(t*testing.T,file g.FileItem,connType dbio.Type) {
496+
deferfunc() {
497+
ifr:=recover();r!=nil {
498+
info:=string(debug.Stack())
499+
g.Warn(g.F("panic occurred! %#v\n%s",r,info))
500+
t.FailNow()
501+
}
502+
}()
503+
382504
os.Setenv("SLING_LOADED_AT_COLUMN","TRUE")
383505
os.Setenv("SLING_CHECKSUM_ROWS","10000")// so that it errors when checksums don't match
384506
println()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp