sling
packageThis package is not in the latest version of its module.
Details
Validgo.mod file
The Go module system was introduced in Go 1.11 and is the official dependency management solution for Go.
Redistributable license
Redistributable licenses place minimal restrictions on how software can be used, modified, and redistributed.
Tagged version
Modules with tagged versions give importers more predictable builds.
Stable version
When a project reaches major version v1 it is considered stable.
- Learn more about best practices
Repository
Links
Documentation¶
Index¶
- Variables
- func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientGet(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPost(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPut(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ErrorHelper(err error) (helpString string)
- func GetJWTFromKey(masterServerURL, key string) (string, error)
- func GetPipelineStoreEnv() (store map[string]any)
- func GetSQLText(sqlStringPath string) (string, error)
- func IsJSONorYAML(payload string) bool
- func IsPipelineRunMode() bool
- func IsReplicationRunMode() bool
- func NewExecID() string
- func SetPipelineStoreEnv(store map[string]any)
- func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)
- func Sling(cfg *Config) (err error)
- type Config
- func (cfg *Config) AsReplication() (rc ReplicationConfig)
- func (cfg *Config) ClearTableForChunkLoadWithRange() (err error)
- func (cfg *Config) ColumnsPrepared() (columns iop.Columns)
- func (cfg *Config) DetermineType() (Type JobType, err error)
- func (cfg *Config) FormatTargetObjectName() (err error)
- func (cfg *Config) GetFormatMap() (m map[string]any, err error)
- func (cfg *Config) HasIncrementalVal() bool
- func (cfg *Config) HasWildcard() bool
- func (cfg *Config) IgnoreExisting() bool
- func (cfg *Config) IsFileStreamWithStateAndParts() bool
- func (cfg *Config) IsFullRefreshWithChunking() bool
- func (cfg *Config) IsFullRefreshWithRange() bool
- func (cfg *Config) IsIncrementalWithChunking() bool
- func (cfg *Config) IsIncrementalWithRange() bool
- func (cfg *Config) IsTruncateWithChunking() bool
- func (cfg *Config) IsTruncateWithRange() bool
- func (cfg *Config) MD5() string
- func (cfg *Config) Prepare() (err error)
- func (cfg *Config) ReplicationMode() bool
- func (cfg *Config) Scan(value any) error
- func (cfg *Config) SetDefault()
- func (cfg *Config) SrcConnMD5() string
- func (cfg *Config) StreamID() string
- func (cfg *Config) TgtConnMD5() string
- func (cfg *Config) TransformsPrepared() (stageTransforms []map[string]string)
- func (cfg *Config) Unmarshal(cfgStr string) (err error)
- func (cfg Config) Value() (driver.Value, error)
- func (cfg *Config) WithChunking() bool
- type ConfigOptions
- type ConnState
- type DateTimeState
- type ExecStatus
- type ExecutionState
- type ExecutionStatus
- type Hook
- type HookKind
- type HookMap
- type HookStage
- type HookType
- type Hooks
- type JobType
- type Mode
- type ObjectState
- type OnFailType
- type ParseOptions
- type Pipeline
- type PipelineState
- func (ps *PipelineState) GetStore() map[string]any
- func (ps *PipelineState) Marshall() string
- func (ps *PipelineState) SetStateData(id string, data map[string]any)
- func (ps *PipelineState) SetStateKeyValue(id, key string, value any)
- func (ps *PipelineState) SetStoreData(key string, value any, del bool)
- func (ps *PipelineState) StepExecution() *PipelineStepExecution
- func (ps *PipelineState) TaskExecution() *TaskExecution
- type PipelineStepExecution
- type ProgressBar
- type ReplicationConfig
- func (rd *ReplicationConfig) AddStream(key string, cfg *ReplicationStreamConfig)
- func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (err error)
- func (rd *ReplicationConfig) DeleteStream(key string)
- func (rd *ReplicationConfig) ExecuteReplicationHook(stage HookStage) (err error)
- func (rd ReplicationConfig) GetStream(name string) (streamName string, cfg *ReplicationStreamConfig, found bool)
- func (rd *ReplicationConfig) JSON() string
- func (rd *ReplicationConfig) MD5() string
- func (rd ReplicationConfig) MatchStreams(pattern string) (streams map[string]*ReplicationStreamConfig)
- func (rd ReplicationConfig) Normalize(n string) string
- func (rd *ReplicationConfig) OriginalCfg() string
- func (rd *ReplicationConfig) ParseReplicationHook(stage HookStage) (err error)
- func (rd *ReplicationConfig) ParseStreamHook(stage HookStage, rs *ReplicationStreamConfig) (hooks Hooks, err error)
- func (rd *ReplicationConfig) ProcessChunks() (err error)
- func (rd *ReplicationConfig) ProcessWildcards() (err error)
- func (rd *ReplicationConfig) ProcessWildcardsAPI(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
- func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
- func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
- func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, err error)
- func (rd *ReplicationConfig) Scan(value interface{}) error
- func (rd *ReplicationConfig) SetRuntimeState(state *ReplicationState)
- func (rd *ReplicationConfig) StreamToTaskConfig(stream *ReplicationStreamConfig, name string, env map[string]string) (cfg Config, err error)
- func (rd ReplicationConfig) StreamsOrdered() []string
- func (rd ReplicationConfig) Value() (driver.Value, error)
- type ReplicationState
- func (rs *ReplicationState) GetStore() map[string]any
- func (rs *ReplicationState) Marshall() string
- func (rs *ReplicationState) SetStateData(id string, data map[string]any)
- func (rs *ReplicationState) SetStateKeyValue(id, key string, value any)
- func (rs *ReplicationState) SetStoreData(key string, value any, del bool)
- func (rs *ReplicationState) StepExecution() *PipelineStepExecution
- func (rs *ReplicationState) TaskExecution() *TaskExecution
- type ReplicationStreamConfig
- type RouteName
- type RunFile
- type RunFileType
- type RunFiles
- type RunState
- type RuntimeState
- type Source
- type SourceOptions
- type StatusMap
- type StreamState
- type Target
- type TargetOptions
- type TaskExecution
- func (t *TaskExecution) AddCleanupTaskFirst(f func())
- func (t *TaskExecution) AddCleanupTaskLast(f func())
- func (t *TaskExecution) AppendOutput(ll *g.LogLine)
- func (t *TaskExecution) Cleanup()
- func (t *TaskExecution) Data() *iop.Dataset
- func (t *TaskExecution) Df() *iop.Dataflow
- func (t *TaskExecution) Execute() error
- func (t *TaskExecution) ExecuteHooks(stage HookStage) (err error)
- func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)
- func (t *TaskExecution) GetBytesString() (s string)
- func (t *TaskExecution) GetCount() (count uint64)
- func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)
- func (t *TaskExecution) GetSourceTable() (sTable database.Table, err error)
- func (t *TaskExecution) GetStateMap() map[string]any
- func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTable database.Table, err error)
- func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)
- func (t *TaskExecution) IsStalled(window float64) bool
- func (t *TaskExecution) ReadFromApi(cfg *Config, srcConn *api.APIConnection) (df *iop.Dataflow, err error)
- func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)
- func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
- func (t *TaskExecution) SetProgress(text string, args ...interface{})
- func (t *TaskExecution) StateSet()
- func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)
- func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)
- type Wildcard
- type Wildcards
Constants¶
This section is empty.
Variables¶
var AllExecStatus = []struct {ValueExecStatusTSNamestring}{{ExecStatusCreated, "ExecStatusCreated"},{ExecStatusQueued, "ExecStatusQueued"},{ExecStatusStarted, "ExecStatusStarted"},{ExecStatusRunning, "ExecStatusRunning"},{ExecStatusSuccess, "ExecStatusSuccess"},{ExecStatusTerminated, "ExecStatusTerminated"},{ExecStatusInterrupted, "ExecStatusInterrupted"},{ExecStatusTimedOut, "ExecStatusTimedOut"},{ExecStatusError, "ExecStatusError"},{ExecStatusSkipped, "ExecStatusSkipped"},{ExecStatusStalled, "ExecStatusStalled"},}
var AllJobType = []struct {ValueJobTypeTSNamestring}{{ConnTest, "ConnTest"},{ConnDiscover, "ConnDiscover"},{ConnExec, "ConnExec"},{DbToDb, "DbToDb"},{FileToDB, "FileToDB"},{DbToFile, "DbToFile"},{FileToFile, "FileToFile"},{ApiToDB, "ApiToDB"},{ApiToFile, "ApiToFile"},{DbSQL, "DbSQL"},}
var AllMode = []struct {ValueModeTSNamestring}{{FullRefreshMode, "FullRefreshMode"},{IncrementalMode, "IncrementalMode"},{TruncateMode, "TruncateMode"},{SnapshotMode, "SnapshotMode"},{BackfillMode, "BackfillMode"},}
var HookRunReplication func(string, *Config, ...string)errorvar ParseHook = func(any,ParseOptions) (Hook,error) {returnnil,g.Error("please use the official sling-cli release for using hooks and pipelines")}
var ShowProgress =env.IsInteractiveTerminal()var SourceDBOptionsDefault =SourceOptions{EmptyAsNull:g.Bool(false),DatetimeFormat: "AUTO",MaxDecimals:g.Int(-1),}
var SourceFileOptionsDefault =SourceOptions{EmptyAsNull:g.Bool(false),Header:g.Bool(true),Flatten:g.Bool(false),Compression:iop.CompressorTypePtr(iop.AutoCompressorType),NullIf:g.String("NULL"),DatetimeFormat: "AUTO",SkipBlankLines:g.Bool(false),FieldsPerRec:g.Int(-1),MaxDecimals:g.Int(-1),}
var StoreSet = func(valany)error { returnnil }Set in the store/store.go file for history keeping
var TargetDBOptionsDefault =TargetOptions{FileMaxRows:lo.Ternary(os.Getenv("FILE_MAX_ROWS") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),g.Int64(0),),UseBulk:g.Bool(true),AddNewColumns:g.Bool(true),AdjustColumnType:g.Bool(false),DatetimeFormat: "auto",MaxDecimals:g.Int(-1),ColumnCasing:g.Ptr(iop.NormalizeColumnCasing),}
var TargetFileOptionsDefault =TargetOptions{Header:g.Bool(true),Compression:lo.Ternary(os.Getenv("COMPRESSION") != "",iop.CompressorTypePtr(iop.CompressorType(strings.ToLower(os.Getenv("COMPRESSION")))),iop.CompressorTypePtr(iop.AutoCompressorType),),Concurrency:lo.Ternary(os.Getenv("CONCURRENCY") != "",cast.ToInt(os.Getenv("CONCURRENCY")),7,),FileMaxRows:lo.Ternary(os.Getenv("FILE_MAX_ROWS") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),g.Int64(0),),FileMaxBytes:lo.Ternary(os.Getenv("FILE_MAX_BYTES") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_BYTES"))),g.Int64(0),),Format:dbio.FileTypeNone,UseBulk:g.Bool(true),AddNewColumns:g.Bool(true),DatetimeFormat: "auto",Delimiter: ",",MaxDecimals:g.Int(-1),ColumnCasing:g.Ptr(iop.NormalizeColumnCasing),}
Functions¶
funcClientDelete¶
func ClientDelete(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientDelete sends a DELETE request
funcClientGet¶
func ClientGet(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientGet sends a GET request
funcClientOptions¶
func ClientOptions(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientOptions sends a HEAD request
funcClientPatch¶
func ClientPatch(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientPatch sends a PATCH request
funcClientPost¶
func ClientPost(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientPost sends a POST request
funcClientPut¶
func ClientPut(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)
ClientPut sends a PUT request
funcErrorHelper¶added inv1.0.61
funcGetJWTFromKey¶
GetJWTFromKey logs in and returns the JWT based on the provided key
funcGetPipelineStoreEnv¶added inv1.5.1
GetPipelineStoreEnv syncs pipeline store from replication step run
funcGetSQLText¶added inv1.1.14
GetSQLText process source sql file / text
funcIsJSONorYAML¶added inv1.2.14
IsJSONorYAML detects a JSON or YAML payload
funcIsPipelineRunMode¶added inv1.4.25
func IsPipelineRunMode()bool
funcIsReplicationRunMode¶added inv1.4.25
func IsReplicationRunMode()bool
funcSetPipelineStoreEnv¶added inv1.5.1
SetPipelineStoreEnv syncs pipeline store to replication step run
funcSetStreamDefaults¶added inv0.86.36
func SetStreamDefaults(namestring, stream *ReplicationStreamConfig, replicationCfgReplicationConfig)
Types¶
typeConfig¶
type Config struct {SourceSource `json:"source,omitempty" yaml:"source,omitempty"`TargetTarget `json:"target" yaml:"target"`ModeMode `json:"mode,omitempty" yaml:"mode,omitempty"`Transformsany `json:"transforms,omitempty" yaml:"transforms,omitempty"`OptionsConfigOptions `json:"options,omitempty" yaml:"options,omitempty"`Env map[string]string `json:"env,omitempty" yaml:"env,omitempty"`StreamNamestring `json:"stream_name,omitempty" yaml:"stream_name,omitempty"`ReplicationStream *ReplicationStreamConfig `json:"replication_stream,omitempty" yaml:"replication_stream,omitempty"`DependsOn []string `json:"depends_on,omitempty" yaml:"depends_on,omitempty"`SrcConnconnection.Connection `json:"-" yaml:"-"`TgtConnconnection.Connection `json:"-" yaml:"-"`Preparedbool `json:"-" yaml:"-"`IncrementalValany `json:"incremental_val" yaml:"incremental_val"`IncrementalValStrstring `json:"incremental_val_str" yaml:"incremental_val_str"`IncrementalGTEbool `json:"incremental_gte,omitempty" yaml:"incremental_gte,omitempty"`MetadataLoadedAt *bool `json:"-" yaml:"-"`MetadataStreamURLbool `json:"-" yaml:"-"`MetadataRowNumbool `json:"-" yaml:"-"`MetadataRowIDbool `json:"-" yaml:"-"`MetadataExecIDbool `json:"-" yaml:"-"`// contains filtered or unexported fields}Config is the new config struct
func (*Config)AsReplication¶added inv1.1.14
func (cfg *Config) AsReplication() (rcReplicationConfig)
func (*Config)ClearTableForChunkLoadWithRange¶added inv1.4.14
ClearTableForChunkLoadWithRange clears the table for chunk load with in modefull-refresh or truncate
func (*Config)ColumnsPrepared¶added inv1.2.16
ColumnsPrepared returns the prepared columns
func (*Config)DetermineType¶added inv0.84.9
func (*Config)FormatTargetObjectName¶added inv0.85.54
func (*Config)GetFormatMap¶added inv1.0.31
GetFormatMap returns a map to format a string with provided with variables
func (*Config)HasIncrementalVal¶added inv1.2.21
HasIncrementalVal returns true there is a non-null incremental value
func (*Config)HasWildcard¶added inv1.1.14
func (*Config)IgnoreExisting¶added inv1.2.10
IgnoreExisting returns true target_options.ignore_existing is true
func (*Config)IsFileStreamWithStateAndParts¶added inv1.3.6
func (*Config)IsFullRefreshWithChunking¶added inv1.4.14
IsFullRefreshWithChunking returns true is the stream is chunking
func (*Config)IsFullRefreshWithRange¶added inv1.4.14
IsFullRefreshWithRange returns true is the stream has range
func (*Config)IsIncrementalWithChunking¶added inv1.4.14
IsIncrementalWithChunking returns true is the stream is chunking
func (*Config)IsIncrementalWithRange¶added inv1.4.14
IsIncrementalWithRange returns true is the stream has range
func (*Config)IsTruncateWithChunking¶added inv1.4.14
IsTruncateWithChunking returns true is the stream is chunking
func (*Config)IsTruncateWithRange¶added inv1.4.14
IsTruncateWithRange returns true is the stream has range
func (*Config)ReplicationMode¶added inv0.87.34
ReplicationMode returns true for replication mode
func (*Config)SrcConnMD5¶added inv1.2.15
func (*Config)TgtConnMD5¶added inv1.2.15
func (*Config)TransformsPrepared¶added inv1.2.16
TransformsPrepared returns the transforms columns
func (*Config)WithChunking¶added inv1.4.14
typeConfigOptions¶
type ConfigOptions struct {Debugbool `json:"debug,omitempty" yaml:"debug,omitempty"`StdInbool `json:"-"`// whether stdin is passedStdOutbool `json:"stdout,omitempty" yaml:"stdout,omitempty"`// whether to output to stdoutDatasetbool `json:"dataset,omitempty" yaml:"dataset,omitempty"`// whether to output to dataset}ConfigOptions are configuration options
typeConnState¶added inv1.3.5
type ConnState struct {Namestring `json:"name,omitempty"`Typedbio.Type `json:"type,omitempty"`Kinddbio.Kind `json:"kind,omitempty"`Bucketstring `json:"bucket,omitempty"`Containerstring `json:"container,omitempty"`Databasestring `json:"database,omitempty"`Instancestring `json:"instance,omitempty"`Schemastring `json:"schema,omitempty"`}typeDateTimeState¶added inv1.3.5
type DateTimeState struct {Timestamptime.Time `json:"timestamp,omitempty"`Unixint64 `json:"unix,omitempty"`FileNamestring `json:"file_name,omitempty"`Rfc3339string `json:"rfc3339,omitempty"`Datestring `json:"date,omitempty"`Datetimestring `json:"datetime,omitempty"`YYYYstring `json:"YYYY,omitempty"`YYstring `json:"YY,omitempty"`MMMstring `json:"MMM,omitempty"`MMstring `json:"MM,omitempty"`DDstring `json:"DD,omitempty"`DDDstring `json:"DDD,omitempty"`HHstring `json:"HH,omitempty"`}func (*DateTimeState)Update¶added inv1.3.5
func (dts *DateTimeState) Update()
typeExecStatus¶
type ExecStatusstring
ExecStatus is the status of an execution
const (// ExecStatusCreated = createdExecStatusCreatedExecStatus = "created"// ExecStatusQueued = queuedExecStatusQueuedExecStatus = "queued"// ExecStatusSubmitted = submittedExecStatusSubmittedExecStatus = "submitted"// ExecStatusStarted = startedExecStatusStartedExecStatus = "started"// ExecStatusRunning = runningExecStatusRunningExecStatus = "running"// ExecStatusSuccess = successExecStatusSuccessExecStatus = "success"// ExecStatusTerminated = terminatedExecStatusTerminatedExecStatus = "terminated"// ExecStatusInterrupted = interruptedExecStatusInterruptedExecStatus = "interrupted"// ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec)ExecStatusTimedOutExecStatus = "timed-out"// ExecStatusError = errorExecStatusErrorExecStatus = "error"// ExecStatusSkipped = skippedExecStatusSkippedExecStatus = "skipped"// ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while)ExecStatusStalledExecStatus = "stalled"// ExecStatusWarning = cancelledExecStatusCancelledExecStatus = "cancelled"// ExecStatusWarning = warningExecStatusWarningExecStatus = "warning")
func (ExecStatus)IsFailure¶
func (sExecStatus) IsFailure()bool
IsFailure returns true if an execution is failed
func (ExecStatus)IsFinished¶
func (sExecStatus) IsFinished()bool
IsFinished returns true if an execution is finished
func (ExecStatus)IsRunning¶
func (sExecStatus) IsRunning()bool
IsRunning returns true if an execution is running
func (ExecStatus)IsSuccess¶
func (sExecStatus) IsSuccess()bool
IsSuccess returns true if an execution is successful
func (ExecStatus)IsWarning¶added inv1.2.23
func (sExecStatus) IsWarning()bool
IsWarning returns true if an execution is warning
typeExecutionState¶added inv1.3.6
type ExecutionState struct {IDstring `json:"id"`FilePathstring `json:"string"`TotalBytesuint64 `json:"total_bytes"`TotalRowsuint64 `json:"total_rows"`StatusStatusMap `json:"status"`StartTime *time.Time `json:"start_time"`EndTime *time.Time `json:"end_time"`Durationint64 `json:"duration"`Error *string `json:"error"`}typeExecutionStatus¶
type ExecutionStatus struct {JobIDint `json:"job_id,omitempty"`ExecIDint64 `json:"exec_id,omitempty"`StatusExecStatus `json:"status,omitempty"`Textstring `json:"text,omitempty"`Rowsuint64 `json:"rows,omitempty"`Bytesuint64 `json:"bytes,omitempty"`Percentint `json:"percent,omitempty"`Stalledbool `json:"stalled,omitempty"`Duration *int `json:"duration,omitempty"`AvgDurationint `json:"avg_duration,omitempty"`}ExecutionStatus is an execution status object
typeHookMap¶added inv1.3.5
type HookMap struct {Start []any `json:"start,omitempty" yaml:"start,omitempty"`End []any `json:"end,omitempty" yaml:"end,omitempty"`Pre []any `json:"pre,omitempty" yaml:"pre,omitempty"`Post []any `json:"post,omitempty" yaml:"post,omitempty"`PreMerge []any `json:"pre_merge,omitempty" yaml:"pre_merge,omitempty"`PostMerge []any `json:"post_merge,omitempty" yaml:"post_merge,omitempty"`}typeJobType¶
type JobTypestring
JobType is an enum type for jobs
const ApiToDBJobType = "api-db"ApiToDB is from api to db
const ApiToFileJobType = "api-file"ApiToFile is from api to file
const ConnDiscoverJobType = "conn-discover"ConnTest is for a connection discover
const ConnExecJobType = "conn-exec"ConnTest is for a connection exec
const ConnTestJobType = "conn-test"ConnTest is for a connection test
const DbSQLJobType = "db-sql"DbSQL is for a db sql query
const DbToDbJobType = "db-db"DbToDb is from db to db
const DbToFileJobType = "db-file"DbToFile is from db to file
const FileToDBJobType = "file-db"FileToDB is from db to db
const FileToFileJobType = "file-file"FileToFile is from file to file
typeMode¶
type Modestring
Mode is a load mode
const (// TruncateMode is to truncateTruncateModeMode = "truncate"// FullRefreshMode is to dropFullRefreshModeMode = "full-refresh"// IncrementalMode is to incrementalIncrementalModeMode = "incremental"// SnapshotMode is to snapshotSnapshotModeMode = "snapshot"// BackfillMode is to backfillBackfillModeMode = "backfill")
typeObjectState¶added inv1.3.5
type ObjectState struct {Schemastring `json:"schema,omitempty"`Tablestring `json:"table,omitempty"`Namestring `json:"name,omitempty"`FullNamestring `json:"full_name,omitempty"`TempSchemastring `json:"temp_schema,omitempty"`TempTablestring `json:"temp_table,omitempty"`TempFullNamestring `json:"temp_full_name,omitempty"`}typeOnFailType¶added inv1.4.1
type OnFailTypestring
typeParseOptions¶added inv1.3.5
type ParseOptions struct {// contains filtered or unexported fields}typePipeline¶added inv1.4.1
type Pipeline struct {Steps []any `json:"steps,omitempty" yaml:"steps,omitempty"`Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"`Context *g.Context `json:"-"`Outputstrings.Builder `json:"-"`OutputLines chan *g.LogLine `json:"-"`CurrentStep *PipelineStepExecution `json:"-"`MD5string `json:"-"`FileNamestring `json:"-"`Bodystring `json:"body,omitempty" yaml:"-"`// raw body of pipeline// contains filtered or unexported fields}funcLoadPipelineConfig¶added inv1.4.1
funcLoadPipelineConfigFromFile¶added inv1.4.1
func (*Pipeline)Execute¶added inv1.4.1
Execute executes the pipeline steps using PipelineStepExecution
func (*Pipeline)RuntimeState¶added inv1.4.1
func (pl *Pipeline) RuntimeState() (_ *PipelineState, errerror)
RuntimeState returns the state for use
typePipelineState¶added inv1.4.1
type PipelineState struct {State map[string]map[string]any `json:"state,omitempty"`Store map[string]any `json:"store,omitempty"`Env map[string]any `json:"env,omitempty"`TimestampDateTimeState `json:"timestamp,omitempty"`Runs map[string]*RunState `json:"runs,omitempty"`Run *RunState `json:"run,omitempty"`}func (*PipelineState)GetStore¶added inv1.5.1
func (ps *PipelineState) GetStore() map[string]any
func (*PipelineState)Marshall¶added inv1.4.1
func (ps *PipelineState) Marshall()string
func (*PipelineState)SetStateData¶added inv1.4.1
func (ps *PipelineState) SetStateData(idstring, data map[string]any)
func (*PipelineState)SetStateKeyValue¶added inv1.4.1
func (ps *PipelineState) SetStateKeyValue(id, keystring, valueany)
func (*PipelineState)SetStoreData¶added inv1.4.5
func (ps *PipelineState) SetStoreData(keystring, valueany, delbool)
func (*PipelineState)StepExecution¶added inv1.4.25
func (ps *PipelineState) StepExecution() *PipelineStepExecution
func (*PipelineState)TaskExecution¶added inv1.4.1
func (ps *PipelineState) TaskExecution() *TaskExecution
typePipelineStepExecution¶added inv1.4.25
type PipelineStepExecution struct {ExecIDstring `json:"exec_id"`StatusExecStatus `json:"status"`Errerror `json:"error"`StartTime *time.Time `json:"start_time"`EndTime *time.Time `json:"end_time"`Progressstring `json:"progress"`Outputstrings.Builder `json:"-"`OutputLines chan *g.LogLine `json:"-"`Pipeline *Pipeline `json:"-"`Map map[string]any `json:"-"`StepHook `json:"-"`// The specific step to execute}PipelineStepExecution represents a single step execution context
func (*PipelineStepExecution)Context¶added inv1.4.25
func (pse *PipelineStepExecution) Context() *g.Context
func (*PipelineStepExecution)Execute¶added inv1.4.25
func (pse *PipelineStepExecution) Execute(skipbool) (errerror)
Execute executes a single pipeline step
func (*PipelineStepExecution)StateSet¶added inv1.4.25
func (pse *PipelineStepExecution) StateSet()
typeProgressBar¶
type ProgressBar struct {// contains filtered or unexported fields}func (*ProgressBar)Finish¶
func (pb *ProgressBar) Finish()
func (*ProgressBar)SetStatus¶
func (pb *ProgressBar) SetStatus(statusstring)
SetStatus sets the progress bar status
func (*ProgressBar)Start¶
func (pb *ProgressBar) Start()
typeReplicationConfig¶added inv0.86.36
type ReplicationConfig struct {Sourcestring `json:"source,omitempty" yaml:"source,omitempty"`Targetstring `json:"target,omitempty" yaml:"target,omitempty"`HooksHookMap `json:"hooks,omitempty" yaml:"hooks,omitempty"`DefaultsReplicationStreamConfig `json:"defaults,omitempty" yaml:"defaults,omitempty"`Streams map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"`// Tasks are compiled tasksTasks []*Config `json:"tasks"`Compiledbool `json:"compiled"`FailErrstring// error string to fail all (e.g. when the first tasks fails to connect)Context *g.Context `json:"-"`// contains filtered or unexported fields}funcLoadReplicationConfig¶added inv0.87.17
func LoadReplicationConfig(contentstring) (configReplicationConfig, errerror)
funcLoadReplicationConfigFromFile¶added inv1.2.10
func LoadReplicationConfigFromFile(cfgPathstring) (configReplicationConfig, errerror)
funcUnmarshalReplication¶added inv0.86.39
func UnmarshalReplication(replicYAMLstring) (configReplicationConfig, errerror)
UnmarshalReplication converts a yaml file to a replication
func (*ReplicationConfig)AddStream¶added inv1.1.15
func (rd *ReplicationConfig) AddStream(keystring, cfg *ReplicationStreamConfig)
func (*ReplicationConfig)Compile¶added inv1.2.2
func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (errerror)
Compile compiles the replication into tasks
func (*ReplicationConfig)DeleteStream¶added inv1.1.15
func (rd *ReplicationConfig) DeleteStream(keystring)
func (*ReplicationConfig)ExecuteReplicationHook¶added inv1.4.15
func (rd *ReplicationConfig) ExecuteReplicationHook(stageHookStage) (errerror)
func (ReplicationConfig)GetStream¶added inv1.1.15
func (rdReplicationConfig) GetStream(namestring) (streamNamestring, cfg *ReplicationStreamConfig, foundbool)
GetStream returns the stream if the it exists
func (*ReplicationConfig)JSON¶added inv1.2.14
func (rd *ReplicationConfig) JSON()string
JSON returns json payload
func (*ReplicationConfig)MD5¶added inv1.1.6
func (rd *ReplicationConfig) MD5()string
MD5 returns a md5 hash of the json payload
func (ReplicationConfig)MatchStreams¶added inv1.1.15
func (rdReplicationConfig) MatchStreams(patternstring) (streams map[string]*ReplicationStreamConfig)
GetStream returns the stream if the it exists
func (ReplicationConfig)Normalize¶added inv1.1.14
func (rdReplicationConfig) Normalize(nstring)string
Normalize normalized the name
func (*ReplicationConfig)OriginalCfg¶added inv1.0.63
func (rd *ReplicationConfig) OriginalCfg()string
OriginalCfg returns original config
func (*ReplicationConfig)ParseReplicationHook¶added inv1.3.6
func (rd *ReplicationConfig) ParseReplicationHook(stageHookStage) (errerror)
func (*ReplicationConfig)ParseStreamHook¶added inv1.3.5
func (rd *ReplicationConfig) ParseStreamHook(stageHookStage, rs *ReplicationStreamConfig) (hooksHooks, errerror)
func (*ReplicationConfig)ProcessChunks¶added inv1.4.1
func (rd *ReplicationConfig) ProcessChunks() (errerror)
func (*ReplicationConfig)ProcessWildcards¶added inv0.87.18
func (rd *ReplicationConfig) ProcessWildcards() (errerror)
ProcessWildcards process the streams using wildcardssuch as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`
func (*ReplicationConfig)ProcessWildcardsAPI¶added inv1.4.5
func (rd *ReplicationConfig) ProcessWildcardsAPI(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)
func (*ReplicationConfig)ProcessWildcardsDatabase¶added inv1.0.68
func (rd *ReplicationConfig) ProcessWildcardsDatabase(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)
func (*ReplicationConfig)ProcessWildcardsFile¶added inv1.0.68
func (rd *ReplicationConfig) ProcessWildcardsFile(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)
func (*ReplicationConfig)RuntimeState¶added inv1.3.5
func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, errerror)
func (*ReplicationConfig)Scan¶added inv0.86.36
func (rd *ReplicationConfig) Scan(value interface{})error
Scan scan value into Jsonb, implements sql.Scanner interface
func (*ReplicationConfig)SetRuntimeState¶added inv1.5.2
func (rd *ReplicationConfig) SetRuntimeState(state *ReplicationState)
func (*ReplicationConfig)StreamToTaskConfig¶added inv1.4.14
func (rd *ReplicationConfig) StreamToTaskConfig(stream *ReplicationStreamConfig, namestring, env map[string]string) (cfgConfig, errerror)
func (ReplicationConfig)StreamsOrdered¶added inv1.0.6
func (rdReplicationConfig) StreamsOrdered() []string
StreamsOrdered returns the stream names as ordered in the YAML file
typeReplicationState¶added inv1.4.1
type ReplicationState struct {State map[string]map[string]any `json:"state,omitempty"`Store map[string]any `json:"store,omitempty"`Env map[string]any `json:"env,omitempty"`TimestampDateTimeState `json:"timestamp,omitempty"`ExecutionExecutionState `json:"execution,omitempty"`SourceConnState `json:"source,omitempty"`TargetConnState `json:"target,omitempty"`Stream *StreamState `json:"stream,omitempty"`Object *ObjectState `json:"object,omitempty"`Runs map[string]*RunState `json:"runs,omitempty"`Run *RunState `json:"run,omitempty"`}ReplicationState is for runtime state
func (*ReplicationState)GetStore¶added inv1.5.1
func (rs *ReplicationState) GetStore() map[string]any
func (*ReplicationState)Marshall¶added inv1.4.1
func (rs *ReplicationState) Marshall()string
func (*ReplicationState)SetStateData¶added inv1.4.1
func (rs *ReplicationState) SetStateData(idstring, data map[string]any)
func (*ReplicationState)SetStateKeyValue¶added inv1.4.1
func (rs *ReplicationState) SetStateKeyValue(id, keystring, valueany)
func (*ReplicationState)SetStoreData¶added inv1.4.5
func (rs *ReplicationState) SetStoreData(keystring, valueany, delbool)
func (*ReplicationState)StepExecution¶added inv1.4.25
func (rs *ReplicationState) StepExecution() *PipelineStepExecution
func (*ReplicationState)TaskExecution¶added inv1.4.1
func (rs *ReplicationState) TaskExecution() *TaskExecution
typeReplicationStreamConfig¶added inv0.86.36
type ReplicationStreamConfig struct {IDstring `json:"id,omitempty" yaml:"id,omitempty"`Descriptionstring `json:"description,omitempty" yaml:"description,omitempty"`ModeMode `json:"mode,omitempty" yaml:"mode,omitempty"`Objectstring `json:"object,omitempty" yaml:"object,omitempty"`Select []string `json:"select,omitempty" yaml:"select,flow,omitempty"`Files []string `json:"files,omitempty" yaml:"files,omitempty"`// include/exclude filesWherestring `json:"where,omitempty" yaml:"where,omitempty"`PrimaryKeyIany `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`UpdateKeystring `json:"update_key,omitempty" yaml:"update_key,omitempty"`SQLstring `json:"sql,omitempty" yaml:"sql,omitempty"`Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`Schedulestring `json:"schedule,omitempty" yaml:"schedule,omitempty"`Disabledbool `json:"disabled,omitempty" yaml:"disabled,omitempty"`Single *bool `json:"single,omitempty" yaml:"single,omitempty"`Transformsany `json:"transforms,omitempty" yaml:"transforms,omitempty"`Columnsany `json:"columns,omitempty" yaml:"columns,omitempty"`HooksHookMap `json:"hooks,omitempty" yaml:"hooks,omitempty"`// contains filtered or unexported fields}func (*ReplicationStreamConfig)ObjectHasStreamVars¶added inv1.3.4
func (s *ReplicationStreamConfig) ObjectHasStreamVars()bool
func (*ReplicationStreamConfig)PrimaryKey¶added inv0.86.36
func (s *ReplicationStreamConfig) PrimaryKey() []string
typeRouteName¶
type RouteNamestring
RouteName is the name of a route
const (RouteStatusRouteName = "/status"RouteNoticeRouteName = "/notice"RouteErrorRouteName = "/error"RouteSignUpUserRouteName = "/sign-up"RouteUserRouteName = "/user"RouteForgotPasswordRouteName = "/forgot-password"RouteResetPasswordRouteName = "/reset-password"RouteLoginRouteName = "/login"RouteLogoutRouteName = "/logout"RouteProxyRouteName = "/p"RouteAppIndexRouteName = "/app"RouteAppLoginRouteName = "/app/login"RouteAppLogoutRouteName = "/app/logout"RouteAppAPIKeyRouteName = "/app/apikey"RouteAPIRouteName = "/api/v1"RouteMasterStatusRouteName = "/api/v1/master-status"RouteMasterDBResetRouteName = "/api/v1/master-db-reset"RouteUploadsRouteName = "/api/v1/uploads"RouteAPIAccountsRouteName = "/api/v1/accounts"RouteAPIProjectsRouteName = "/api/v1/projects"RouteAPIKeyRouteName = "/api/v1/apikey"RouteAPIUsersRouteName = "/api/v1/users"RouteAPIJobsRouteName = "/api/v1/jobs"RouteAPILogsRouteName = "/api/v1/logs"RouteAPIExecutionsRouteName = "/api/v1/executions"RouteAPIConnectionsRouteName = "/api/v1/connections"RouteAPIConnectionTestRouteName = "/api/v1/connection-test"RouteAPIResetPasswordRouteName = "/api/v1/reset-password"RouteAPIDataRequestRouteName = "/api/v1/data-request"RouteAPIWorkersRouteName = "/api/v1/workers"RouteAPISettingsRouteName = "/api/v1/settings"RouteAlertLogRouteName = "/alert/log"RouteWsRouteName = "/ws"RouteWsClientRouteName = "/ws/client"RouteWsWorkerRouteName = "/ws/worker")
typeRunFile¶added inv1.5.1
type RunFile struct {TypeRunFileTypeFileg.FileItemBodystring// contains filtered or unexported fields}typeRunFileType¶added inv1.5.1
type RunFileTypestring
const (RunFileReplicationRunFileType = "replication"RunFilePipelineRunFileType = "pipeline")
typeRunState¶added inv1.3.5
type RunState struct {IDstring `json:"id,omitempty"`Stream *StreamState `json:"stream"`Object *ObjectState `json:"object"`TotalBytesuint64 `json:"total_bytes"`TotalRowsuint64 `json:"total_rows"`StatusExecStatus `json:"status"`StartTime *time.Time `json:"start_time"`EndTime *time.Time `json:"end_time"`Durationint64 `json:"duration"`Error *string `json:"error"`IncrValueany `json:"incremental_value,omitempty"`Rangestring `json:"range,omitempty"`ConfigReplicationStreamConfig `json:"config"`Task *TaskExecution `json:"-"`Step *PipelineStepExecution `json:"-"`}typeRuntimeState¶added inv1.3.5
typeSource¶
type Source struct {Connstring `json:"conn,omitempty" yaml:"conn,omitempty"`Typedbio.Type `json:"type,omitempty" yaml:"type,omitempty"`Streamstring `json:"stream,omitempty" yaml:"stream,omitempty"`Select []string `json:"select,omitempty" yaml:"select,omitempty"`// Select or exclude columns. Exclude with prefix "-".Files []string `json:"files,omitempty" yaml:"files,omitempty"`// include/exclude filesWherestring `json:"where,omitempty" yaml:"where,omitempty"`Querystring `json:"query,omitempty" yaml:"query,omitempty"`PrimaryKeyIany `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`UpdateKeystring `json:"update_key,omitempty" yaml:"update_key,omitempty"`Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`Data map[string]any `json:"-" yaml:"-"`}Source is a source of data
func (*Source)HasPrimaryKey¶added inv1.0.50
func (*Source)HasUpdateKey¶added inv1.0.50
func (*Source)PrimaryKey¶added inv0.84.0
typeSourceOptions¶
type SourceOptions struct {EmptyAsNull *bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`Header *bool `json:"header,omitempty" yaml:"header,omitempty"`Flattenany `json:"flatten,omitempty" yaml:"flatten,omitempty"`FieldsPerRec *int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`Format *dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`NullIf *string `json:"null_if,omitempty" yaml:"null_if,omitempty"`DatetimeFormatstring `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`SkipBlankLines *bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`Delimiterstring `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`Escapestring `json:"escape,omitempty" yaml:"escape,omitempty"`Quotestring `json:"quote,omitempty" yaml:"quote,omitempty"`MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"`Range *string `json:"range,omitempty" yaml:"range,omitempty"`Limit *int `json:"limit,omitempty" yaml:"limit,omitempty"`Offset *int `json:"offset,omitempty" yaml:"offset,omitempty"`ChunkSizeany `json:"chunk_size,omitempty" yaml:"chunk_size,omitempty"`ChunkCount *int `json:"chunk_count,omitempty" yaml:"chunk_count,omitempty"`ChunkExpr *string `json:"chunk_expr,omitempty" yaml:"chunk_expr,omitempty"`Encoding *iop.Encoding `json:"encoding,omitempty" yaml:"encoding,omitempty"`// columns & transforms were moved out of source_options//https://github.com/slingdata-io/sling-cli/issues/348Columnsany `json:"columns,omitempty" yaml:"columns,omitempty"`// legacyTransformsany `json:"transforms,omitempty" yaml:"transforms,omitempty"`// legacy}SourceOptions are connection and stream processing options
func (*SourceOptions)RangeStartEnd¶added inv1.4.1
func (so *SourceOptions) RangeStartEnd() (start, endstring)
func (*SourceOptions)SetDefaults¶added inv1.0.31
func (o *SourceOptions) SetDefaults(sourceOptionsSourceOptions)
typeStreamState¶added inv1.3.5
type StreamState struct {FileFolderstring `json:"file_folder,omitempty"`FileNamestring `json:"file_name,omitempty"`FileExtstring `json:"file_ext,omitempty"`FilePathstring `json:"file_path,omitempty"`Namestring `json:"name,omitempty"`Schemastring `json:"schema,omitempty"`SchemaLowerstring `json:"schema_lower,omitempty"`SchemaUpperstring `json:"schema_upper,omitempty"`Tablestring `json:"table,omitempty"`TableLowerstring `json:"table_lower,omitempty"`TableUpperstring `json:"table_upper,omitempty"`FullNamestring `json:"full_name,omitempty"`}typeTarget¶
type Target struct {Connstring `json:"conn,omitempty" yaml:"conn,omitempty"`Typedbio.Type `json:"type,omitempty" yaml:"type,omitempty"`Objectstring `json:"object,omitempty" yaml:"object,omitempty"`Columnsany `json:"columns,omitempty" yaml:"columns,omitempty"`Options *TargetOptions `json:"options,omitempty" yaml:"options,omitempty"`Data map[string]any `json:"-" yaml:"-"`TmpTableCreatedbool `json:"-" yaml:"-"`// contains filtered or unexported fields}Target is a target of data
func (*Target)ObjectFileFormat¶added inv1.2.25
typeTargetOptions¶
type TargetOptions struct {Header *bool `json:"header,omitempty" yaml:"header,omitempty"`Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`Concurrencyint `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`BatchLimit *int64 `json:"batch_limit,omitempty" yaml:"batch_limit,omitempty"`DatetimeFormatstring `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`Delimiterstring `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`FileMaxRows *int64 `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`FileMaxBytes *int64 `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`Formatdbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`UseBulk *bool `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`IgnoreExisting *bool `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"`DeleteMissing *string `json:"delete_missing,omitempty" yaml:"delete_missing,omitempty"`AddNewColumns *bool `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`AdjustColumnType *bool `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`ColumnCasing *iop.ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`ColumnTyping *iop.ColumnTyping `json:"column_typing,omitempty" yaml:"column_typing,omitempty"`Encoding *iop.Encoding `json:"encoding,omitempty" yaml:"encoding,omitempty"`DirectInsert *bool `json:"direct_insert,omitempty" yaml:"direct_insert,omitempty"`TableKeysdatabase.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`TableTmpstring `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`TableDDL *string `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`PreSQL *string `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`PostSQL *string `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`IsolationLevel *database.IsolationLevel `json:"isolation_level,omitempty" yaml:"isolation_level,omitempty"`MergeStrategy *database.MergeStrategy `json:"merge_strategy,omitempty" yaml:"merge_strategy,omitempty"`}TargetOptions are target connection and stream processing options
func (*TargetOptions)SetDefaults¶added inv1.0.31
func (o *TargetOptions) SetDefaults(targetOptionsTargetOptions)
typeTaskExecution¶
type TaskExecution struct {ExecIDstring `json:"exec_id"`Config *Config `json:"config"`TypeJobType `json:"type"`StatusExecStatus `json:"status"`Errerror `json:"error"`StartTime *time.Time `json:"start_time"`EndTime *time.Time `json:"end_time"`Bytesuint64 `json:"bytes"`Context *g.Context `json:"-"`Progressstring `json:"progress"`OutputLines chan *g.LogLineReplication *ReplicationConfig `json:"replication"`ProgressHist []string `json:"progress_hist"`PBar *ProgressBar `json:"-"`ProcStatsStartg.ProcStats `json:"-"`// process stats at beginning// contains filtered or unexported fields}TaskExecution is a sling ELT task run, synonymous to an execution
funcNewTask¶
func NewTask(execIDstring, cfg *Config) (t *TaskExecution)
NewTask creates a Sling task with given configuration
func (*TaskExecution)AddCleanupTaskFirst¶added inv1.1.8
func (t *TaskExecution) AddCleanupTaskFirst(f func())
func (*TaskExecution)AddCleanupTaskLast¶added inv1.1.8
func (t *TaskExecution) AddCleanupTaskLast(f func())
func (*TaskExecution)AppendOutput¶added inv1.1.6
func (t *TaskExecution) AppendOutput(ll *g.LogLine)
func (*TaskExecution)Cleanup¶added inv0.84.3
func (t *TaskExecution) Cleanup()
func (*TaskExecution)Data¶added inv1.2.19
func (t *TaskExecution) Data() *iop.Dataset
Data return the dataset object
func (*TaskExecution)Df¶added inv1.2.7
func (t *TaskExecution) Df() *iop.Dataflow
Df return the dataflow object
func (*TaskExecution)Execute¶
func (t *TaskExecution) Execute()error
Execute runs a Sling task.This may be a file/db to file/db transfer
func (*TaskExecution)ExecuteHooks¶added inv1.2.23
func (t *TaskExecution) ExecuteHooks(stageHookStage) (errerror)
func (*TaskExecution)GetBytes¶
func (t *TaskExecution) GetBytes() (inBytes, outBytesuint64)
GetBytes return the current total of bytes processed
func (*TaskExecution)GetBytesString¶
func (t *TaskExecution) GetBytesString() (sstring)
func (*TaskExecution)GetCount¶
func (t *TaskExecution) GetCount() (countuint64)
GetCount return the current count of rows processed
func (*TaskExecution)GetRate¶
func (t *TaskExecution) GetRate(secWindowint) (rowRate, byteRateint64)
GetRate return the speed of flow (rows / sec and bytes / sec)secWindow is how many seconds back to measure (0 is since beginning)
func (*TaskExecution)GetSourceTable¶added inv1.3.4
func (t *TaskExecution) GetSourceTable() (sTabledatabase.Table, errerror)
func (*TaskExecution)GetStateMap¶added inv1.2.23
func (t *TaskExecution) GetStateMap() map[string]any
func (*TaskExecution)GetTargetTable¶added inv1.3.4
func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTabledatabase.Table, errerror)
func (*TaskExecution)GetTotalBytes¶
func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytesuint64)
GetTotalBytes gets the inbound/oubound bytes of the task
func (*TaskExecution)IsStalled¶
func (t *TaskExecution) IsStalled(windowfloat64)bool
IsStalled determines if the task has stalled (no row increment)
func (*TaskExecution)ReadFromApi¶added inv1.4.5
func (t *TaskExecution) ReadFromApi(cfg *Config, srcConn *api.APIConnection) (df *iop.Dataflow, errerror)
ReadFromApi reads from a source api
func (*TaskExecution)ReadFromDB¶
func (t *TaskExecution) ReadFromDB(cfg *Config, srcConndatabase.Connection) (df *iop.Dataflow, errerror)
ReadFromDB reads from a source database
func (*TaskExecution)ReadFromFile¶
func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, errerror)
ReadFromFile reads from a source file
func (*TaskExecution)SetProgress¶
func (t *TaskExecution) SetProgress(textstring, args ...interface{})
SetProgress sets the progress
func (*TaskExecution)StateSet¶added inv1.4.25
func (t *TaskExecution) StateSet()
func (*TaskExecution)WriteToDb¶
func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConndatabase.Connection) (cntuint64, errerror)
WriteToDb writes to a target DBcreate temp tableload into temp tableinsert / incremental / replace into target table
func (*TaskExecution)WriteToFile¶
WriteToFile writes to a target file