Movatterモバイル変換


[0]ホーム

URL:


sling

package
v1.5.1Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License:GPL-3.0Imports:40Imported by:1

Details

Repository

github.com/slingdata-io/sling-cli

Links

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AllExecStatus = []struct {ValueExecStatusTSNamestring}{{ExecStatusCreated, "ExecStatusCreated"},{ExecStatusQueued, "ExecStatusQueued"},{ExecStatusStarted, "ExecStatusStarted"},{ExecStatusRunning, "ExecStatusRunning"},{ExecStatusSuccess, "ExecStatusSuccess"},{ExecStatusTerminated, "ExecStatusTerminated"},{ExecStatusInterrupted, "ExecStatusInterrupted"},{ExecStatusTimedOut, "ExecStatusTimedOut"},{ExecStatusError, "ExecStatusError"},{ExecStatusSkipped, "ExecStatusSkipped"},{ExecStatusStalled, "ExecStatusStalled"},}
View Source
var AllJobType = []struct {ValueJobTypeTSNamestring}{{ConnTest, "ConnTest"},{ConnDiscover, "ConnDiscover"},{ConnExec, "ConnExec"},{DbToDb, "DbToDb"},{FileToDB, "FileToDB"},{DbToFile, "DbToFile"},{FileToFile, "FileToFile"},{ApiToDB, "ApiToDB"},{ApiToFile, "ApiToFile"},{DbSQL, "DbSQL"},}
View Source
var AllMode = []struct {ValueModeTSNamestring}{{FullRefreshMode, "FullRefreshMode"},{IncrementalMode, "IncrementalMode"},{TruncateMode, "TruncateMode"},{SnapshotMode, "SnapshotMode"},{BackfillMode, "BackfillMode"},}
View Source
var HookRunReplication func(string, *Config, ...string)error
View Source
var ParseHook = func(any,ParseOptions) (Hook,error) {returnnil,g.Error("please use the official sling-cli release for using hooks and pipelines")}
View Source
var SourceDBOptionsDefault =SourceOptions{EmptyAsNull:g.Bool(false),DatetimeFormat: "AUTO",MaxDecimals:g.Int(-1),}
View Source
var SourceFileOptionsDefault =SourceOptions{EmptyAsNull:g.Bool(false),Header:g.Bool(true),Flatten:g.Bool(false),Compression:iop.CompressorTypePtr(iop.AutoCompressorType),NullIf:g.String("NULL"),DatetimeFormat: "AUTO",SkipBlankLines:g.Bool(false),FieldsPerRec:g.Int(-1),MaxDecimals:g.Int(-1),}
View Source
var StoreSet = func(valany)error { returnnil }

Set in the store/store.go file for history keeping

View Source
var TargetDBOptionsDefault =TargetOptions{FileMaxRows:lo.Ternary(os.Getenv("FILE_MAX_ROWS") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),g.Int64(0),),UseBulk:g.Bool(true),AddNewColumns:g.Bool(true),AdjustColumnType:g.Bool(false),DatetimeFormat:   "auto",MaxDecimals:g.Int(-1),ColumnCasing:g.Ptr(iop.NormalizeColumnCasing),}
View Source
var TargetFileOptionsDefault =TargetOptions{Header:g.Bool(true),Compression:lo.Ternary(os.Getenv("COMPRESSION") != "",iop.CompressorTypePtr(iop.CompressorType(strings.ToLower(os.Getenv("COMPRESSION")))),iop.CompressorTypePtr(iop.AutoCompressorType),),Concurrency:lo.Ternary(os.Getenv("CONCURRENCY") != "",cast.ToInt(os.Getenv("CONCURRENCY")),7,),FileMaxRows:lo.Ternary(os.Getenv("FILE_MAX_ROWS") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),g.Int64(0),),FileMaxBytes:lo.Ternary(os.Getenv("FILE_MAX_BYTES") != "",g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_BYTES"))),g.Int64(0),),Format:dbio.FileTypeNone,UseBulk:g.Bool(true),AddNewColumns:g.Bool(true),DatetimeFormat: "auto",Delimiter:      ",",MaxDecimals:g.Int(-1),ColumnCasing:g.Ptr(iop.NormalizeColumnCasing),}

Functions

funcClientDelete

func ClientDelete(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientDelete sends a DELETE request

funcClientGet

func ClientGet(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientGet sends a GET request

funcClientOptions

func ClientOptions(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientOptions sends an OPTIONS request

funcClientPatch

func ClientPatch(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientPatch sends a PATCH request

funcClientPost

func ClientPost(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientPost sends a POST request

funcClientPut

func ClientPut(serverURLstring, routeRouteName, m map[string]interface{}, headers map[string]string) (respStrstring, errerror)

ClientPut sends a PUT request

funcErrorHelperadded inv1.0.61

func ErrorHelper(errerror) (helpStringstring)

funcGetJWTFromKey

func GetJWTFromKey(masterServerURL, keystring) (string,error)

GetJWTFromKey logs in and returns the JWT based on the provided key

funcGetPipelineStoreEnvadded inv1.5.1

func GetPipelineStoreEnv() (store map[string]any)

funcGetSQLTextadded inv1.1.14

func GetSQLText(sqlStringPathstring) (string,error)

GetSQLText processes the source SQL file / text

funcIsJSONorYAMLadded inv1.2.14

func IsJSONorYAML(payloadstring)bool

IsJSONorYAML detects a JSON or YAML payload

funcIsPipelineRunModeadded inv1.4.25

func IsPipelineRunMode()bool

funcIsReplicationRunModeadded inv1.4.25

func IsReplicationRunMode()bool

funcNewExecIDadded inv1.1.13

func NewExecID()string

funcSetPipelineStoreEnvadded inv1.5.1

func SetPipelineStoreEnv(store map[string]any)

funcSetStreamDefaultsadded inv0.86.36

func SetStreamDefaults(namestring, stream *ReplicationStreamConfig, replicationCfgReplicationConfig)

funcSling

func Sling(cfg *Config) (errerror)

Sling accepts a configuration and runs an Extract-Load task

Types

typeConfig

type Config struct {SourceSource            `json:"source,omitempty" yaml:"source,omitempty"`TargetTarget            `json:"target" yaml:"target"`ModeMode              `json:"mode,omitempty" yaml:"mode,omitempty"`Transformsany               `json:"transforms,omitempty" yaml:"transforms,omitempty"`OptionsConfigOptions     `json:"options,omitempty" yaml:"options,omitempty"`Env        map[string]string `json:"env,omitempty" yaml:"env,omitempty"`StreamNamestring                   `json:"stream_name,omitempty" yaml:"stream_name,omitempty"`ReplicationStream *ReplicationStreamConfig `json:"replication_stream,omitempty" yaml:"replication_stream,omitempty"`DependsOn         []string                 `json:"depends_on,omitempty" yaml:"depends_on,omitempty"`SrcConnconnection.Connection `json:"-" yaml:"-"`TgtConnconnection.Connection `json:"-" yaml:"-"`Preparedbool                  `json:"-" yaml:"-"`IncrementalValany    `json:"incremental_val" yaml:"incremental_val"`IncrementalValStrstring `json:"incremental_val_str" yaml:"incremental_val_str"`IncrementalGTEbool   `json:"incremental_gte,omitempty" yaml:"incremental_gte,omitempty"`MetadataLoadedAt  *bool `json:"-" yaml:"-"`MetadataStreamURLbool  `json:"-" yaml:"-"`MetadataRowNumbool  `json:"-" yaml:"-"`MetadataRowIDbool  `json:"-" yaml:"-"`MetadataExecIDbool  `json:"-" yaml:"-"`// contains filtered or unexported fields}

Config is the new config struct

funcNewConfig

func NewConfig(cfgStrstring) (cfg *Config, errerror)

NewConfig returns a config object from a YAML / JSON string

func (*Config)AsReplicationadded inv1.1.14

func (cfg *Config) AsReplication() (rcReplicationConfig)

func (*Config)ClearTableForChunkLoadWithRangeadded inv1.4.14

func (cfg *Config) ClearTableForChunkLoadWithRange() (errerror)

ClearTableForChunkLoadWithRange clears the table for a chunk load with range when the mode is full-refresh or truncate

func (*Config)ColumnsPreparedadded inv1.2.16

func (cfg *Config) ColumnsPrepared() (columnsiop.Columns)

ColumnsPrepared returns the prepared columns

func (*Config)DetermineTypeadded inv0.84.9

func (cfg *Config) DetermineType() (TypeJobType, errerror)

func (*Config)FormatTargetObjectNameadded inv0.85.54

func (cfg *Config) FormatTargetObjectName() (errerror)

func (*Config)GetFormatMapadded inv1.0.31

func (cfg *Config) GetFormatMap() (m map[string]any, errerror)

GetFormatMap returns a map used to format a string with the provided variables

func (*Config)HasIncrementalValadded inv1.2.21

func (cfg *Config) HasIncrementalVal()bool

HasIncrementalVal returns true if there is a non-null incremental value

func (*Config)HasWildcardadded inv1.1.14

func (cfg *Config) HasWildcard()bool

func (*Config)IgnoreExistingadded inv1.2.10

func (cfg *Config) IgnoreExisting()bool

IgnoreExisting returns true if target_options.ignore_existing is true

func (*Config)IsFileStreamWithStateAndPartsadded inv1.3.6

func (cfg *Config) IsFileStreamWithStateAndParts()bool

func (*Config)IsFullRefreshWithChunkingadded inv1.4.14

func (cfg *Config) IsFullRefreshWithChunking()bool

IsFullRefreshWithChunking returns true if the stream is chunking

func (*Config)IsFullRefreshWithRangeadded inv1.4.14

func (cfg *Config) IsFullRefreshWithRange()bool

IsFullRefreshWithRange returns true if the stream has a range

func (*Config)IsIncrementalWithChunkingadded inv1.4.14

func (cfg *Config) IsIncrementalWithChunking()bool

IsIncrementalWithChunking returns true if the stream is chunking

func (*Config)IsIncrementalWithRangeadded inv1.4.14

func (cfg *Config) IsIncrementalWithRange()bool

IsIncrementalWithRange returns true if the stream has a range

func (*Config)IsTruncateWithChunkingadded inv1.4.14

func (cfg *Config) IsTruncateWithChunking()bool

IsTruncateWithChunking returns true if the stream is chunking

func (*Config)IsTruncateWithRangeadded inv1.4.14

func (cfg *Config) IsTruncateWithRange()bool

IsTruncateWithRange returns true if the stream has a range

func (*Config)MD5added inv1.1.6

func (cfg *Config) MD5()string

func (*Config)Prepare

func (cfg *Config) Prepare() (errerror)

Prepare prepares the config

func (*Config)ReplicationModeadded inv0.87.34

func (cfg *Config) ReplicationMode()bool

ReplicationMode returns true for replication mode

func (*Config)Scan

func (cfg *Config) Scan(valueany)error

Scan scans the value into Jsonb; implements the sql.Scanner interface

func (*Config)SetDefault

func (cfg *Config) SetDefault()

SetDefault sets default options

func (*Config)SrcConnMD5added inv1.2.15

func (cfg *Config) SrcConnMD5()string

func (*Config)StreamIDadded inv1.2.15

func (cfg *Config) StreamID()string

func (*Config)TgtConnMD5added inv1.2.15

func (cfg *Config) TgtConnMD5()string

func (*Config)TransformsPreparedadded inv1.2.16

func (cfg *Config) TransformsPrepared() (stageTransforms []map[string]string)

TransformsPrepared returns the transforms columns

func (*Config)Unmarshal

func (cfg *Config) Unmarshal(cfgStrstring) (errerror)

Unmarshal parses a configuration file path or config text

func (Config)Value

func (cfgConfig) Value() (driver.Value,error)

Value returns the JSON value; implements the driver.Valuer interface

func (*Config)WithChunkingadded inv1.4.14

func (cfg *Config) WithChunking()bool

typeConfigOptions

type ConfigOptions struct {Debugbool `json:"debug,omitempty" yaml:"debug,omitempty"`StdInbool `json:"-"`// whether stdin is passedStdOutbool `json:"stdout,omitempty" yaml:"stdout,omitempty"`// whether to output to stdoutDatasetbool `json:"dataset,omitempty" yaml:"dataset,omitempty"`// whether to output to dataset}

ConfigOptions are configuration options

typeConnStateadded inv1.3.5

type ConnState struct {Namestring    `json:"name,omitempty"`Typedbio.Type `json:"type,omitempty"`Kinddbio.Kind `json:"kind,omitempty"`Bucketstring    `json:"bucket,omitempty"`Containerstring    `json:"container,omitempty"`Databasestring    `json:"database,omitempty"`Instancestring    `json:"instance,omitempty"`Schemastring    `json:"schema,omitempty"`}

typeDateTimeStateadded inv1.3.5

type DateTimeState struct {Timestamptime.Time `json:"timestamp,omitempty"`Unixint64     `json:"unix,omitempty"`FileNamestring    `json:"file_name,omitempty"`Rfc3339string    `json:"rfc3339,omitempty"`Datestring    `json:"date,omitempty"`Datetimestring    `json:"datetime,omitempty"`YYYYstring    `json:"YYYY,omitempty"`YYstring    `json:"YY,omitempty"`MMMstring    `json:"MMM,omitempty"`MMstring    `json:"MM,omitempty"`DDstring    `json:"DD,omitempty"`DDDstring    `json:"DDD,omitempty"`HHstring    `json:"HH,omitempty"`}

func (*DateTimeState)Updateadded inv1.3.5

func (dts *DateTimeState) Update()

typeExecStatus

type ExecStatusstring

ExecStatus is the status of an execution

const (// ExecStatusCreated = createdExecStatusCreatedExecStatus = "created"// ExecStatusQueued = queuedExecStatusQueuedExecStatus = "queued"// ExecStatusSubmitted = submittedExecStatusSubmittedExecStatus = "submitted"// ExecStatusStarted = startedExecStatusStartedExecStatus = "started"// ExecStatusRunning = runningExecStatusRunningExecStatus = "running"// ExecStatusSuccess = successExecStatusSuccessExecStatus = "success"// ExecStatusTerminated = terminatedExecStatusTerminatedExecStatus = "terminated"// ExecStatusInterrupted = interruptedExecStatusInterruptedExecStatus = "interrupted"// ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec)ExecStatusTimedOutExecStatus = "timed-out"// ExecStatusError = errorExecStatusErrorExecStatus = "error"// ExecStatusSkipped = skippedExecStatusSkippedExecStatus = "skipped"// ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while)ExecStatusStalledExecStatus = "stalled"// ExecStatusWarning = cancelledExecStatusCancelledExecStatus = "cancelled"// ExecStatusWarning = warningExecStatusWarningExecStatus = "warning")

func (ExecStatus)IsFailure

func (sExecStatus) IsFailure()bool

IsFailure returns true if an execution has failed

func (ExecStatus)IsFinished

func (sExecStatus) IsFinished()bool

IsFinished returns true if an execution is finished

func (ExecStatus)IsRunning

func (sExecStatus) IsRunning()bool

IsRunning returns true if an execution is running

func (ExecStatus)IsSuccess

func (sExecStatus) IsSuccess()bool

IsSuccess returns true if an execution is successful

func (ExecStatus)IsWarningadded inv1.2.23

func (sExecStatus) IsWarning()bool

IsWarning returns true if an execution is warning

typeExecutionStateadded inv1.3.6

type ExecutionState struct {IDstring     `json:"id"`FilePathstring     `json:"string"`TotalBytesuint64     `json:"total_bytes"`TotalRowsuint64     `json:"total_rows"`StatusStatusMap  `json:"status"`StartTime  *time.Time `json:"start_time"`EndTime    *time.Time `json:"end_time"`Durationint64      `json:"duration"`Error      *string    `json:"error"`}

typeExecutionStatus

type ExecutionStatus struct {JobIDint        `json:"job_id,omitempty"`ExecIDint64      `json:"exec_id,omitempty"`StatusExecStatus `json:"status,omitempty"`Textstring     `json:"text,omitempty"`Rowsuint64     `json:"rows,omitempty"`Bytesuint64     `json:"bytes,omitempty"`Percentint        `json:"percent,omitempty"`Stalledbool       `json:"stalled,omitempty"`Duration    *int       `json:"duration,omitempty"`AvgDurationint        `json:"avg_duration,omitempty"`}

ExecutionStatus is an execution status object

typeHookadded inv1.2.23

type Hook interface {Type()HookTypeID()stringContext() *g.ContextSetExtra(map[string]any)SetContext(*g.Context)Stage()HookStageStatus()ExecStatusExecute()errorPayloadMap() map[string]anyExecuteOnDone(error) (OnFailType,error)}

typeHookKindadded inv1.4.1

type HookKindstring
const (HookKindHookHookKind = "hook"HookKindStepHookKind = "step")

typeHookMapadded inv1.3.5

type HookMap struct {Start     []any `json:"start,omitempty" yaml:"start,omitempty"`End       []any `json:"end,omitempty" yaml:"end,omitempty"`Pre       []any `json:"pre,omitempty" yaml:"pre,omitempty"`Post      []any `json:"post,omitempty" yaml:"post,omitempty"`PreMerge  []any `json:"pre_merge,omitempty" yaml:"pre_merge,omitempty"`PostMerge []any `json:"post_merge,omitempty" yaml:"post_merge,omitempty"`}

func (HookMap)IsEmptyadded inv1.5.1

func (hmHookMap) IsEmpty()bool

typeHookStageadded inv1.3.5

type HookStagestring
const (HookStagePreHookStage = "pre"HookStagePostHookStage = "post"HookStagePreMergeHookStage = "pre_merge"HookStagePostMergeHookStage = "post_merge"HookStageStartHookStage = "start"HookStageEndHookStage = "end")

typeHookTypeadded inv1.2.23

type HookTypestring

typeHooksadded inv1.2.23

type Hooks []Hook

func (Hooks)Executeadded inv1.3.5

func (hsHooks) Execute() (errerror)

typeJobType

type JobTypestring

JobType is an enum type for jobs

const ApiToDBJobType = "api-db"

ApiToDB is from api to db

const ApiToFileJobType = "api-file"

ApiToFile is from api to file

const ConnDiscoverJobType = "conn-discover"

ConnDiscover is for a connection discover

const ConnExecJobType = "conn-exec"

ConnExec is for a connection exec

const ConnTestJobType = "conn-test"

ConnTest is for a connection test

const DbSQLJobType = "db-sql"

DbSQL is for a db sql query

const DbToDbJobType = "db-db"

DbToDb is from db to db

const DbToFileJobType = "db-file"

DbToFile is from db to file

const FileToDBJobType = "file-db"

FileToDB is from file to db

const FileToFileJobType = "file-file"

FileToFile is from file to file

typeMode

type Modestring

Mode is a load mode

const (// TruncateMode is to truncateTruncateModeMode = "truncate"// FullRefreshMode is to dropFullRefreshModeMode = "full-refresh"// IncrementalMode is to incrementalIncrementalModeMode = "incremental"// SnapshotMode is to snapshotSnapshotModeMode = "snapshot"// BackfillMode is to backfillBackfillModeMode = "backfill")

typeObjectStateadded inv1.3.5

type ObjectState struct {Schemastring `json:"schema,omitempty"`Tablestring `json:"table,omitempty"`Namestring `json:"name,omitempty"`FullNamestring `json:"full_name,omitempty"`TempSchemastring `json:"temp_schema,omitempty"`TempTablestring `json:"temp_table,omitempty"`TempFullNamestring `json:"temp_full_name,omitempty"`}

typeOnFailTypeadded inv1.4.1

type OnFailTypestring

typeParseOptionsadded inv1.3.5

type ParseOptions struct {// contains filtered or unexported fields}

typePipelineadded inv1.4.1

type Pipeline struct {Steps []any          `json:"steps,omitempty" yaml:"steps,omitempty"`Env   map[string]any `json:"env,omitempty" yaml:"env,omitempty"`Context     *g.Context             `json:"-"`Outputstrings.Builder        `json:"-"`OutputLines chan *g.LogLine        `json:"-"`CurrentStep *PipelineStepExecution `json:"-"`MD5string                 `json:"-"`FileNamestring                 `json:"-"`Bodystring                 `json:"body,omitempty" yaml:"-"`// raw body of pipeline// contains filtered or unexported fields}

funcLoadPipelineConfigadded inv1.4.1

func LoadPipelineConfig(contentstring) (pipeline *Pipeline, errerror)

funcLoadPipelineConfigFromFileadded inv1.4.1

func LoadPipelineConfigFromFile(cfgPathstring) (pipeline *Pipeline, errerror)

func (*Pipeline)Executeadded inv1.4.1

func (pl *Pipeline) Execute() (errerror)

Execute executes the pipeline steps using PipelineStepExecution

func (*Pipeline)GetStepsadded inv1.4.25

func (pl *Pipeline) GetSteps()Hooks

func (*Pipeline)RuntimeStateadded inv1.4.1

func (pl *Pipeline) RuntimeState() (_ *PipelineState, errerror)

RuntimeState returns the state for use

typePipelineStateadded inv1.4.1

type PipelineState struct {State     map[string]map[string]any `json:"state,omitempty"`Store     map[string]any            `json:"store,omitempty"`Env       map[string]any            `json:"env,omitempty"`TimestampDateTimeState             `json:"timestamp,omitempty"`Runs      map[string]*RunState      `json:"runs,omitempty"`Run       *RunState                 `json:"run,omitempty"`}

func (*PipelineState)GetStoreadded inv1.5.1

func (ps *PipelineState) GetStore() map[string]any

func (*PipelineState)Marshalladded inv1.4.1

func (ps *PipelineState) Marshall()string

func (*PipelineState)SetStateDataadded inv1.4.1

func (ps *PipelineState) SetStateData(idstring, data map[string]any)

func (*PipelineState)SetStateKeyValueadded inv1.4.1

func (ps *PipelineState) SetStateKeyValue(id, keystring, valueany)

func (*PipelineState)SetStoreDataadded inv1.4.5

func (ps *PipelineState) SetStoreData(keystring, valueany, delbool)

func (*PipelineState)StepExecutionadded inv1.4.25

func (ps *PipelineState) StepExecution() *PipelineStepExecution

func (*PipelineState)TaskExecutionadded inv1.4.1

func (ps *PipelineState) TaskExecution() *TaskExecution

typePipelineStepExecutionadded inv1.4.25

type PipelineStepExecution struct {ExecIDstring          `json:"exec_id"`StatusExecStatus      `json:"status"`Errerror           `json:"error"`StartTime   *time.Time      `json:"start_time"`EndTime     *time.Time      `json:"end_time"`Progressstring          `json:"progress"`Outputstrings.Builder `json:"-"`OutputLines chan *g.LogLine `json:"-"`Pipeline    *Pipeline       `json:"-"`Map         map[string]any  `json:"-"`StepHook            `json:"-"`// The specific step to execute}

PipelineStepExecution represents a single step execution context

func (*PipelineStepExecution)Contextadded inv1.4.25

func (pse *PipelineStepExecution) Context() *g.Context

func (*PipelineStepExecution)Executeadded inv1.4.25

func (pse *PipelineStepExecution) Execute(skipbool) (errerror)

Execute executes a single pipeline step

func (*PipelineStepExecution)StateSetadded inv1.4.25

func (pse *PipelineStepExecution) StateSet()

typeProgressBar

type ProgressBar struct {// contains filtered or unexported fields}

funcNewPBar

func NewPBar(dtime.Duration) *ProgressBar

NewPBar creates a new progress bar

func (*ProgressBar)Finish

func (pb *ProgressBar) Finish()

func (*ProgressBar)SetStatus

func (pb *ProgressBar) SetStatus(statusstring)

SetStatus sets the progress bar status

func (*ProgressBar)Start

func (pb *ProgressBar) Start()

typeReplicationConfigadded inv0.86.36

type ReplicationConfig struct {Sourcestring                              `json:"source,omitempty" yaml:"source,omitempty"`Targetstring                              `json:"target,omitempty" yaml:"target,omitempty"`HooksHookMap                             `json:"hooks,omitempty" yaml:"hooks,omitempty"`DefaultsReplicationStreamConfig             `json:"defaults,omitempty" yaml:"defaults,omitempty"`Streams  map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`Env      map[string]any                      `json:"env,omitempty" yaml:"env,omitempty"`// Tasks are compiled tasksTasks    []*Config `json:"tasks"`Compiledbool      `json:"compiled"`FailErrstring// error string to fail all (e.g. when the first tasks fails to connect)Context *g.Context `json:"-"`// contains filtered or unexported fields}

funcLoadReplicationConfigadded inv0.87.17

func LoadReplicationConfig(contentstring) (configReplicationConfig, errerror)

funcLoadReplicationConfigFromFileadded inv1.2.10

func LoadReplicationConfigFromFile(cfgPathstring) (configReplicationConfig, errerror)

funcUnmarshalReplicationadded inv0.86.39

func UnmarshalReplication(replicYAMLstring) (configReplicationConfig, errerror)

UnmarshalReplication converts a yaml file to a replication

func (*ReplicationConfig)AddStreamadded inv1.1.15

func (rd *ReplicationConfig) AddStream(keystring, cfg *ReplicationStreamConfig)

func (*ReplicationConfig)Compileadded inv1.2.2

func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (errerror)

Compile compiles the replication into tasks

func (*ReplicationConfig)DeleteStreamadded inv1.1.15

func (rd *ReplicationConfig) DeleteStream(keystring)

func (*ReplicationConfig)ExecuteReplicationHookadded inv1.4.15

func (rd *ReplicationConfig) ExecuteReplicationHook(stageHookStage) (errerror)

func (ReplicationConfig)GetStreamadded inv1.1.15

func (rdReplicationConfig) GetStream(namestring) (streamNamestring, cfg *ReplicationStreamConfig, foundbool)

GetStream returns the stream if it exists

func (*ReplicationConfig)JSONadded inv1.2.14

func (rd *ReplicationConfig) JSON()string

JSON returns json payload

func (*ReplicationConfig)MD5added inv1.1.6

func (rd *ReplicationConfig) MD5()string

MD5 returns a md5 hash of the json payload

func (ReplicationConfig)MatchStreamsadded inv1.1.15

func (rdReplicationConfig) MatchStreams(patternstring) (streams map[string]*ReplicationStreamConfig)

MatchStreams returns the streams that match the given pattern

func (ReplicationConfig)Normalizeadded inv1.1.14

func (rdReplicationConfig) Normalize(nstring)string

Normalize normalizes the name

func (*ReplicationConfig)OriginalCfgadded inv1.0.63

func (rd *ReplicationConfig) OriginalCfg()string

OriginalCfg returns original config

func (*ReplicationConfig)ParseReplicationHookadded inv1.3.6

func (rd *ReplicationConfig) ParseReplicationHook(stageHookStage) (errerror)

func (*ReplicationConfig)ParseStreamHookadded inv1.3.5

func (rd *ReplicationConfig) ParseStreamHook(stageHookStage, rs *ReplicationStreamConfig) (hooksHooks, errerror)

func (*ReplicationConfig)ProcessChunksadded inv1.4.1

func (rd *ReplicationConfig) ProcessChunks() (errerror)

func (*ReplicationConfig)ProcessWildcardsadded inv0.87.18

func (rd *ReplicationConfig) ProcessWildcards() (errerror)

ProcessWildcards processes the streams using wildcards such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`

func (*ReplicationConfig)ProcessWildcardsAPIadded inv1.4.5

func (rd *ReplicationConfig) ProcessWildcardsAPI(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)

func (*ReplicationConfig)ProcessWildcardsDatabaseadded inv1.0.68

func (rd *ReplicationConfig) ProcessWildcardsDatabase(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)

func (*ReplicationConfig)ProcessWildcardsFileadded inv1.0.68

func (rd *ReplicationConfig) ProcessWildcardsFile(cconnection.Connection, patterns []string) (wildcardsWildcards, errerror)

func (*ReplicationConfig)RuntimeStateadded inv1.3.5

func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, errerror)

RuntimeState returns the state for use

func (*ReplicationConfig)Scanadded inv0.86.36

func (rd *ReplicationConfig) Scan(value interface{})error

Scan scans the value into Jsonb; implements the sql.Scanner interface

func (*ReplicationConfig)StreamToTaskConfigadded inv1.4.14

func (rd *ReplicationConfig) StreamToTaskConfig(stream *ReplicationStreamConfig, namestring, env map[string]string) (cfgConfig, errerror)

func (ReplicationConfig)StreamsOrderedadded inv1.0.6

func (rdReplicationConfig) StreamsOrdered() []string

StreamsOrdered returns the stream names as ordered in the YAML file

func (ReplicationConfig)Valueadded inv0.86.36

func (rdReplicationConfig) Value() (driver.Value,error)

Value returns the JSON value; implements the driver.Valuer interface

typeReplicationStateadded inv1.4.1

type ReplicationState struct {State     map[string]map[string]any `json:"state,omitempty"`Store     map[string]any            `json:"store,omitempty"`Env       map[string]any            `json:"env,omitempty"`TimestampDateTimeState             `json:"timestamp,omitempty"`ExecutionExecutionState            `json:"execution,omitempty"`SourceConnState                 `json:"source,omitempty"`TargetConnState                 `json:"target,omitempty"`Stream    *StreamState              `json:"stream,omitempty"`Object    *ObjectState              `json:"object,omitempty"`Runs      map[string]*RunState      `json:"runs,omitempty"`Run       *RunState                 `json:"run,omitempty"`}

ReplicationState is for runtime state

func (*ReplicationState)GetStoreadded inv1.5.1

func (rs *ReplicationState) GetStore() map[string]any

func (*ReplicationState)Marshalladded inv1.4.1

func (rs *ReplicationState) Marshall()string

func (*ReplicationState)SetStateDataadded inv1.4.1

func (rs *ReplicationState) SetStateData(idstring, data map[string]any)

func (*ReplicationState)SetStateKeyValueadded inv1.4.1

func (rs *ReplicationState) SetStateKeyValue(id, keystring, valueany)

func (*ReplicationState)SetStoreDataadded inv1.4.5

func (rs *ReplicationState) SetStoreData(keystring, valueany, delbool)

func (*ReplicationState)StepExecutionadded inv1.4.25

func (rs *ReplicationState) StepExecution() *PipelineStepExecution

func (*ReplicationState)TaskExecutionadded inv1.4.1

func (rs *ReplicationState) TaskExecution() *TaskExecution

typeReplicationStreamConfigadded inv0.86.36

type ReplicationStreamConfig struct {IDstring         `json:"id,omitempty" yaml:"id,omitempty"`Descriptionstring         `json:"description,omitempty" yaml:"description,omitempty"`ModeMode           `json:"mode,omitempty" yaml:"mode,omitempty"`Objectstring         `json:"object,omitempty" yaml:"object,omitempty"`Select        []string       `json:"select,omitempty" yaml:"select,flow,omitempty"`Files         []string       `json:"files,omitempty" yaml:"files,omitempty"`// include/exclude filesWherestring         `json:"where,omitempty" yaml:"where,omitempty"`PrimaryKeyIany            `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`UpdateKeystring         `json:"update_key,omitempty" yaml:"update_key,omitempty"`SQLstring         `json:"sql,omitempty" yaml:"sql,omitempty"`Tags          []string       `json:"tags,omitempty" yaml:"tags,omitempty"`SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`Schedulestring         `json:"schedule,omitempty" yaml:"schedule,omitempty"`Disabledbool           `json:"disabled,omitempty" yaml:"disabled,omitempty"`Single        *bool          `json:"single,omitempty" yaml:"single,omitempty"`Transformsany            `json:"transforms,omitempty" yaml:"transforms,omitempty"`Columnsany            `json:"columns,omitempty" yaml:"columns,omitempty"`HooksHookMap        `json:"hooks,omitempty" yaml:"hooks,omitempty"`// contains filtered or unexported fields}

func (*ReplicationStreamConfig)ObjectHasStreamVarsadded inv1.3.4

func (s *ReplicationStreamConfig) ObjectHasStreamVars()bool

func (*ReplicationStreamConfig)PrimaryKeyadded inv0.86.36

func (s *ReplicationStreamConfig) PrimaryKey() []string

typeRouteName

type RouteNamestring

RouteName is the name of a route

const (RouteStatusRouteName = "/status"RouteNoticeRouteName = "/notice"RouteErrorRouteName = "/error"RouteSignUpUserRouteName = "/sign-up"RouteUserRouteName = "/user"RouteForgotPasswordRouteName = "/forgot-password"RouteResetPasswordRouteName = "/reset-password"RouteLoginRouteName = "/login"RouteLogoutRouteName = "/logout"RouteProxyRouteName = "/p"RouteAppIndexRouteName = "/app"RouteAppLoginRouteName = "/app/login"RouteAppLogoutRouteName = "/app/logout"RouteAppAPIKeyRouteName = "/app/apikey"RouteAPIRouteName = "/api/v1"RouteMasterStatusRouteName = "/api/v1/master-status"RouteMasterDBResetRouteName = "/api/v1/master-db-reset"RouteUploadsRouteName = "/api/v1/uploads"RouteAPIAccountsRouteName = "/api/v1/accounts"RouteAPIProjectsRouteName = "/api/v1/projects"RouteAPIKeyRouteName = "/api/v1/apikey"RouteAPIUsersRouteName = "/api/v1/users"RouteAPIJobsRouteName = "/api/v1/jobs"RouteAPILogsRouteName = "/api/v1/logs"RouteAPIExecutionsRouteName = "/api/v1/executions"RouteAPIConnectionsRouteName = "/api/v1/connections"RouteAPIConnectionTestRouteName = "/api/v1/connection-test"RouteAPIResetPasswordRouteName = "/api/v1/reset-password"RouteAPIDataRequestRouteName = "/api/v1/data-request"RouteAPIWorkersRouteName = "/api/v1/workers"RouteAPISettingsRouteName = "/api/v1/settings"RouteAlertLogRouteName = "/alert/log"RouteWsRouteName = "/ws"RouteWsClientRouteName = "/ws/client"RouteWsWorkerRouteName = "/ws/worker")

typeRunFileadded inv1.5.1

type RunFile struct {TypeRunFileTypeFileg.FileItemBodystring// contains filtered or unexported fields}

func (*RunFile)IsValidadded inv1.5.1

func (rf *RunFile) IsValid() (bool,error)

func (*RunFile)Loadadded inv1.5.1

func (rf *RunFile) Load()error

typeRunFileTypeadded inv1.5.1

type RunFileTypestring
const (RunFileReplicationRunFileType = "replication"RunFilePipelineRunFileType = "pipeline")

typeRunFilesadded inv1.5.1

type RunFiles []RunFile

func (RunFiles)Orderadded inv1.5.1

func (rfsRunFiles) Order() (orderedRunFiles)

typeRunStateadded inv1.3.5

type RunState struct {IDstring                  `json:"id,omitempty"`Stream     *StreamState            `json:"stream"`Object     *ObjectState            `json:"object"`TotalBytesuint64                  `json:"total_bytes"`TotalRowsuint64                  `json:"total_rows"`StatusExecStatus              `json:"status"`StartTime  *time.Time              `json:"start_time"`EndTime    *time.Time              `json:"end_time"`Durationint64                   `json:"duration"`Error      *string                 `json:"error"`IncrValueany                     `json:"incremental_value,omitempty"`Rangestring                  `json:"range,omitempty"`ConfigReplicationStreamConfig `json:"config"`Task       *TaskExecution          `json:"-"`Step       *PipelineStepExecution  `json:"-"`}

typeRuntimeStateadded inv1.3.5

type RuntimeState interface {SetStateData(idstring, data map[string]any)SetStateKeyValue(id, keystring, valueany)SetStoreData(keystring, valueany, delbool)GetStore() map[string]anyMarshall()stringTaskExecution() *TaskExecutionStepExecution() *PipelineStepExecution}

typeSource

type Source struct {Connstring         `json:"conn,omitempty" yaml:"conn,omitempty"`Typedbio.Type      `json:"type,omitempty" yaml:"type,omitempty"`Streamstring         `json:"stream,omitempty" yaml:"stream,omitempty"`Select      []string       `json:"select,omitempty" yaml:"select,omitempty"`// Select or exclude columns. Exclude with prefix "-".Files       []string       `json:"files,omitempty" yaml:"files,omitempty"`// include/exclude filesWherestring         `json:"where,omitempty" yaml:"where,omitempty"`Querystring         `json:"query,omitempty" yaml:"query,omitempty"`PrimaryKeyIany            `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`UpdateKeystring         `json:"update_key,omitempty" yaml:"update_key,omitempty"`Options     *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`Data map[string]any `json:"-" yaml:"-"`}

Source is a source of data

func (*Source)Flattenadded inv1.4.5

func (s *Source) Flatten()int

Flatten returns the flatten depth

func (*Source)HasPrimaryKeyadded inv1.0.50

func (s *Source) HasPrimaryKey()bool

func (*Source)HasUpdateKeyadded inv1.0.50

func (s *Source) HasUpdateKey()bool

func (*Source)Limit

func (s *Source) Limit()int

func (*Source)MD5added inv1.1.6

func (s *Source) MD5()string

func (*Source)Offsetadded inv1.2.14

func (s *Source) Offset()int

func (*Source)PrimaryKeyadded inv0.84.0

func (s *Source) PrimaryKey() []string

typeSourceOptions

type SourceOptions struct {EmptyAsNull    *bool               `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`Header         *bool               `json:"header,omitempty" yaml:"header,omitempty"`Flattenany                 `json:"flatten,omitempty" yaml:"flatten,omitempty"`FieldsPerRec   *int                `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`Compression    *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`Format         *dbio.FileType      `json:"format,omitempty" yaml:"format,omitempty"`NullIf         *string             `json:"null_if,omitempty" yaml:"null_if,omitempty"`DatetimeFormatstring              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`SkipBlankLines *bool               `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`Delimiterstring              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`Escapestring              `json:"escape,omitempty" yaml:"escape,omitempty"`Quotestring              `json:"quote,omitempty" yaml:"quote,omitempty"`MaxDecimals    *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`JmesPath       *string             `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`Sheet          *string             `json:"sheet,omitempty" yaml:"sheet,omitempty"`Range          *string             `json:"range,omitempty" yaml:"range,omitempty"`Limit          *int                `json:"limit,omitempty" yaml:"limit,omitempty"`Offset         *int                `json:"offset,omitempty" yaml:"offset,omitempty"`ChunkSizeany                 `json:"chunk_size,omitempty" yaml:"chunk_size,omitempty"`ChunkCount     *int                `json:"chunk_count,omitempty" yaml:"chunk_count,omitempty"`ChunkExpr      *string             `json:"chunk_expr,omitempty" yaml:"chunk_expr,omitempty"`Encoding       *iop.Encoding       `json:"encoding,omitempty" yaml:"encoding,omitempty"`// columns & transforms were moved out 
of source_options//https://github.com/slingdata-io/sling-cli/issues/348Columnsany `json:"columns,omitempty" yaml:"columns,omitempty"`// legacyTransformsany `json:"transforms,omitempty" yaml:"transforms,omitempty"`// legacy}

SourceOptions are connection and stream processing options

func (*SourceOptions)RangeStartEndadded inv1.4.1

func (so *SourceOptions) RangeStartEnd() (start, endstring)

func (*SourceOptions)SetDefaultsadded inv1.0.31

func (o *SourceOptions) SetDefaults(sourceOptionsSourceOptions)

typeStatusMapadded inv1.3.6

type StatusMap struct {Countint `json:"count"`Successint `json:"success"`Runningint `json:"running"`Skippedint `json:"skipped"`Cancelledint `json:"cancelled"`Warningint `json:"warning"`Errorint `json:"error"`}

typeStreamStateadded inv1.3.5

type StreamState struct {FileFolderstring `json:"file_folder,omitempty"`FileNamestring `json:"file_name,omitempty"`FileExtstring `json:"file_ext,omitempty"`FilePathstring `json:"file_path,omitempty"`Namestring `json:"name,omitempty"`Schemastring `json:"schema,omitempty"`SchemaLowerstring `json:"schema_lower,omitempty"`SchemaUpperstring `json:"schema_upper,omitempty"`Tablestring `json:"table,omitempty"`TableLowerstring `json:"table_lower,omitempty"`TableUpperstring `json:"table_upper,omitempty"`FullNamestring `json:"full_name,omitempty"`}

typeTarget

type Target struct {Connstring         `json:"conn,omitempty" yaml:"conn,omitempty"`Typedbio.Type      `json:"type,omitempty" yaml:"type,omitempty"`Objectstring         `json:"object,omitempty" yaml:"object,omitempty"`Columnsany            `json:"columns,omitempty" yaml:"columns,omitempty"`Options *TargetOptions `json:"options,omitempty" yaml:"options,omitempty"`Data map[string]any `json:"-" yaml:"-"`TmpTableCreatedbool `json:"-" yaml:"-"`// contains filtered or unexported fields}

Target is a target of data

func (*Target)MD5added inv1.1.6

func (t *Target) MD5()string

func (*Target)ObjectFileFormatadded inv1.2.25

func (t *Target) ObjectFileFormat()dbio.FileType

typeTargetOptions

type TargetOptions struct {Header           *bool               `json:"header,omitempty" yaml:"header,omitempty"`Compression      *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`Concurrencyint                 `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`BatchLimit       *int64              `json:"batch_limit,omitempty" yaml:"batch_limit,omitempty"`DatetimeFormatstring              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`Delimiterstring              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`FileMaxRows      *int64              `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`FileMaxBytes     *int64              `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`Formatdbio.FileType       `json:"format,omitempty" yaml:"format,omitempty"`MaxDecimals      *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`UseBulk          *bool               `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`IgnoreExisting   *bool               `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"`DeleteMissing    *string             `json:"delete_missing,omitempty" yaml:"delete_missing,omitempty"`AddNewColumns    *bool               `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`AdjustColumnType *bool               `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`ColumnCasing     *iop.ColumnCasing   `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`ColumnTyping     *iop.ColumnTyping   `json:"column_typing,omitempty" yaml:"column_typing,omitempty"`Encoding         *iop.Encoding       `json:"encoding,omitempty" yaml:"encoding,omitempty"`DirectInsert     *bool               `json:"direct_insert,omitempty" yaml:"direct_insert,omitempty"`TableKeysdatabase.TableKeys       `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`TableTmpstring                   
`json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`TableDDL       *string                  `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`PreSQL         *string                  `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`PostSQL        *string                  `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`IsolationLevel *database.IsolationLevel `json:"isolation_level,omitempty" yaml:"isolation_level,omitempty"`MergeStrategy  *database.MergeStrategy  `json:"merge_strategy,omitempty" yaml:"merge_strategy,omitempty"`}

TargetOptions are target connection and stream processing options

func (*TargetOptions)SetDefaultsadded inv1.0.31

func (o *TargetOptions) SetDefaults(targetOptionsTargetOptions)

typeTaskExecution

type TaskExecution struct {ExecIDstring     `json:"exec_id"`Config    *Config    `json:"config"`TypeJobType    `json:"type"`StatusExecStatus `json:"status"`Errerror      `json:"error"`StartTime *time.Time `json:"start_time"`EndTime   *time.Time `json:"end_time"`Bytesuint64     `json:"bytes"`Context   *g.Context `json:"-"`Progressstring     `json:"progress"`OutputLines chan *g.LogLineReplication    *ReplicationConfig `json:"replication"`ProgressHist   []string           `json:"progress_hist"`PBar           *ProgressBar       `json:"-"`ProcStatsStartg.ProcStats        `json:"-"`// process stats at beginning// contains filtered or unexported fields}

TaskExecution is a sling ELT task run, synonymous to an execution

funcNewTask

func NewTask(execIDstring, cfg *Config) (t *TaskExecution)

NewTask creates a Sling task with given configuration

func (*TaskExecution)AddCleanupTaskFirstadded inv1.1.8

func (t *TaskExecution) AddCleanupTaskFirst(f func())

func (*TaskExecution)AddCleanupTaskLastadded inv1.1.8

func (t *TaskExecution) AddCleanupTaskLast(f func())

func (*TaskExecution)AppendOutputadded inv1.1.6

func (t *TaskExecution) AppendOutput(ll *g.LogLine)

func (*TaskExecution)Cleanupadded inv0.84.3

func (t *TaskExecution) Cleanup()

func (*TaskExecution)Dataadded inv1.2.19

func (t *TaskExecution) Data() *iop.Dataset

Data returns the dataset object

func (*TaskExecution)Dfadded inv1.2.7

func (t *TaskExecution) Df() *iop.Dataflow

Df returns the dataflow object

func (*TaskExecution)Execute

func (t *TaskExecution) Execute()error

Execute runs a Sling task. This may be a file/db to file/db transfer

func (*TaskExecution)ExecuteHooksadded inv1.2.23

func (t *TaskExecution) ExecuteHooks(stageHookStage) (errerror)

func (*TaskExecution)GetBytes

func (t *TaskExecution) GetBytes() (inBytes, outBytesuint64)

GetBytes returns the current total of bytes processed

func (*TaskExecution)GetBytesString

func (t *TaskExecution) GetBytesString() (sstring)

func (*TaskExecution)GetCount

func (t *TaskExecution) GetCount() (countuint64)

GetCount returns the current count of rows processed

func (*TaskExecution)GetRate

func (t *TaskExecution) GetRate(secWindowint) (rowRate, byteRateint64)

GetRate returns the speed of flow (rows / sec and bytes / sec). secWindow is how many seconds back to measure (0 is since beginning)

func (*TaskExecution)GetSourceTableadded inv1.3.4

func (t *TaskExecution) GetSourceTable() (sTabledatabase.Table, errerror)

func (*TaskExecution)GetStateMapadded inv1.2.23

func (t *TaskExecution) GetStateMap() map[string]any

func (*TaskExecution)GetTargetTableadded inv1.3.4

func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTabledatabase.Table, errerror)

func (*TaskExecution)GetTotalBytes

func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytesuint64)

GetTotalBytes gets the inbound/outbound bytes of the task

func (*TaskExecution)IsStalled

func (t *TaskExecution) IsStalled(windowfloat64)bool

IsStalled determines if the task has stalled (no row increment)

func (*TaskExecution)ReadFromApiadded inv1.4.5

func (t *TaskExecution) ReadFromApi(cfg *Config, srcConn *api.APIConnection) (df *iop.Dataflow, errerror)

ReadFromApi reads from a source api

func (*TaskExecution)ReadFromDB

func (t *TaskExecution) ReadFromDB(cfg *Config, srcConndatabase.Connection) (df *iop.Dataflow, errerror)

ReadFromDB reads from a source database

func (*TaskExecution)ReadFromFile

func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, errerror)

ReadFromFile reads from a source file

func (*TaskExecution)SetProgress

func (t *TaskExecution) SetProgress(textstring, args ...interface{})

SetProgress sets the progress

func (*TaskExecution)StateSetadded inv1.4.25

func (t *TaskExecution) StateSet()

func (*TaskExecution)WriteToDb

func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConndatabase.Connection) (cntuint64, errerror)

WriteToDb writes to a target DB: create temp table, load into temp table, insert / incremental / replace into target table

func (*TaskExecution)WriteToFile

func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cntuint64, errerror)

WriteToFile writes to a target file

typeWildcardadded inv1.2.19

type Wildcard struct {PatternstringStreamNames []stringNodeMap     map[string]filesys.FileNodeTableMap    map[string]database.TableEndpointMap map[string]api.Endpoint}

typeWildcardsadded inv1.2.19

type Wildcards []*Wildcard

func (Wildcards)Patternsadded inv1.2.19

func (wsWildcards) Patterns() []string

Source Files

View all Source files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
go.dev uses cookies from Google to deliver and enhance the quality of its services and to analyze traffic. Learn more.

[8]ページ先頭

©2009-2025 Movatter.jp