iop

package
v1.5.1 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 7, 2025 License: GPL-3.0 Imports: 97 Imported by: 5

Details

Repository

github.com/slingdata-io/sling-cli

Links

README

Input-Process-Output (iop)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)
View Source
var (
	DuckDbVersion      = "1.4.2"
	DuckDbVersionMD    = "1.4.2"
	DuckDbUseTempFile  = false
	DuckDbURISeparator = "|-|+|"
)
View Source
var (
	TransformsLegacyMap = map[string]TransformLegacy{}

	GlobalFunctionMap map[string]goval.ExpressionFunction

	GetTransformFunction = func(string) goval.ExpressionFunction { return nil }

	FunctionToTransform = func(name string, f goval.ExpressionFunction, params ...any) TransformLegacy {
		return TransformLegacy{
			Name: name,
			Func: func(sp *StreamProcessor, vals ...any) (any, error) {
				if len(params) > 0 {
					vals = append(vals, params...)
				}
				val, err := f(vals...)
				return val, err
			},
		}
	}

	LocalConnections = cmap.New[map[string]any]()
)
View Source
var (
	TransformDecodeLatin1 = TransformLegacy{Name: EncodingLatin1.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_1, val); return newVal, err }}
	TransformDecodeLatin5 = TransformLegacy{Name: EncodingLatin5.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_5, val); return newVal, err }}
	TransformDecodeLatin9 = TransformLegacy{Name: EncodingLatin9.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_15, val); return newVal, err }}
	TransformDecodeUtf8 = TransformLegacy{Name: EncodingUtf8.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8, val); return newVal, err }}
	TransformDecodeUtf8Bom = TransformLegacy{Name: EncodingUtf8Bom.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8BOM, val); return newVal, err }}
	TransformDecodeUtf16 = TransformLegacy{Name: EncodingUtf16.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF16, val); return newVal, err }}
	TransformDecodeWindows1250 = TransformLegacy{Name: EncodingWindows1250.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1250, val); return newVal, err }}
	TransformDecodeWindows1252 = TransformLegacy{Name: EncodingWindows1252.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1252, val); return newVal, err }}
	TransformDuckdbListToText = TransformLegacy{Name: "duckdb_list_to_text", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.duckDbListAsText(val), nil }}
	TransformEncodeLatin1 = TransformLegacy{Name: EncodingLatin1.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_1, val); return newVal, err }}
	TransformEncodeLatin5 = TransformLegacy{Name: EncodingLatin5.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_5, val); return newVal, err }}
	TransformEncodeLatin9 = TransformLegacy{Name: EncodingLatin9.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_15, val); return newVal, err }}
	TransformEncodeUtf8 = TransformLegacy{Name: EncodingUtf8.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) {
		return fmt.Sprintf("%q", val), nil
		newVal, _, err := transform.String(sp.transformers.EncodeUTF8, val)
		return newVal, err
	}}
	TransformEncodeUtf8Bom = TransformLegacy{Name: EncodingUtf8Bom.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF8BOM, val); return newVal, err }}
	TransformEncodeUtf16 = TransformLegacy{Name: EncodingUtf16.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF16, val); return newVal, err }}
	TransformEncodeWindows1250 = TransformLegacy{Name: EncodingWindows1250.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1250, val); return newVal, err }}
	TransformEncodeWindows1252 = TransformLegacy{Name: EncodingWindows1252.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1252, val); return newVal, err }}
	TransformHashMd5 = TransformLegacy{Name: "hash_md5", FuncString: func(sp *StreamProcessor, val string) (string, error) { return g.MD5(val), nil }}
	TransformHashSha256 = TransformLegacy{Name: "hash_sha256", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA256(val), nil }}
	TransformHashSha512 = TransformLegacy{Name: "hash_sha512", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA512(val), nil }}
	TransformParseBit = TransformLegacy{Name: "parse_bit", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseBit(sp, val) }}
	TransformBinaryToDecimal = TransformLegacy{Name: "binary_to_decimal", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.BinaryToDecimal(sp, val) }}
	TransformBinaryToHex = TransformLegacy{Name: "binary_to_hex", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.BinaryToHex(val), nil }}
	TransformParseFix = TransformLegacy{Name: "parse_fix", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseFIX(sp, val) }}
	TransformParseUuid = TransformLegacy{Name: "parse_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseUUID(sp, val) }}
	TransformParseMsUuid = TransformLegacy{Name: "parse_ms_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseMsUUID(sp, val) }}
	TransformReplace0x00 = TransformLegacy{Name: "replace_0x00", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.Replace0x00(sp, val) }}
	TransformReplaceAccents = TransformLegacy{Name: "replace_accents", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.Accent, val); return newVal, err }}
	TransformReplaceNonPrintable = TransformLegacy{Name: "replace_non_printable", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ReplaceNonPrintable(val), nil }}
	TransformTrimSpace = TransformLegacy{Name: "trim_space", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil }}
	TransformLower = TransformLegacy{Name: "lower", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.ToLower(val), nil }}
	TransformUpper = TransformLegacy{Name: "upper", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.ToUpper(val), nil }}

	// used as lookup, cannot return null since is not pointer
	TransformEmptyAsNull = TransformLegacy{Name: "empty_as_null", Func: func(sp *StreamProcessor, vals ...any) (any, error) {
		if len(vals) == 0 {
			return nil, nil
		}
		if vals[0] == "" {
			return nil, nil
		}
		return vals[0], nil
	}}
)
View Source
var AllCompressorType = []struct {
	Value  CompressorType
	TSName string
}{
	{AutoCompressorType, "AutoCompressorType"},
	{NoneCompressorType, "NoneCompressorType"},
	{ZipCompressorType, "ZipCompressorType"},
	{GzipCompressorType, "GzipCompressorType"},
	{SnappyCompressorType, "SnappyCompressorType"},
	{ZStandardCompressorType, "ZStandardCompressorType"},
}
View Source
var NewTransform = func(t []map[string]string, _ *StreamProcessor) Transform { return nil }
View Source
var ParseStageTransforms = func(payload any) ([]map[string]string, error) {
	return nil, g.Error("please use the official sling-cli release for using transforms")
}
View Source
var Transforms transformsNS

Functions

funcAppendToBuilderadded inv1.4.10

func AppendToBuilder(builderarray.Builder, col *Column, val interface{})

funcApplySelectadded inv1.5.1

func ApplySelect(fields []string, selectExprs []string) (newFields []string, errerror)

ApplySelect applies select expressions to filter, rename, and reorder fields. Select syntax:

  • "field" -> include field
  • "-field" -> exclude field
  • "field as new" -> rename field to new
  • "*" -> include all fields
  • "prefix*" -> include fields starting with prefix
  • "*suffix" -> include fields ending with suffix
  • "-prefix*" -> exclude fields starting with prefix
  • "-*suffix" -> exclude fields ending with suffix

Field matching is case-insensitive. Returns new fields slice with selected/renamed fields in order specified.
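
A minimal usage sketch of the select syntax above (field names are hypothetical; the iop import path is assumed from this module):

package main

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	fields := []string{"id", "first_name", "last_name", "internal_flag"}

	// include all fields, then exclude one
	kept, err := iop.ApplySelect(fields, []string{"*", "-internal_flag"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(kept)

	// pick specific fields and rename one
	renamed, err := iop.ApplySelect(fields, []string{"id", "first_name as fname"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(renamed)
}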

funcAutoDecompress

func AutoDecompress(readerio.Reader) (gReaderio.Reader, errerror)

AutoDecompress auto-detects the compression and decompresses the reader. Otherwise, it returns the same reader.
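
A minimal sketch of wrapping a possibly compressed file (the file name is hypothetical; the iop import path is assumed from this module):

package main

import (
	"io"
	"log"
	"os"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	f, err := os.Open("export.csv.gz") // hypothetical input, may or may not be compressed
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r, err := iop.AutoDecompress(f) // decompressing reader, or the same reader if not compressed
	if err != nil {
		log.Fatal(err)
	}

	if _, err := io.Copy(os.Stdout, r); err != nil {
		log.Fatal(err)
	}
}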

funcCleanHeaderRow

func CleanHeaderRow(header []string) []string

CleanHeaderRow cleans the header row from incompatible characters

funcCleanName

func CleanName(namestring) (newNamestring)

funcCloseQueuesadded inv1.4.23

func CloseQueues()

funcColumnsToArrowSchemaadded inv1.4.10

func ColumnsToArrowSchema(columnsColumns) *arrow.Schema

funcCompareColumns

func CompareColumns(columns1Columns, columns2Columns) (reshapebool, errerror)

CompareColumns compares two sets of columns to see if they are similar

funcCreateDummyFields

func CreateDummyFields(numColsint) (cols []string)

CreateDummyFields creates dummy columns for csvs with no header row

funcDecodeJSONIfBase64added inv1.5.1

func DecodeJSONIfBase64(jsonBodystring) (string,error)

DecodeJSONIfBase64 detects if the json body is base64-encoded and decodes it

funcExtractPartitionTimeValueadded inv1.3.6

func ExtractPartitionTimeValue(mask, pathstring) (timestamptime.Time, errerror)

ExtractPartitionTimeValue extracts the partition time value from the path. With mask `data/{YYYY}/{MM}/{DD}` and path `data/2024/12/21`, the returned timestamp should be 2024-12-21.
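
A minimal sketch using the mask and path from the example above (the iop import path is assumed from this module; MatchedPartitionMask is documented further below):

package main

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	mask, path := "data/{YYYY}/{MM}/{DD}", "data/2024/12/21"

	fmt.Println(iop.MatchedPartitionMask(mask, path)) // true when mask and path share the same partition structure

	ts, err := iop.ExtractPartitionTimeValue(mask, path)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ts.Format("2006-01-02")) // expected: 2024-12-21
}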

funcFormatValueadded inv1.2.21

func FormatValue(valany, columnTypeColumnType, connTypedbio.Type) (newValstring)

FormatValue format as sql expression (adds quotes)

funcGeneratePartURIsFromRangeadded inv1.3.6

func GeneratePartURIsFromRange(mask, updateKeystring, start, endtime.Time) (uris []string, errerror)

GeneratePartURIsFromRange generates all the possible URIs given a start/end range. Must first determine the lowest time resolution

funcGetISO8601DateMap

func GetISO8601DateMap(ttime.Time) map[string]any

GetISO8601DateMap return a map of date parts for string formatting

funcGetLowestPartTimeUnitadded inv1.3.6

func GetLowestPartTimeUnit(maskstring) (time.Duration,error)

GetLowestPartTimeUnit loops through possible TimeLevel or PartitionLevel values and returns the lowest time.Duration unit

funcGetPartitionDateMapadded inv1.3.6

func GetPartitionDateMap(partKeyPrefixstring, timestamptime.Time) map[string]any

funcGetValueFromArrowArrayadded inv1.4.10

func GetValueFromArrowArray(arrarrow.Array, idxint)any

GetValueFromArrowArray extracts a value from an arrow array at the given index

funcIsDummy

func IsDummy(columns []Column)bool

IsDummy returns true if the columns are injected by CreateDummyFields

funcMakeAwsConfigadded inv1.4.11

func MakeAwsConfig(ctxcontext.Context, props map[string]string) (cfgaws.Config, errerror)

funcMakeDecNumScaleadded inv1.1.6

func MakeDecNumScale(scaleint) *big.Rat

funcMakeRowsChan

func MakeRowsChan() chan []any

MakeRowsChan returns a buffered channel with default size

funcMatchedPartitionMaskadded inv1.3.6

func MatchedPartitionMask(mask, pathstring) (matchesbool)

MatchedPartitionMask determines if the mask and the path have the same partition structure

funcNewJSONStream

func NewJSONStream(ds *Datastream, decoder decoderLike, flattenint, jmespathstring) *jsonStream

funcNewXMLDecoderadded inv1.4.20

func NewXMLDecoder(readerio.Reader) *xmlDecoder

funcOpenTunnelSSHadded inv1.2.15

func OpenTunnelSSH(tgtHoststring, tgtPortint, tunnelURL, privateKey, passphrasestring) (localPortint, errerror)

funcRow

func Row(vals ...any) []any

Row is a row

funcScanCarrRet

func ScanCarrRet(data []byte, atEOFbool) (advanceint, token []byte, errerror)

ScanCarrRet removes the \r runes that are without \n right after

funcSetSampleSizeadded inv1.4.21

func SetSampleSize()

funcStringToDecimalByteArrayadded inv1.1.6

func StringToDecimalByteArray(sstring, numSca *big.Rat, pTypeparquet.Type, lengthint) []byte

funcStripSQLCommentsadded inv1.4.6

func StripSQLComments(sqlstring) (string,error)

StripSQLComments removes all SQL comments (-- or /* */) from the provided SQL string
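
A minimal sketch (the SQL text is illustrative; the iop import path is assumed from this module):

package main

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	sql := "select id -- primary key\nfrom users /* all rows */"

	clean, err := iop.StripSQLComments(sql)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(clean) // SQL with line and block comments removed
}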

funcUnzip

func Unzip(srcstring, deststring) (nodes []map[string]any, errerror)

Unzip will decompress a zip archive, moving all files and folders within the zip file (parameter 1) to an output directory (parameter 2).
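
A minimal sketch (both paths are hypothetical; the iop import path is assumed from this module):

package main

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	// hypothetical archive and output directory
	nodes, err := iop.Unzip("/tmp/archive.zip", "/tmp/extracted")
	if err != nil {
		log.Fatal(err)
	}
	for _, node := range nodes {
		fmt.Println(node) // one map per extracted file/folder
	}
}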

Types

typeArrowReaderadded inv1.4.10

type ArrowReader struct {
	Path          string
	IpcReader     *ipc.Reader
	IpcFileReader *ipc.FileReader
	Data          *Dataset
	Context       *g.Context
	Memory        memory.Allocator
	// contains filtered or unexported fields
}

ArrowReader is an arrow reader object using arrow v18

funcNewArrowFileReaderadded inv1.4.17

func NewArrowFileReader(reader *os.File, selected []string) (a *ArrowReader, errerror)

funcNewArrowReaderadded inv1.4.10

func NewArrowReader(readerio.Reader, selected []string) (a *ArrowReader, errerror)

func (*ArrowReader)Columnsadded inv1.4.10

func (a *ArrowReader) Columns()Columns

typeArrowWriteradded inv1.4.10

type ArrowWriter struct {
	Writer *ipc.FileWriter
	// contains filtered or unexported fields
}

ArrowWriter is an arrow writer object using arrow v18

funcNewArrowWriteradded inv1.4.10

func NewArrowWriter(wio.Writer, columnsColumns) (a *ArrowWriter, errerror)

func (*ArrowWriter)Closeadded inv1.4.10

func (a *ArrowWriter) Close()error

func (*ArrowWriter)Columnsadded inv1.4.10

func (a *ArrowWriter) Columns()Columns

func (*ArrowWriter)WriteRowadded inv1.4.10

func (a *ArrowWriter) WriteRow(row []any)error

typeAvro

type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}

Avro is an Avro object

funcNewAvroStream

func NewAvroStream(readerio.ReadSeeker, columnsColumns) (a *Avro, errerror)

func (*Avro)Columns

func (a *Avro) Columns()Columns

typeBatch

type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}

func (*Batch)AddTransform

func (b *Batch) AddTransform(transf func(row []any) []any)

func (*Batch)Close

func (b *Batch) Close()

func (*Batch)ColumnsChanged

func (b *Batch) ColumnsChanged()bool

func (*Batch)Ds

func (b *Batch) Ds() *Datastream

func (*Batch)ID

func (b *Batch) ID()string

func (*Batch)IsFirst

func (b *Batch) IsFirst()bool

func (*Batch)Push

func (b *Batch) Push(row []any)

func (*Batch)Shape

func (b *Batch) Shape(tgtColumnsColumns, pause ...bool) (errerror)

typeBatchReader

type BatchReader struct {
	Batch   *Batch
	Columns Columns
	Reader  io.Reader
	Counter int64
}

typeCSV

type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       string
	Escape          string
	Quote           string
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}

CSV is a csv object

func (*CSV)InferSchema

func (c *CSV) InferSchema()error

InferSchema infers the column types from a sample of rows

func (*CSV)NewReader

func (c *CSV) NewReader() (*io.PipeReader,error)

NewReader creates a Reader

func (*CSV)Read

func (c *CSV) Read() (dataDataset, errerror)

Read returns the read CSV data with Line 1 as header

func (*CSV)ReadStream

func (c *CSV) ReadStream() (ds *Datastream, errerror)

ReadStream returns the read CSV stream with Line 1 as header

func (*CSV)ReadStreamContextadded inv1.1.15

func (c *CSV) ReadStreamContext(ctxcontext.Context) (ds *Datastream, errerror)

ReadStream returns the read CSV stream with Line 1 as header

func (*CSV)Sample

func (c *CSV) Sample(nint) (Dataset,error)

Sample returns a sample of n rows

func (*CSV)SetFields

func (c *CSV) SetFields(fields []string)

SetFields sets the fields

func (*CSV)WriteStream

func (c *CSV) WriteStream(ds *Datastream) (cntuint64, errerror)

WriteStream to CSV file

typeColumn

type Column struct {
	Position    int         `json:"position"`
	Name        string      `json:"name"`
	Type        ColumnType  `json:"type"`
	DbType      string      `json:"db_type,omitempty"`
	DbPrecision int         `json:"db_precision,omitempty"`
	DbScale     int         `json:"db_scale,omitempty"`
	Sourced     bool        `json:"-"` // whether col was sourced/inferred from a typed source
	Stats       ColumnStats `json:"stats,omitempty"`

	Table       string            `json:"table,omitempty"`
	Schema      string            `json:"schema,omitempty"`
	Database    string            `json:"database,omitempty"`
	Description string            `json:"description,omitempty"`
	FileURI     string            `json:"file_uri,omitempty"`
	Constraint  *ColumnConstraint `json:"constraint,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Column represents a schemata column

funcInferFromStats

func InferFromStats(columns []Column, safebool, noDebugbool) []Column

InferFromStats using the stats to infer data types

func (*Column)EvaluateConstraintadded inv1.2.16

func (col *Column) EvaluateConstraint(valueany, sp *StreamProcessor) (errerror)

EvaluateConstraint evaluates a value against the constraint function

func (*Column)GetNativeTypeadded inv1.2.16

func (col *Column) GetNativeType(tdbio.Type, ctColumnTyping) (nativeTypestring, errerror)

GetNativeType returns the native column type from generic

func (*Column)GoType

func (col *Column) GoType()reflect.Type

func (*Column)HasNullsadded inv1.1.7

func (col *Column) HasNulls()bool

func (*Column)HasNullsPlus1added inv1.1.7

func (col *Column) HasNullsPlus1()bool

HasNullsPlus1 denotes when a column is all nulls plus 1 non-null

func (*Column)IsBinaryadded inv1.2.3

func (col *Column) IsBinary()bool

IsBinary returns whether the column is a binary

func (*Column)IsBool

func (col *Column) IsBool()bool

IsBool returns whether the column is a boolean

func (*Column)IsDateadded inv1.1.14

func (col *Column) IsDate()bool

IsDate returns whether the column is a date object

func (*Column)IsDatetime

func (col *Column) IsDatetime()bool

IsDatetime returns whether the column is a datetime object

func (*Column)IsDecimal

func (col *Column) IsDecimal()bool

IsDecimal returns whether the column is a decimal

func (*Column)IsFloatadded inv1.1.14

func (col *Column) IsFloat()bool

IsFloat returns whether the column is a float

func (*Column)IsInteger

func (col *Column) IsInteger()bool

IsInteger returns whether the column is an integer

func (*Column)IsKeyTypeadded inv1.1.12

func (col *Column) IsKeyType(keyTypeKeyType)bool

func (*Column)IsNumber

func (col *Column) IsNumber()bool

IsNumber returns whether the column is a decimal or an integer

func (*Column)IsString

func (col *Column) IsString()bool

IsString returns whether the column is a string

func (*Column)IsUnique

func (col *Column) IsUnique()bool

func (*Column)Key

func (col *Column) Key()string

func (*Column)SetConstraintadded inv1.2.16

func (col *Column) SetConstraint()

func (*Column)SetLengthPrecisionScale

func (col *Column) SetLengthPrecisionScale()

SetLengthPrecisionScale parse length, precision, scale

func (*Column)SetMetadata

func (col *Column) SetMetadata(keystring, valuestring)

typeColumnCasingadded inv1.2.22

type ColumnCasing string

ColumnCasing is the casing method to use

const (
	// see https://github.com/slingdata-io/sling-cli/issues/538
	NormalizeColumnCasing ColumnCasing = "normalize" // normalize to target, leaves mixed-case columns as is
	SourceColumnCasing    ColumnCasing = "source"    // keeps source column name casing. The default.
	TargetColumnCasing    ColumnCasing = "target"    // converts casing according to target database. Lower-case for files.
	SnakeColumnCasing     ColumnCasing = "snake"     // converts snake casing according to target database. Lower-case for files.
	UpperColumnCasing     ColumnCasing = "upper"     // make it upper case
	LowerColumnCasing     ColumnCasing = "lower"     // make it lower case
	CamelColumnCasing     ColumnCasing = "camel"     // converts to camelCase
)

func (*ColumnCasing)Applyadded inv1.2.22

func (cc *ColumnCasing) Apply(namestring, tgtConnTypedbio.Type)string

Apply applies column casing to provided name. If cc is nil or SourceColumnCasing, it returns the original value
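
A minimal sketch (the column name is hypothetical; the iop and dbio import paths, and the dbio.TypeDbPostgres constant, are assumed from this module):

package main

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio"     // assumed import path
	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	snake := iop.SnakeColumnCasing
	fmt.Println(snake.Apply("FirstName", dbio.TypeDbPostgres)) // casing converted per target

	source := iop.SourceColumnCasing
	fmt.Println(source.Apply("FirstName", dbio.TypeDbPostgres)) // original name is kept
}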

func (*ColumnCasing)ApplyColumnsadded inv1.4.25

func (cc *ColumnCasing) ApplyColumns(colsColumns, tgtTypedbio.Type) (newColsColumns)

ApplyColumns applies the column casing to the provided columns

func (*ColumnCasing)Equalsadded inv1.2.22

func (cc *ColumnCasing) Equals(valColumnCasing)bool

Equals evaluates equality for column casing (pointer safe)

func (*ColumnCasing)IsEmptyadded inv1.2.22

func (cc *ColumnCasing) IsEmpty()bool

IsEmpty return true if nil or blank

typeColumnConstraintadded inv1.2.16

type ColumnConstraint struct {
	Expression string             `json:"expression,omitempty"`
	Errors     []string           `json:"errors,omitempty"`
	FailCnt    uint64             `json:"fail_cnt,omitempty"`
	EvalFunc   ConstraintEvalFunc `json:"-"`
}

typeColumnStats

type ColumnStats struct {
	MinLen       int    `json:"min_len,omitempty"`
	MaxLen       int    `json:"max_len,omitempty"`
	MaxDecLen    int    `json:"max_dec_len,omitempty"`
	Min          int64  `json:"min"`
	Max          int64  `json:"max"`
	NullCnt      int64  `json:"null_cnt"`
	IntCnt       int64  `json:"int_cnt,omitempty"`
	DecCnt       int64  `json:"dec_cnt,omitempty"`
	BoolCnt      int64  `json:"bool_cnt,omitempty"`
	JsonCnt      int64  `json:"json_cnt,omitempty"`
	StringCnt    int64  `json:"string_cnt,omitempty"`
	DateCnt      int64  `json:"date_cnt,omitempty"`
	DateTimeCnt  int64  `json:"datetime_cnt,omitempty"`
	DateTimeZCnt int64  `json:"datetimez_cnt,omitempty"`
	TotalCnt     int64  `json:"total_cnt"`
	UniqCnt      int64  `json:"uniq_cnt"`
	Checksum     uint64 `json:"checksum"`
	LastVal      any    `json:"-"` // last non-empty value. useful for state incremental
}

ColumnStats holds statistics for a column

func (*ColumnStats)DistinctPercent

func (cs *ColumnStats) DistinctPercent()float64

func (*ColumnStats)DuplicateCount

func (cs *ColumnStats) DuplicateCount()int64

func (*ColumnStats)DuplicatePercent

func (cs *ColumnStats) DuplicatePercent()float64

typeColumnType

type ColumnType string

const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	UUIDType       ColumnType = "uuid"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)

funcNativeTypeToGeneraladded inv1.2.19

func NativeTypeToGeneral(name, dbTypestring, connTypedbio.Type) (colTypeColumnType)

func (ColumnType)IsBinaryadded inv1.2.3

func (ctColumnType) IsBinary()bool

IsBinary returns whether the column is a binary

func (ColumnType)IsBool

func (ctColumnType) IsBool()bool

IsBool returns whether the column is a boolean

func (ColumnType)IsDateadded inv1.1.8

func (ctColumnType) IsDate()bool

IsDate returns whether the column is a date object

func (ColumnType)IsDatetime

func (ctColumnType) IsDatetime()bool

IsDatetime returns whether the column is a datetime object

func (ColumnType)IsDecimal

func (ctColumnType) IsDecimal()bool

IsDecimal returns whether the column is a decimal

func (ColumnType)IsFloatadded inv1.1.14

func (ctColumnType) IsFloat()bool

IsFloat returns whether the column is a float

func (ColumnType)IsInteger

func (ctColumnType) IsInteger()bool

IsInteger returns whether the column is an integer

func (ColumnType)IsJSON

func (ctColumnType) IsJSON()bool

IsJSON returns whether the column is a json

func (ColumnType)IsNumber

func (ctColumnType) IsNumber()bool

IsNumber returns whether the column is a decimal or an integer

func (ColumnType)IsString

func (ctColumnType) IsString()bool

IsString returns whether the column is a string

func (ColumnType)IsTimeadded inv1.4.17

func (ctColumnType) IsTime()bool

IsTime returns whether the column is a time object

func (ColumnType)IsValid

func (ctColumnType) IsValid()bool

IsValid returns whether the column has a valid type

typeColumnTypingadded inv1.4.5

type ColumnTyping struct {
	String  *StringColumnTyping  `json:"string,omitempty" yaml:"string,omitempty"`
	Decimal *DecimalColumnTyping `json:"decimal,omitempty" yaml:"decimal,omitempty"`
	JSON    *JsonColumnTyping    `json:"json,omitempty" yaml:"json,omitempty"`
}

ColumnTyping contains type-specific mapping configurations

func (*ColumnTyping)MaxDecimalsadded inv1.4.19

func (ct *ColumnTyping) MaxDecimals()int

typeColumns

type Columns []Column

Columns represent many columns

funcArrowSchemaToColumnsadded inv1.4.10

func ArrowSchemaToColumns(schema *arrow.Schema)Columns

ArrowSchemaToColumns converts arrow schema to Columns

funcNewColumns

func NewColumns(cols ...Column)Columns

NewColumns creates Columns from the provided Column values

funcNewColumnsFromFields

func NewColumnsFromFields(fields ...string) (colsColumns)

NewColumnsFromFields creates Columns from fields

func (Columns)Clone

func (colsColumns) Clone() (newColsColumns)

Clone returns a copy of the columns

func (Columns)Coerce

func (colsColumns) Coerce(castColsColumns, hasHeaderbool, casingColumnCasing, tgtTypedbio.Type) (newColsColumns)

Coerce casts columns into specified types

func (Columns)Dataadded inv1.2.2

func (colsColumns) Data(includeParentbool) (fields []string, rows [][]any)

func (Columns)Dataset

func (colsColumns) Dataset()Dataset

Dataset return an empty inferred dataset

func (Columns)DbTypes

func (colsColumns) DbTypes(args ...bool) []string

DbTypes return the column names/db types. args -> (lower bool, cleanUp bool)

func (Columns)FieldMap

func (colsColumns) FieldMap(toLowerbool) map[string]int

FieldMap return the fields map of indexes. When `toLower` is true, field keys are lower cased

func (Columns)GetColumn

func (colsColumns) GetColumn(namestring) *Column

GetColumn returns the matched Col
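
A minimal sketch combining NewColumnsFromFields, FieldMap, and GetColumn (field names are hypothetical; the iop import path is assumed from this module):

package main

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	cols := iop.NewColumnsFromFields("ID", "Name", "CreatedAt")

	fm := cols.FieldMap(true) // lower-cased field name -> index
	fmt.Println(fm["name"])

	if col := cols.GetColumn("Name"); col != nil {
		fmt.Println(col.Name, col.Type)
	}
}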

func (Columns)GetKeys

func (colsColumns) GetKeys(keyTypeKeyType)Columns

GetKeys gets key columns

func (Columns)GetMissingadded inv1.1.8

func (colsColumns) GetMissing(newCols ...Column) (missingColumns)

GetMissing returns the missing columns from newCols

func (Columns)IsDifferent

func (colsColumns) IsDifferent(newColsColumns)bool

func (Columns)IsDummy

func (colsColumns) IsDummy()bool

IsDummy returns true if the columns are injected by CreateDummyFields

func (Columns)IsSimilarTo

func (colsColumns) IsSimilarTo(otherColsColumns)bool

IsSimilarTo returns true if it has the same number of columns and contains the same columns, but may be in different order

func (Columns)JSONadded inv1.2.2

func (colsColumns) JSON(includeParentbool) (outputstring)

JSON returns the columns as a JSON string

func (Columns)Keys

func (colsColumns) Keys() []string

Names return the column names. args -> (lower bool, cleanUp bool)

func (Columns)MakeRec

func (colsColumns) MakeRec(row []any) map[string]any

func (Columns)MakeShaper

func (colsColumns) MakeShaper(tgtColumnsColumns) (shaper *Shaper, errerror)

func (Columns)Mapadded inv1.5.1

func (colsColumns) Map() map[string]*Column

Map return the map of columns

func (Columns)Mergeadded inv1.1.15

func (colsColumns) Merge(newColsColumns, overwritebool) (col2Columns, added schemaChg, changed []schemaChg)

func (Columns)Names

func (colsColumns) Names(args ...bool) []string

Names return the column names. args -> (lower bool, cleanUp bool)

func (Columns)PrettyTableadded inv1.1.8

func (colsColumns) PrettyTable(includeParentbool) (outputstring)

PrettyTable returns a text pretty table

func (Columns)SetKeys

func (colsColumns) SetKeys(keyTypeKeyType, colNames ...string) (errerror)

SetKeys sets key columns

func (Columns)SetMetadataadded inv1.2.15

func (colsColumns) SetMetadata(key, valuestring, colNames ...string) (errerror)

SetMetadata sets metadata for columns

func (Columns)Sourcedadded inv1.1.6

func (colsColumns) Sourced() (sourcedbool)

Sourced returns true if the columns are all sourced

func (Columns)Types

func (colsColumns) Types(args ...bool) []string

Types return the column names/types. args -> (lower bool, cleanUp bool)

func (Columns)ValidateNamesadded inv1.4.5

func (colsColumns) ValidateNames(tgtTypedbio.Type) (newColsColumns)

ValidateNames truncates column names that exceed the max column length

func (Columns)WithoutMetaadded inv1.2.2

func (colsColumns) WithoutMeta() (newColsColumns)

WithoutMeta returns the columns without the metadata columns

typeCompressor

type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}

Compressor implements different kinds of compression

funcNewCompressor

func NewCompressor(cpTypeCompressorType)Compressor

typeCompressorType

type CompressorType string

CompressorType is a string enum type for the Compressor Type

const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "auto"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "none"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "zip"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "gzip"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "snappy"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "zstd"
)

funcCompressorTypePtr

func CompressorTypePtr(vCompressorType) *CompressorType

CompressorTypePtr returns a pointer to the CompressorType value passed in.

func (CompressorType)Normalizeadded inv1.3.4

func (ctCompressorType) Normalize() *CompressorType

Normalize converts to lowercase

func (CompressorType)Stringadded inv1.3.4

func (ctCompressorType) String()string

String converts to lowercase

typeConstraintEvalFuncadded inv1.2.16

type ConstraintEvalFunc func(value any) bool

typeCsvDuckDbadded inv1.2.19

type CsvDuckDb struct {
	URI  string
	Duck *DuckDb
	// contains filtered or unexported fields
}

funcNewCsvReaderDuckDbadded inv1.2.19

func NewCsvReaderDuckDb(uristring, sc *StreamConfig, props ...string) (*CsvDuckDb,error)

func (*CsvDuckDb)Closeadded inv1.2.19

func (r *CsvDuckDb) Close()error

func (*CsvDuckDb)Columnsadded inv1.2.19

func (r *CsvDuckDb) Columns() (Columns,error)

func (*CsvDuckDb)MakeQueryadded inv1.2.19

func (r *CsvDuckDb) MakeQuery(fscFileStreamConfig)string

typeDataflow

type Dataflow struct {
	Columns     Columns
	Buffer      [][]interface{}
	StreamCh    chan *Datastream
	Streams     []*Datastream
	Context     *g.Context
	Limit       uint64
	EgressBytes uint64
	Ready       bool
	Inferred    bool
	FsURL       string

	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error

	StreamMap map[string]*Datastream

	SchemaVersion int // for column type version
	// contains filtered or unexported fields
}

Dataflow is a collection of concurrent Datastreams

funcMakeDataFlow

func MakeDataFlow(dss ...*Datastream) (df *Dataflow, errerror)

MakeDataFlow create a dataflow from datastreams

funcNewDataflow

func NewDataflow(limit ...int) (df *Dataflow)

NewDataflow creates a new dataflow

funcNewDataflowContextadded inv1.1.15

func NewDataflowContext(ctxcontext.Context, limit ...int) (df *Dataflow)

func (*Dataflow)AddColumns

func (df *Dataflow) AddColumns(newColsColumns, overwritebool, exceptDs ...string) (addedColumns, processOkbool)

AddColumns adds new columns to the dataflow

func (*Dataflow)AddEgressBytesadded inv1.2.2

func (df *Dataflow) AddEgressBytes(bytesuint64)

AddEgressBytes add egress bytes

func (*Dataflow)BufferDatasetadded inv1.2.25

func (df *Dataflow) BufferDataset()Dataset

BufferDataset return the buffer as a dataset

func (*Dataflow)Bytes

func (df *Dataflow) Bytes() (inBytes, outBytesuint64)

func (*Dataflow)ChangeColumn

func (df *Dataflow) ChangeColumn(iint, newTypeColumnType, exceptDs ...string)bool

ChangeColumn applies a column type change

func (*Dataflow)CleanUp

func (df *Dataflow) CleanUp()

CleanUp runs the defer functions

func (*Dataflow)Close

func (df *Dataflow) Close()

Close closes the df

func (*Dataflow)CloseCurrentBatches

func (df *Dataflow) CloseCurrentBatches()

func (*Dataflow)Collect

func (df *Dataflow) Collect() (dataDataset, errerror)

Collect reads from one or more streams and return a dataset

func (*Dataflow)Count

func (df *Dataflow) Count() (cntuint64)

Count returns the aggregate count

func (*Dataflow)Defer

func (df *Dataflow) Defer(f func())

Defer runs a given function as close of Dataflow

func (*Dataflow)DsTotalBytes

func (df *Dataflow) DsTotalBytes() (bytesuint64)

func (*Dataflow)Err

func (df *Dataflow) Err() (errerror)

Err return the error if any

func (*Dataflow)IsClosed

func (df *Dataflow) IsClosed()bool

IsClosed is true if the df is closed

func (*Dataflow)IsEmpty

func (df *Dataflow) IsEmpty()bool

IsEmpty returns true if ds.Rows of all channels are empty

func (*Dataflow)MakeStreamCh

func (df *Dataflow) MakeStreamCh(forceMergebool) (streamCh chan *Datastream)

MakeStreamCh determines whether to merge all the streams into one or keep them separate. If data is small per stream, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.

func (*Dataflow)MergeColumnsadded inv1.1.15

func (df *Dataflow) MergeColumns(columns []Column, inferredbool) (processOkbool)

MergeColumns merges the provided columns into the dataflow columns

func (*Dataflow)Pause

func (df *Dataflow) Pause(exceptDs ...string)bool

Pause pauses all streams

func (*Dataflow)PropagateColumadded inv1.4.24

func (df *Dataflow) PropagateColum(colIndexint)

PropagateColum propagates the dataflow column properties to underlying datastreams

func (*Dataflow)PushStreamChan

func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)

func (*Dataflow)SetBatchLimitadded inv1.2.11

func (df *Dataflow) SetBatchLimit(limitint64)

SetBatchLimit set the ds.Batch.Limit

func (*Dataflow)SetConfigadded inv1.1.15

func (df *Dataflow) SetConfig(cfgStreamConfig)

SetConfig set the Sp config

func (*Dataflow)SetEmpty

func (df *Dataflow) SetEmpty()

SetEmpty sets all underlying datastreams empty

func (*Dataflow)SetReady

func (df *Dataflow) SetReady()

SetReady sets the df.ready

func (*Dataflow)Size

func (df *Dataflow) Size()int

Size is the number of streams

func (*Dataflow)StreamConfigadded inv1.2.15

func (df *Dataflow) StreamConfig() (cfgStreamConfig)

StreamConfig get the first Sp config

func (*Dataflow)SyncColumns

func (df *Dataflow) SyncColumns()

SyncColumns is a workaround to sync the ds.Columns to the df.Columns

func (*Dataflow)SyncStats

func (df *Dataflow) SyncStats()

SyncStats sync stream processor stats aggregated to the df.Columns

func (*Dataflow)Unpause

func (df *Dataflow) Unpause(exceptDs ...string)

Unpause unpauses all streams

func (*Dataflow)WaitClosed

func (df *Dataflow) WaitClosed()

WaitClosed waits until dataflow is closed. Hack to make sure all streams are pushed

func (*Dataflow)WaitReady

func (df *Dataflow) WaitReady()error

WaitReady waits until dataflow is ready

typeDataset

type Dataset struct {
	Result        *sqlx.Rows       `json:"-"`
	Columns       Columns          `json:"columns"`
	Rows          [][]any          `json:"rows"`
	SQL           string           `json:"sql"`
	Duration      float64          `json:"duration"`
	Sp            *StreamProcessor `json:"-"`
	Inferred      bool             `json:"inferred"`
	SafeInference bool             `json:"safe_inference"`
	NoDebug       bool             `json:"no_debug"`
}

Dataset is a query returned dataset

funcNewDataset

func NewDataset(columnsColumns) (dataDataset)

NewDataset return a new dataset
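
A minimal sketch of building a Dataset and writing it out as CSV (rows and column names are hypothetical; the iop import path is assumed from this module):

package main

import (
	"os"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	cols := iop.NewColumnsFromFields("id", "name")
	data := iop.NewDataset(cols)

	data.Append([]any{1, "alice"}, []any{2, "bob"}) // hypothetical rows
	data.InferColumnTypes()
	data.Print(0) // 0 prints all rows

	if _, err := data.WriteCsv(os.Stdout); err != nil {
		panic(err)
	}
}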

funcNewDatasetFromMap

func NewDatasetFromMap(m map[string]any) (dataDataset)

NewDatasetFromMap return a new dataset

funcNewExcelDatasetadded inv1.2.2

func NewExcelDataset(readerio.Reader, props map[string]string) (dataDataset, errerror)

funcReadCsv

func ReadCsv(pathstring) (Dataset,error)

ReadCsv reads CSV and returns dataset

func (*Dataset)AddColumns

func (data *Dataset) AddColumns(newColsColumns, overwritebool) (addedColumns)

AddColumns adds new columns to the dataset

func (*Dataset)Append

func (data *Dataset) Append(row ...[]any)

Append appends a new row

func (*Dataset)ColValues

func (data *Dataset) ColValues(colint) []any

ColValues returns the values of one column as an array

func (*Dataset)ColValuesStr

func (data *Dataset) ColValuesStr(colint) []string

ColValuesStr returns the values of one column as an array of strings

func (*Dataset)FirstRow

func (data *Dataset) FirstRow() []any

FirstRow returns the first row

func (*Dataset)FirstVal

func (data *Dataset) FirstVal()any

FirstVal returns the first value from the first row

func (*Dataset)GetFields

func (data *Dataset) GetFields(lower ...bool) []string

GetFields return the fields of the Data

func (*Dataset)InferColumnTypes

func (data *Dataset) InferColumnTypes()

InferColumnTypes determines the columns types

func (*Dataset)Pick

func (data *Dataset) Pick(colNames ...string) (nDataDataset)

Pick returns a new dataset with specified columns

func (*Dataset)PrettyTableadded inv1.1.8

func (data *Dataset) PrettyTable(fields ...string) (outputstring)

func (*Dataset)Print

func (data *Dataset) Print(limitint)

Print pretty-prints the data with a limit. 0 is unlimited

func (*Dataset)Records

func (data *Dataset) Records(lower ...bool) []map[string]any

Records return rows of maps

func (*Dataset)RecordsCasted

func (data *Dataset) RecordsCasted(lower ...bool) []map[string]any

RecordsCasted return rows of maps of casted values

func (*Dataset)RecordsString

func (data *Dataset) RecordsString(lower ...bool) []map[string]string

RecordsString return rows of maps of string values

func (*Dataset)SetFields

func (data *Dataset) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Dataset)Sort

func (data *Dataset) Sort(args ...any)

Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col 0, col 2, col 3 descending. Example: `data.Sort(0, 2, true)` will sort col 0, col 2 ascending.

func (*Dataset)Stream

func (data *Dataset) Stream(Props ...map[string]string) *Datastream

Stream returns a datastream of the dataset

func (*Dataset)ToJSONMap

func (data *Dataset) ToJSONMap() map[string]any

ToJSONMap converts to a JSON object

func (*Dataset)WriteCsv

func (data *Dataset) WriteCsv(destio.Writer) (tbwint, errerror)

WriteCsv writes to a writer

typeDatastream

type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         atomic.Uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool
	ID            string
	Metadata      Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}

Datastream is a stream of rows

funcMergeDataflow

func MergeDataflow(df *Dataflow) (dsN *Datastream)

MergeDataflow merges the dataflow streams into one

funcNewDatastream

func NewDatastream(columnsColumns) (ds *Datastream)

NewDatastream return a new datastream

funcNewDatastreamContext

func NewDatastreamContext(ctxcontext.Context, columnsColumns) (ds *Datastream)

NewDatastreamContext return a new datastream

funcNewDatastreamIt

func NewDatastreamIt(ctxcontext.Context, columnsColumns, nextFunc func(it *Iterator)bool) (ds *Datastream)

NewDatastreamIt with it

funcReadCsvStream

func ReadCsvStream(pathstring) (ds *Datastream, errerror)

ReadCsvStream reads CSV and returns a datastream

func (*Datastream)AddBytes

func (ds *Datastream) AddBytes(bint64)

AddBytes add bytes as processed

func (*Datastream)AddColumns

func (ds *Datastream) AddColumns(newColsColumns, overwritebool) (addedColumns)

SetColumns sets the columns

func (*Datastream)CastRowToString

func (ds *Datastream) CastRowToString(row []any) []string

CastRowToString returns the row as string casted

func (*Datastream)CastToStringSafeMaskadded inv1.2.14

func (ds *Datastream) CastToStringSafeMask(row []any) []string

CastToStringSafeMask returns the row as string mask casted (even safer)

func (*Datastream)ChangeColumn

func (ds *Datastream) ChangeColumn(iint, newTypeColumnType)

ChangeColumn applies a column type change

func (*Datastream)Chunk

func (ds *Datastream) Chunk(limituint64) (chDs chan *Datastream)

Chunk splits the datastream into chunk datastreams (in sequence)

func (*Datastream)Close

func (ds *Datastream) Close()

Close closes the datastream

func (*Datastream)Collect

func (ds *Datastream) Collect(limitint) (Dataset,error)

Collect reads a stream and returns a dataset. A limit of 0 is unlimited

func (*Datastream)ConsumeArrowReaderadded inv1.4.10

func (ds *Datastream) ConsumeArrowReader(readerio.Reader) (errerror)

func (*Datastream)ConsumeArrowReaderSeekeradded inv1.4.10

func (ds *Datastream) ConsumeArrowReaderSeeker(reader *os.File) (errerror)

ConsumeArrowReaderSeeker uses the provided reader to stream rows

func (*Datastream)ConsumeArrowReaderStreamadded inv1.4.17

func (ds *Datastream) ConsumeArrowReaderStream(readerio.Reader) (errerror)

ConsumeArrowReaderStream uses the provided reader to stream rows

func (*Datastream)ConsumeAvroReader

func (ds *Datastream) ConsumeAvroReader(readerio.Reader) (errerror)

ConsumeAvroReader uses the provided reader to stream rows

func (*Datastream)ConsumeAvroReaderSeeker

func (ds *Datastream) ConsumeAvroReaderSeeker(readerio.ReadSeeker) (errerror)

ConsumeAvroReaderSeeker uses the provided reader to stream rows

func (*Datastream)ConsumeCsvReader

func (ds *Datastream) ConsumeCsvReader(readerio.Reader) (errerror)

ConsumeCsvReader uses the provided reader to stream rows

func (*Datastream)ConsumeCsvReaderChladded inv1.2.4

func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (errerror)

ConsumeCsvReaderChl reads a channel of readers. Should be safe to use with header top row

func (*Datastream)ConsumeCsvReaderDuckDbadded inv1.2.19

func (ds *Datastream) ConsumeCsvReaderDuckDb(uristring, scFileStreamConfig) (errerror)

ConsumeCsvReaderDuckDb uses the provided reader to stream rows

func (*Datastream)ConsumeDeltaReaderadded inv1.2.16

func (ds *Datastream) ConsumeDeltaReader(uristring, scFileStreamConfig) (errerror)

ConsumeDeltaReader uses the provided reader to stream rows

func (*Datastream)ConsumeExcelReaderadded inv1.2.2

func (ds *Datastream) ConsumeExcelReader(readerio.Reader, props map[string]string) (errerror)

ConsumeExcelReader uses the provided reader to stream rows

func (*Datastream)ConsumeExcelReaderSeekeradded inv1.2.2

func (ds *Datastream) ConsumeExcelReaderSeeker(readerio.ReadSeeker, props map[string]string) (errerror)

ConsumeExcelReaderSeeker uses the provided reader to stream rows

func (*Datastream)ConsumeIcebergReaderadded inv1.2.16

func (ds *Datastream) ConsumeIcebergReader(uristring, scFileStreamConfig) (errerror)

ConsumeIcebergReader uses the provided reader to stream rows

func (*Datastream)ConsumeJsonReader

func (ds *Datastream) ConsumeJsonReader(readerio.Reader) (errerror)

ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON rec as one string value so the payload can be processed downstream

func (*Datastream)ConsumeJsonReaderChladded inv1.2.6

func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXMLbool) (errerror)

func (*Datastream)ConsumeParquetReader

func (ds *Datastream) ConsumeParquetReader(readerio.Reader) (errerror)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream)ConsumeParquetReaderDuckDbadded inv1.2.16

func (ds *Datastream) ConsumeParquetReaderDuckDb(uristring, scFileStreamConfig) (errerror)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream)ConsumeParquetReaderSeeker

func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (errerror)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream)ConsumeSASReader

func (ds *Datastream) ConsumeSASReader(readerio.Reader) (errerror)

ConsumeSASReader uses the provided reader to stream rows

func (*Datastream)ConsumeSASReaderSeeker

func (ds *Datastream) ConsumeSASReaderSeeker(readerio.ReadSeeker) (errerror)

ConsumeSASReaderSeeker uses the provided reader to stream rows

func (*Datastream)ConsumeXmlReader

func (ds *Datastream) ConsumeXmlReader(readerio.Reader) (errerror)

ConsumeXmlReader uses the provided reader to stream XML. This will put each XML rec as one string value so the payload can be processed downstream

func (*Datastream)Defer

func (ds *Datastream) Defer(f func())

Defer runs a given function as close of Datastream

func (*Datastream)Df

func (ds *Datastream) Df() *Dataflow

func (*Datastream)Err

func (ds *Datastream) Err() (errerror)

Err return the error if any

func (*Datastream)GetConfig

func (ds *Datastream) GetConfig() (configMap map[string]string)

GetConfig get config

func (*Datastream)GetFields

func (ds *Datastream) GetFields(args ...bool) []string

GetFields return the fields of the Data

func (*Datastream)IsClosed

func (ds *Datastream) IsClosed()bool

IsClosed is true if ds is closed

func (*Datastream)LatestBatch

func (ds *Datastream) LatestBatch() *Batch

func (*Datastream)Limitedadded inv1.2.4

func (ds *Datastream) Limited(limit ...int)bool

func (*Datastream)Map

func (ds *Datastream) Map(newColumnsColumns, transf func([]any) []any) (nDs *Datastream)

Map applies the provided function to every row and returns the result

func (*Datastream)MapParallel

func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkersint) (nDs *Datastream)

MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.

func (*Datastream)NewArrowReaderChnladded inv1.4.10

func (ds *Datastream) NewArrowReaderChnl(scStreamConfig) (readerChn chan *BatchReader)

NewArrowReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes

func (*Datastream)NewBatch

func (ds *Datastream) NewBatch(columnsColumns) *Batch

NewBatch creates a new batch with fixed columns. Should be used each time column type changes, or columns are added

func (*Datastream)NewCsvBufferReader

func (ds *Datastream) NewCsvBufferReader(scStreamConfig) *bytes.Reader

NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream)NewCsvBufferReaderChnl

func (ds *Datastream) NewCsvBufferReaderChnl(scStreamConfig) (readerChn chan *bytes.Reader)

NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read in memory, whereas NewCsvReaderChnl does not hold in memory

func (*Datastream)NewCsvBytesChnl

func (ds *Datastream) NewCsvBytesChnl(scStreamConfig) (dataChn chan *[]byte)

NewCsvBytesChnl returns a channel that yields chunks of CSV bytes

func (*Datastream)NewCsvReader

func (ds *Datastream) NewCsvReader(scStreamConfig) *io.PipeReader

NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream)NewCsvReaderChnl

func (ds *Datastream) NewCsvReaderChnl(scStreamConfig) (readerChn chan *BatchReader)

NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes

func (*Datastream)NewExcelReaderChnladded inv1.2.2

func (ds *Datastream) NewExcelReaderChnl(scStreamConfig) (readerChn chan *BatchReader)

func (*Datastream)NewIterator

func (ds *Datastream) NewIterator(columnsColumns, nextFunc func(it *Iterator)bool) *Iterator

func (*Datastream)NewJsonLinesReaderChnl

func (ds *Datastream) NewJsonLinesReaderChnl(scStreamConfig) (readerChn chan *io.PipeReader)

NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes

func (*Datastream)NewJsonReaderChnl

func (ds *Datastream) NewJsonReaderChnl(scStreamConfig) (readerChn chan *io.PipeReader)

func (*Datastream)NewParquetArrowReaderChnladded inv1.1.7

func (ds *Datastream) NewParquetArrowReaderChnl(scStreamConfig) (readerChn chan *BatchReader)

NewParquetArrowReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes. WARN: Not using this one since it doesn't write Decimals properly.

func (*Datastream)NewParquetReaderChnl

func (ds *Datastream) NewParquetReaderChnl(scStreamConfig) (readerChn chan *BatchReader)

NewParquetReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes

func (*Datastream)Pause

func (ds *Datastream) Pause()

func (*Datastream)Push

func (ds *Datastream) Push(row []any)

Push pushes a row into the datastream

func (*Datastream)Records

func (ds *Datastream) Records() <-chan map[string]any

Records return rows of maps

func (*Datastream)Rows

func (ds *Datastream) Rows() chan []any

func (*Datastream)SetConfig

func (ds *Datastream) SetConfig(configMap map[string]string)

SetConfig sets the ds.config values

func (*Datastream)SetEmpty

func (ds *Datastream) SetEmpty()

SetEmpty sets the ds.Rows channel as empty

func (*Datastream)SetFields

func (ds *Datastream) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Datastream)SetFileURIadded inv1.1.15

func (ds *Datastream) SetFileURI()

SetFileURI sets the FileURI of the columns of the Datastream

func (*Datastream)SetIteratoradded inv1.1.14

func (ds *Datastream) SetIterator(it *Iterator)

func (*Datastream)SetMetadata

func (ds *Datastream) SetMetadata(jsonStrstring)

func (*Datastream)SetReady

func (ds *Datastream) SetReady()

SetReady sets the ds.ready

func (*Datastream)Shape

func (ds *Datastream) Shape(columnsColumns) (nDs *Datastream, errerror)

Shape changes the column types as needed, to the provided columns var. It will cast the already wrongly casted rows, and not recast the correctly casted rows

func (*Datastream)Split

func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)

Split splits the datastream into parallel datastreams

func (*Datastream)Start

func (ds *Datastream) Start() (errerror)

Start generates the stream. Should cycle the Iter Func until done

func (*Datastream)TryPause

func (ds *Datastream) TryPause()bool

func (*Datastream)Unpause

func (ds *Datastream) Unpause()

Unpause unpauses all streams

func (*Datastream)WaitClosed

func (ds *Datastream) WaitClosed()

WaitClosed waits until the datastream is closed. Hack to make sure all rows are pushed

func (*Datastream)WaitReady

func (ds *Datastream) WaitReady()error

WaitReady waits until datastream is ready

typeDecimalColumnTypingadded inv1.4.5

type DecimalColumnTyping struct {
	MinPrecision *int   `json:"min_precision,omitempty" yaml:"min_precision,omitempty"` // Total number of digits
	MaxPrecision int    `json:"max_precision,omitempty" yaml:"max_precision,omitempty"` // Total number of digits
	MinScale     *int   `json:"min_scale,omitempty" yaml:"min_scale,omitempty"`         // Number of digits after decimal point
	MaxScale     int    `json:"max_scale,omitempty" yaml:"max_scale,omitempty"`
	CastAs       string `json:"cast_as,omitempty" yaml:"cast_as,omitempty"`
}

DecimalColumnTyping contains decimal type mapping configurations

func (*DecimalColumnTyping)Applyadded inv1.4.5

func (dct *DecimalColumnTyping) Apply(colColumn) (precision, scaleint)

typeDeltaReaderadded inv1.2.16

type DeltaReader struct {
	URI  string
	Duck *DuckDb
	// contains filtered or unexported fields
}

funcNewDeltaReaderadded inv1.2.16

func NewDeltaReader(uristring, props ...string) (*DeltaReader,error)

func (*DeltaReader)Closeadded inv1.2.16

func (r *DeltaReader) Close()error

func (*DeltaReader)Columnsadded inv1.2.16

func (r *DeltaReader) Columns() (Columns,error)

func (*DeltaReader)MakeQueryadded inv1.2.19

func (r *DeltaReader) MakeQuery(scFileStreamConfig)string

typeDuckDbadded inv1.2.16

type DuckDb struct {
	Context *g.Context
	Proc    *process.Proc
	// contains filtered or unexported fields
}

DuckDb is a Duck DB compute layer

funcNewDuckDbadded inv1.2.16

func NewDuckDb(ctxcontext.Context, props ...string) *DuckDb

NewDuckDb creates a new DuckDb instance with the given context and properties

func (*DuckDb)AddExtensionadded inv1.2.16

func (duck *DuckDb) AddExtension(extensionstring)

AddExtension adds an extension to the DuckDb instance if it's not already present

func (*DuckDb)AddSecretadded inv1.4.11

func (duck *DuckDb) AddSecret(secretDuckDbSecret)

AddSecret registers a new secret in session (also adds needed extension)

func (*DuckDb)CheckExtensionadded inv1.4.18

func (duck *DuckDb) CheckExtension(extensionstring) (bool,error)

CheckExtension checks if an extension is installed in DuckDB

func (*DuckDb)Closeadded inv1.2.16

func (duck *DuckDb) Close()error

Close closes the connection

func (*DuckDb)DataflowToHttpStreamadded inv1.3.6

func (duck *DuckDb) DataflowToHttpStream(df *Dataflow, scStreamConfig) (streamPartChn chanHttpStreamPart, errerror)

func (*DuckDb)DefaultCsvConfigadded inv1.3.6

func (duck *DuckDb) DefaultCsvConfig() (configStreamConfig)

func (*DuckDb)Describeadded inv1.2.16

func (duck *DuckDb) Describe(querystring) (columnsColumns, errerror)

Describe returns the columns of a query

func (*DuckDb)EnsureBinDuckDBadded inv1.4.23

func (duck *DuckDb) EnsureBinDuckDB(versionstring) (binPathstring, errerror)

EnsureBinDuckDB ensures the duckdb binary exists; if missing, downloads and uses it

func (*DuckDb)Execadded inv1.2.16

func (duck *DuckDb) Exec(sqlstring, args ...any) (resultsql.Result, errerror)

Exec executes a SQL query and returns the result

func (*DuckDb)ExecContextadded inv1.2.16

func (duck *DuckDb) ExecContext(ctxcontext.Context, sqlstring, args ...any) (resultsql.Result, errerror)

ExecContext executes a SQL query with context and returns the result

func (*DuckDb)ExecMultiContextadded inv1.2.16

func (duck *DuckDb) ExecMultiContext(ctxcontext.Context, sqls ...string) (resultsql.Result, errerror)

ExecMultiContext executes multiple SQL queries with context and returns the result

func (*DuckDb)GenerateCopyStatementadded inv1.2.25

func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPathstring, optionsDuckDbCopyOptions) (sqlstring, errerror)

func (*DuckDb)GenerateCsvColumnsadded inv1.3.6

func (duck *DuckDb) GenerateCsvColumns(columnsColumns) (colStrstring)

func (*DuckDb)GetPropadded inv1.2.16

func (duck *DuckDb) GetProp(keystring)string

GetProp retrieves a property value for the DuckDb instance

func (*DuckDb)GetScannerFuncadded inv1.2.19

func (duck *DuckDb) GetScannerFunc(formatdbio.FileType) (scanFuncstring)

func (*DuckDb)MakeScanQueryadded inv1.2.19

func (duck *DuckDb) MakeScanQuery(formatdbio.FileType, uristring, fscFileStreamConfig) (sqlstring)

func (*DuckDb)Openadded inv1.2.16

func (duck *DuckDb) Open(timeOut ...int) (errerror)

Open initializes the DuckDb connection

func (*DuckDb)PrepareFsSecretAndURIadded inv1.2.16

func (duck *DuckDb) PrepareFsSecretAndURI(uristring)string

PrepareFsSecretAndURI prepares the secret configuration from the fs_props and modifies the URI if necessary for different storage types (S3, Google Cloud Storage, Azure Blob Storage). It returns the modified URI string.

The function handles the following storage types:

  • Local files: Removes the "file://" prefix
  • S3: Configures AWS credentials and handles Cloudflare R2 storage
  • Google Cloud Storage: Sets up GCS credentials
  • Azure Blob Storage: Configures Azure connection string or account name

It uses the DuckDb instance's properties to populate the secret configuration.

func (*DuckDb)Propsadded inv1.2.16

func (duck *DuckDb) Props() map[string]string

Props returns all properties of the DuckDb instance as a map

func (*DuckDb)Queryadded inv1.2.16

func (duck *DuckDb) Query(sqlstring, options ...map[string]any) (dataDataset, errerror)

Query runs a sql query, returns `Dataset`
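
A minimal sketch of running a query (the iop import path is assumed from this module; it assumes a duckdb binary is available to the package on the local machine):

package main

import (
	"context"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	duck := iop.NewDuckDb(context.Background())
	if err := duck.Open(); err != nil {
		log.Fatal(err)
	}
	defer duck.Close()

	data, err := duck.Query("select 42 as answer")
	if err != nil {
		log.Fatal(err)
	}
	data.Print(0) // pretty-print all rows of the returned Dataset
}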

func (*DuckDb)QueryContextadded inv1.2.16

func (duck *DuckDb) QueryContext(ctxcontext.Context, sqlstring, options ...map[string]any) (dataDataset, errerror)

QueryContext runs a sql query with context, returns `Dataset`

func (*DuckDb) Quote added in v1.2.16

func (duck *DuckDb) Quote(col string) (qName string)

Quote quotes a column name

func (*DuckDb) SetProp added in v1.2.16

func (duck *DuckDb) SetProp(key string, value string)

SetProp sets a property for the DuckDb instance

func (*DuckDb) Stream added in v1.2.16

func (duck *DuckDb) Stream(sql string, options ...map[string]any) (ds *Datastream, err error)

Stream runs a sql query, returns `Datastream`

func (*DuckDb) StreamContext added in v1.2.16

func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...map[string]any) (ds *Datastream, err error)

StreamContext runs a sql query with context, returns `Datastream`

func (*DuckDb) SubmitSQL added in v1.2.19

func (duck *DuckDb) SubmitSQL(sql string, showChanges bool) (err error)

SubmitSQL submits a sql query to duckdb via stdin

type DuckDbCopyOptions added in v1.2.25

type DuckDbCopyOptions struct {
	Format             dbio.FileType
	Compression        CompressorType
	PartitionFields    []PartitionLevel // part_year, part_month, part_day, etc.
	PartitionKey       string
	WritePartitionCols bool
	FileSizeBytes      int64
}

type DuckDbSecret added in v1.4.11

type DuckDbSecret struct {
	Type  DuckDbSecretType  `json:"type"`
	Name  string            `json:"name"`
	Props map[string]string `json:"props"`
}

func NewDuckDbSecret added in v1.4.11

func NewDuckDbSecret(name string, secretType DuckDbSecretType, props map[string]string) DuckDbSecret

func (*DuckDbSecret) AddProp added in v1.4.11

func (dds *DuckDbSecret) AddProp(key, value string)

func (*DuckDbSecret) Render added in v1.4.11

func (dds *DuckDbSecret) Render() string

type DuckDbSecretType added in v1.4.11

type DuckDbSecretType string

const (
	DuckDbSecretTypeUnknown DuckDbSecretType = ""
	DuckDbSecretTypeS3      DuckDbSecretType = "s3"
	DuckDbSecretTypeR2      DuckDbSecretType = "r2"
	DuckDbSecretTypeGCS     DuckDbSecretType = "gcs"
	DuckDbSecretTypeAzure   DuckDbSecretType = "azure"
	DuckDbSecretTypeIceberg DuckDbSecretType = "iceberg"
)

type Encoding added in v1.4.13

type Encoding string

var (
	EncodingLatin1      Encoding = "latin1"
	EncodingLatin5      Encoding = "latin5"
	EncodingLatin9      Encoding = "latin9"
	EncodingUtf8        Encoding = "utf8"
	EncodingUtf8Bom     Encoding = "utf8_bom"
	EncodingUtf16       Encoding = "utf16"
	EncodingWindows1250 Encoding = "windows1250"
	EncodingWindows1252 Encoding = "windows1252"

	Encodings = []Encoding{EncodingLatin1, EncodingLatin5, EncodingLatin9, EncodingUtf8, EncodingUtf8Bom, EncodingUtf16, EncodingWindows1250, EncodingWindows1252}
)

func (Encoding) DecodeString added in v1.4.17

func (e Encoding) DecodeString() string

func (Encoding) EncodeString added in v1.4.17

func (e Encoding) EncodeString() string

func (Encoding) String added in v1.4.13

func (e Encoding) String() string

type Evaluator added in v1.4.14

type Evaluator struct {
	Eval            *goval.Evaluator
	State           map[string]any
	NoComputeKey    string
	VarPrefixes     []string
	KeepMissingExpr bool // allows us to leave any missing sub-expression intact
	AllowNoPrefix   bool
	IgnoreSyntaxErr bool
}

func NewEvaluator added in v1.4.14

func NewEvaluator(varPrefixes []string, states ...map[string]any) *Evaluator

func (*Evaluator) Check added in v1.4.20

func (e *Evaluator) Check(expr string) (err error)

func (*Evaluator) ExtractVars added in v1.4.14

func (e *Evaluator) ExtractVars(expr string) []string

ExtractVars identifies variable references in a string expression, ignoring those inside double quotes. It can recognize patterns like env.VAR, state.VAR, secrets.VAR, and auth.VAR. When AllowNoPrefix is true, it also captures unprefixed variables like MY_VAR, some_value, etc.

func (*Evaluator) FillMissingKeys added in v1.4.25

func (e *Evaluator) FillMissingKeys(stateMap map[string]any, varsToCheck []string) map[string]any

func (*Evaluator) FindMatches added in v1.5.1

func (e *Evaluator) FindMatches(inputStr string) (expressions []string, err error)

FindMatches parses the input string and extracts expressions within curly braces, properly handling nested brackets and quoted strings. Returns an error if brackets are unbalanced.
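
A sketch of extracting and rendering templated expressions. The `{state.name}` template syntax and the way the state map is keyed are assumptions for illustration, based on the curly-brace and prefix behavior described above; only the documented method signatures are used.

package example

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func evaluatorExample() {
	// state keyed by prefix is an assumption; adjust to your actual state shape
	state := map[string]any{"state": map[string]any{"name": "world"}}
	ev := iop.NewEvaluator([]string{"state", "env"}, state)

	input := `hello {state.name}`

	vars := ev.ExtractVars(input)       // variable references found, e.g. state.name
	exprs, err := ev.FindMatches(input) // expressions found inside { }
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(vars, exprs)

	rendered, err := ev.RenderString(input) // substitutes evaluated values into the string
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rendered)
}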

func (*Evaluator) RenderAny added in v1.4.14

func (e *Evaluator) RenderAny(input any, extras ...map[string]any) (output any, err error)

func (*Evaluator) RenderPayload added in v1.4.14

func (e *Evaluator) RenderPayload(val any, extras ...map[string]any) (newVal any, err error)

func (*Evaluator) RenderString added in v1.4.14

func (e *Evaluator) RenderString(val any, extras ...map[string]any) (newVal string, err error)

type Excel added in v1.2.2

type Excel struct {
	File   *excelize.File
	Sheets []string
	Path   string
	// contains filtered or unexported fields
}

Excel represents an Excel object pointing to its file

func NewExcel added in v1.2.2

func NewExcel() (xls *Excel)

NewExcel creates a new excel file

func NewExcelFromFile added in v1.2.2

func NewExcelFromFile(path string) (xls *Excel, err error)

NewExcelFromFile returns a new Excel instance from a local file

func NewExcelFromReader added in v1.2.2

func NewExcelFromReader(reader io.Reader, opts ...excelize.Options) (xls *Excel, err error)

NewExcelFromReader returns a new Excel instance from a reader

func (*Excel) GetDataset added in v1.2.2

func (xls *Excel) GetDataset(sheet string) (data Dataset)

GetDataset returns a dataset of the provided sheet

func (*Excel) GetDatasetFromRange added in v1.2.2

func (xls *Excel) GetDatasetFromRange(sheet, cellRange string) (data Dataset, err error)

GetDatasetFromRange returns a dataset of the provided sheet / range. cellRange example: `$AH$13:$AI$20` or `AH13:AI20` or `A:E`

func (*Excel) RefreshSheets added in v1.2.2

func (xls *Excel) RefreshSheets() (err error)

RefreshSheets refreshes sheet index data

func (*Excel) TitleToNumber added in v1.4.24

func (xls *Excel) TitleToNumber(s string) int

TitleToNumber provides a function to convert an Excel sheet column title to int (this function doesn't do value check currently). For example, convert AK and ak to column title 36:

excelize.TitleToNumber("AK")
excelize.TitleToNumber("ak")

func (*Excel) WriteSheet added in v1.2.2

func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)

WriteSheet writes a datastream into a sheet. mode can be: `new`, `append` or `overwrite`. Default is `new`
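
A sketch of reading a range from an existing workbook and writing a datastream into another sheet. File names and sheet names are placeholders, and ds is assumed to be an *iop.Datastream obtained elsewhere; Dataset.Rows is documented elsewhere in the package.

package example

import (
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func excelExample(ds *iop.Datastream) {
	xls, err := iop.NewExcelFromFile("input.xlsx")
	if err != nil {
		log.Fatal(err)
	}

	// read a bounded range from "Sheet1"
	data, err := xls.GetDatasetFromRange("Sheet1", "A1:E20")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read %d rows", len(data.Rows))

	// write the datastream into a new sheet, then save to disk
	if err := xls.WriteSheet("output", ds, "new"); err != nil {
		log.Fatal(err)
	}
	if err := xls.WriteToFile("output.xlsx"); err != nil {
		log.Fatal(err)
	}
}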

func (*Excel) WriteToFile added in v1.2.2

func (xls *Excel) WriteToFile(path string) (err error)

WriteToFile writes to a file

func (*Excel) WriteToWriter added in v1.2.2

func (xls *Excel) WriteToWriter(w io.Writer) (err error)

WriteToWriter writes to a provided writer

type FileStreamConfig added in v1.2.19

type FileStreamConfig struct {
	Limit            int               `json:"limit"`
	Select           []string          `json:"select"`
	SQL              string            `json:"sql"`
	Format           dbio.FileType     `json:"format"`
	IncrementalKey   string            `json:"incremental_key"`
	IncrementalValue string            `json:"incremental_value"`
	FileSelect       []string          `json:"file_select"`     // a list of files to include.
	DuckDBFilename   bool              `json:"duckdb_filename"` // stream URL
	Props            map[string]string `json:"props"`
}

func (*FileStreamConfig) GetProp added in v1.3.5

func (sc *FileStreamConfig) GetProp(key string) string

func (*FileStreamConfig) SetProp added in v1.3.5

func (sc *FileStreamConfig) SetProp(key, val string)

func (*FileStreamConfig) ShouldUseDuckDB added in v1.2.19

func (sc *FileStreamConfig) ShouldUseDuckDB() bool

type GoogleSheet added in v1.2.2

type GoogleSheet struct {
	Sheets        []string
	SpreadsheetID string
	// contains filtered or unexported fields
}

GoogleSheet represents a Google Sheet object

func NewGoogleSheet added in v1.2.2

func NewGoogleSheet(props ...string) (ggs *GoogleSheet, err error)

NewGoogleSheet creates a blank spreadsheet; title is the new spreadsheet title

func NewGoogleSheetFromURL added in v1.2.2

func NewGoogleSheetFromURL(urlStr string, props ...string) (ggs *GoogleSheet, err error)

NewGoogleSheetFromURL returns a new GoogleSheet instance from a provided url

func (*GoogleSheet) DeleteSheet added in v1.2.2

func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)

func (*GoogleSheet) GetDataset added in v1.2.2

func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)

GetDataset returns a dataset of the sheet

func (*GoogleSheet) GetDatasetFromRange added in v1.2.2

func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)

GetDatasetFromRange returns a dataset from the specified range

func (*GoogleSheet) RefreshSheets added in v1.2.2

func (ggs *GoogleSheet) RefreshSheets() (err error)

RefreshSheets refreshes sheets data

func (*GoogleSheet) URL added in v1.2.2

func (ggs *GoogleSheet) URL() string

func (*GoogleSheet) WriteSheet added in v1.2.2

func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)

WriteSheet writes a datastream into a sheet. mode can be: `new`, `append` or `overwrite`. Default is `new`

type GzipCompressor

type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*GzipCompressor) Compress

func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader

Compress uses gzip to compress

func (*GzipCompressor) Decompress

func (cp *GzipCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

Decompress uses gzip to decompress if it is gzip. Otherwise returns the same reader
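
A round-trip sketch: compress a reader with gzip, then decompress it back. How a GzipCompressor is constructed is not shown in this excerpt (the package exposes a Compressor abstraction), so the instance is taken as a parameter; only the documented methods are used.

package example

import (
	"bytes"
	"io"
	"strings"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func roundTrip(cp *iop.GzipCompressor, text string) (string, error) {
	compressed := cp.Compress(strings.NewReader(text))

	// Decompress detects gzip content; non-gzip readers are returned unchanged
	plain, err := cp.Decompress(compressed)
	if err != nil {
		return "", err
	}

	buf := new(bytes.Buffer)
	if _, err := io.Copy(buf, plain); err != nil {
		return "", err
	}
	return buf.String(), nil
}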

func (*GzipCompressor) Suffix

func (cp *GzipCompressor) Suffix() string

type HttpStreamPart added in v1.3.6

type HttpStreamPart struct {
	Index    int
	FromExpr string
	Columns  Columns
}

type IcebergReader added in v1.2.16

type IcebergReader struct {
	URI  string
	Duck *DuckDb
	// contains filtered or unexported fields
}

func NewIcebergReader added in v1.2.16

func NewIcebergReader(uri string, props ...string) (*IcebergReader, error)

func (*IcebergReader) Close added in v1.2.16

func (i *IcebergReader) Close() error

func (*IcebergReader) Columns added in v1.2.16

func (r *IcebergReader) Columns() (Columns, error)

func (*IcebergReader) MakeQuery added in v1.2.19

func (r *IcebergReader) MakeQuery(sc FileStreamConfig) string

type Iterator

type Iterator struct {
	Row          []any
	Reprocess    chan []any
	IsCasted     bool
	RowIsCasted  bool
	Counter      uint64
	StreamRowNum uint64
	Context      *g.Context
	Closed       bool
	// contains filtered or unexported fields
}

Iterator is the row provider for a datastream

func (*Iterator) BelowEqualIncrementalVal added in v1.2.10

func (it *Iterator) BelowEqualIncrementalVal() bool

BelowEqualIncrementalVal evaluates the incremental value against the incrementalCol. This is used when the stream is a file with incremental mode (unable to filter at source like a database). it.incrementalVal and it.incrementalColI need to be set

func (*Iterator) Ds

func (it *Iterator) Ds() *Datastream

type JsonColumnTyping added in v1.4.7

type JsonColumnTyping struct {
	AsText bool `json:"as_text,omitempty" yaml:"as_text,omitempty"`
}

JsonColumnTyping contains json type mapping configurations

func (*JsonColumnTyping) Apply added in v1.4.7

func (jct *JsonColumnTyping) Apply(col *Column)

type KeyType

type KeyType string

const (
	AggregateKey    KeyType = "aggregate"
	ClusterKey      KeyType = "cluster"
	DistributionKey KeyType = "distribution"
	DuplicateKey    KeyType = "duplicate"
	HashKey         KeyType = "hash"
	IndexKey        KeyType = "index"
	PartitionKey    KeyType = "partition"
	PrimaryKey      KeyType = "primary"
	SortKey         KeyType = "sort"
	UniqueKey       KeyType = "unique"
	UpdateKey       KeyType = "update"
)

func (KeyType) MetadataKey added in v1.2.15

func (kt KeyType) MetadataKey() string

type KeyValue

type KeyValue struct {
	Key   string `json:"key"`
	Value any    `json:"value"`
}

type Metadata

type Metadata struct {
	StreamURL KeyValue `json:"stream_url"`
	LoadedAt  KeyValue `json:"loaded_at"`
	RowNum    KeyValue `json:"row_num"`
	RowID     KeyValue `json:"row_id"`
	ExecID    KeyValue `json:"exec_id"`
}

func (*Metadata) AsMap

func (m *Metadata) AsMap() map[string]any

AsMap returns as a map

type NoneCompressor

type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*NoneCompressor) Compress

func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader

func (*NoneCompressor) Decompress

func (cp *NoneCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

func (*NoneCompressor) Suffix

func (cp *NoneCompressor) Suffix() string

type Parquet

type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}

Parquet is a parquet object

func NewParquetReader added in v1.1.7

func NewParquetReader(reader io.ReaderAt, columns Columns) (p *Parquet, err error)

func (*Parquet) Columns

func (p *Parquet) Columns() Columns

type ParquetArrowReader added in v1.1.6

type ParquetArrowReader struct {
	Path    string
	Reader  *pqarrow.FileReader
	File    *os.File
	Data    *Dataset
	Context *g.Context
	Memory  memory.Allocator
	// contains filtered or unexported fields
}

ParquetArrowReader is a parquet reader object using arrow v18

func NewParquetArrowReader added in v1.1.7

func NewParquetArrowReader(reader *os.File, selected []string) (p *ParquetArrowReader, err error)

func (*ParquetArrowReader) Columns added in v1.1.6

func (p *ParquetArrowReader) Columns() Columns

type ParquetArrowWriter added in v1.1.6

type ParquetArrowWriter struct {
	Writer *pqarrow.FileWriter
	// contains filtered or unexported fields
}

func NewParquetArrowWriter added in v1.1.6

func NewParquetArrowWriter(w io.Writer, columns Columns, codec compress.Compression) (p *ParquetArrowWriter, err error)

func (*ParquetArrowWriter) Close added in v1.1.6

func (p *ParquetArrowWriter) Close() error

func (*ParquetArrowWriter) Columns added in v1.1.6

func (p *ParquetArrowWriter) Columns() Columns

func (*ParquetArrowWriter) WriteRow added in v1.1.6

func (p *ParquetArrowWriter) WriteRow(row []any) error

type ParquetDuckDb added in v1.2.16

type ParquetDuckDb struct {
	URI  string
	Duck *DuckDb
	// contains filtered or unexported fields
}

func NewParquetReaderDuckDb added in v1.2.16

func NewParquetReaderDuckDb(uri string, props ...string) (*ParquetDuckDb, error)

func (*ParquetDuckDb) Close added in v1.2.16

func (r *ParquetDuckDb) Close() error

func (*ParquetDuckDb) Columns added in v1.2.16

func (r *ParquetDuckDb) Columns() (Columns, error)

func (*ParquetDuckDb) MakeQuery added in v1.2.19

func (r *ParquetDuckDb) MakeQuery(sc FileStreamConfig) string

type ParquetWriter added in v1.1.7

type ParquetWriter struct {
	Writer    *parquet.Writer
	WriterMap *parquet.GenericWriter[map[string]any]
	// contains filtered or unexported fields
}

func NewParquetWriter added in v1.1.7

func NewParquetWriter(w io.Writer, columns Columns, codec compress.Codec) (p *ParquetWriter, err error)

func NewParquetWriterMap added in v1.2.12

func NewParquetWriterMap(w io.Writer, columns Columns, codec compress.Codec) (p *ParquetWriter, err error)

func (*ParquetWriter) Close added in v1.1.7

func (pw *ParquetWriter) Close() error

func (*ParquetWriter) WriteRec added in v1.2.12

func (pw *ParquetWriter) WriteRec(row []any) error

func (*ParquetWriter) WriteRow added in v1.1.7

func (pw *ParquetWriter) WriteRow(row []any) error

type PartitionLevel added in v1.3.4

type PartitionLevel string

const (
	PartitionLevelSecond    PartitionLevel = "second"
	PartitionLevelMinute    PartitionLevel = "minute"
	PartitionLevelHour      PartitionLevel = "hour"
	PartitionLevelDay       PartitionLevel = "day"
	PartitionLevelWeek      PartitionLevel = "week"
	PartitionLevelYearMonth PartitionLevel = "year_month"
	PartitionLevelMonth     PartitionLevel = "month"
	PartitionLevelYear      PartitionLevel = "year"
)

func ExtractPartitionFields added in v1.3.4

func ExtractPartitionFields(path string) (levels []PartitionLevel)

ExtractPartitionFields extracts the partition fields from the given path
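
A sketch of working with partition levels. The `{part_year}/{part_month}` path tokens are an assumption for illustration (the DuckDbCopyOptions comment above mentions part_year, part_month, etc.); the exact path syntax accepted by ExtractPartitionFields is defined in the package source.

package example

import (
	"fmt"
	"log"
	"time"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func partitionExample() {
	// extract partition levels referenced in a path (path token syntax assumed)
	levels := iop.ExtractPartitionFields("s3://bucket/data/{part_year}/{part_month}/")
	fmt.Println(levels)

	// TruncateTime snaps a timestamp down to the given partition level
	t, err := iop.PartitionLevelMonth.TruncateTime(time.Date(2025, 12, 7, 15, 4, 5, 0, time.UTC))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(t) // expected: start of the month, 2025-12-01 00:00:00 UTC
}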

func GetLowestPartTimeLevel added in v1.3.6

func GetLowestPartTimeLevel(url string) (PartitionLevel, error)

func (PartitionLevel) IsValid added in v1.3.4

func (level PartitionLevel) IsValid() bool

func (PartitionLevel) Str added in v1.3.6

func (level PartitionLevel) Str() string

func (PartitionLevel) TruncateTime added in v1.3.4

func (level PartitionLevel) TruncateTime(t time.Time) (time.Time, error)

type Queue added in v1.4.11

type Queue struct {
	Path   string        `json:"path"`
	File   *os.File      `json:"-"`
	Reader *bufio.Reader `json:"-"`
	Writer *bufio.Writer `json:"-"`
	// contains filtered or unexported fields
}

func NewQueue added in v1.4.11

func NewQueue(name string) (q *Queue, err error)

NewQueue creates a new queue with a temporary file

func (*Queue) Append added in v1.4.11

func (q *Queue) Append(data any) error

Append writes a line to the queue

func (*Queue) Close added in v1.4.11

func (q *Queue) Close() error

Close closes and optionally removes the queue file

func (*Queue) Next added in v1.4.11

func (q *Queue) Next() (any, bool, error)

Next reads the next line from the queue. Returns the line, a boolean indicating if there are more lines, and any error
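
A sketch of the queue lifecycle: create, append, reset to the start, then drain with Next until no lines remain. Whether appended values round-trip as the same Go type (or as decoded JSON) is not specified in this excerpt, so the values are only printed.

package example

import (
	"fmt"
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func queueExample() {
	q, err := iop.NewQueue("example")
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	for _, rec := range []any{"first", "second", "third"} {
		if err := q.Append(rec); err != nil {
			log.Fatal(err)
		}
	}

	// position the reader at the beginning before consuming
	if err := q.Reset(); err != nil {
		log.Fatal(err)
	}

	for {
		val, more, err := q.Next()
		if err != nil {
			log.Fatal(err)
		}
		if val != nil {
			fmt.Println(val)
		}
		if !more {
			break
		}
	}
}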

func (*Queue) Reset added in v1.4.11

func (q *Queue) Reset() error

Reset positions the reader at the beginning of the file

type ReaderReady added in v1.2.6

type ReaderReady struct {
	Reader io.Reader
	URI    string
}

type RecNode

type RecNode struct {
	// contains filtered or unexported fields
}

func NewRecNode

func NewRecNode(cols Columns, optional bool) *RecNode

func (*RecNode) Compression

func (rn *RecNode) Compression() compress.Codec

func (*RecNode) Encoding

func (rn *RecNode) Encoding() encoding.Encoding

func (*RecNode) Fields

func (rn *RecNode) Fields() []parquet.Field

func (*RecNode) GoType

func (rn *RecNode) GoType() reflect.Type

func (*RecNode) ID

func (rn *RecNode) ID() int

func (*RecNode) Leaf

func (rn *RecNode) Leaf() bool

func (*RecNode) Optional

func (rn *RecNode) Optional() bool

func (*RecNode) Repeated

func (rn *RecNode) Repeated() bool

func (*RecNode) Required

func (rn *RecNode) Required() bool

func (*RecNode) String

func (rn *RecNode) String() string

func (*RecNode) Type

func (rn *RecNode) Type() parquet.Type

type Record

type Record struct {
	Columns *Columns
	Values  []any
}

type SAS

type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}

SAS is a sas7bdat object

func NewSASStream

func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)

func (*SAS) Columns

func (s *SAS) Columns() Columns

type SSHClient

type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}

SSHClient is a client to connect to an ssh server, with the main goal of forwarding ports

func (*SSHClient) Close

func (s *SSHClient) Close()

Close stops the client connection

func (*SSHClient) Connect

func (s *SSHClient) Connect() (err error)

Connect connects to the server

func (*SSHClient) GetOutput

func (s *SSHClient) GetOutput() (stdout string, stderr string)

GetOutput returns stdout & stderr outputs

func (*SSHClient) NewSession added in v1.5.1

func (s *SSHClient) NewSession() (*ssh.Session, error)

NewSession creates a new SSH session

func (*SSHClient) OpenPortForward

func (s *SSHClient) OpenPortForward() (localPort int, err error)

OpenPortForward forwards the port as specified
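
A sketch of forwarding a remote database port through an SSH bastion. The host, port, and credential values are placeholders, and whether PrivateKey takes a file path or the key material itself is not specified in this excerpt; only the exported fields and documented methods are used.

package example

import (
	"log"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func sshForwardExample() {
	client := &iop.SSHClient{
		Host:       "bastion.example.com",
		Port:       22,
		User:       "deploy",
		PrivateKey: "/home/deploy/.ssh/id_rsa", // path or key material: see package source
		TgtHost:    "internal-db.example.com",  // host reachable from the bastion
		TgtPort:    5432,
	}

	if err := client.Connect(); err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	localPort, err := client.OpenPortForward()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("connect to localhost:%d to reach %s:%d", localPort, client.TgtHost, client.TgtPort)
}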

func (*SSHClient) RunAsProcess

func (s *SSHClient) RunAsProcess() (localPort int, err error)

RunAsProcess uses a separate process; this enables the use of public key auth. https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key

func (*SSHClient) SftpClient

func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)

SftpClient returns an SftpClient

type Selector added in v1.5.1

type Selector struct {
	ConnType dbio.Type
	// contains filtered or unexported fields
}

Selector provides efficient field selection, exclusion, and renaming. Use NewSelector to build from select expressions, then Apply for each field.

func NewSelector added in v1.5.1

func NewSelector(selectExprs []string, casing ColumnCasing) *Selector

NewSelector creates a Selector from select expressions. All field matching is case-insensitive.

func (*Selector) Apply added in v1.5.1

func (s *Selector) Apply(name string) (new string, included bool)

Apply checks if a field should be included and returns its (possibly renamed) name. Results are cached for O(1) lookups after the first call for each field. Priority: rename > exclude > glob exclude > All > include > glob include
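
A sketch of filtering incoming field names with a Selector. The select expressions here (plain names plus a glob) are assumptions for illustration; the exact expression syntax for renames and exclusions is defined in the package source, and ColumnCasing's zero value is used so no particular casing option is asserted.

package example

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func selectorExample(fields []string) {
	var casing iop.ColumnCasing // zero value; casing options are documented elsewhere in the package
	sel := iop.NewSelector([]string{"id", "created_at", "user_*"}, casing)

	for _, f := range fields {
		// Apply reports whether the field is kept and its (possibly renamed) name
		if newName, ok := sel.Apply(f); ok {
			fmt.Printf("keep %s as %s\n", f, newName)
		}
	}
}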

type Shaper

type Shaper struct {
	Func       func([]any) []any
	SrcColumns Columns
	TgtColumns Columns
	ColMap     map[int]int
}

type SnappyCompressor

type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*SnappyCompressor) Compress

func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader

Compress uses snappy to compress

func (*SnappyCompressor) Decompress

func (cp *SnappyCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses snappy to decompress if it is snappy. Otherwise returns the same reader

func (*SnappyCompressor) Suffix

func (cp *SnappyCompressor) Suffix() string

type StreamConfig added in v1.1.15

type StreamConfig struct {
	EmptyAsNull    bool           `json:"empty_as_null"`
	Header         bool           `json:"header"`
	Compression    CompressorType `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE
	NullIf         string         `json:"null_if"`
	NullAs         string         `json:"null_as"`
	DatetimeFormat string         `json:"datetime_format"`
	SkipBlankLines bool           `json:"skip_blank_lines"`
	Format         dbio.FileType  `json:"format"`
	Delimiter      string         `json:"delimiter"`
	Escape         string         `json:"escape"`
	Quote          string         `json:"quote"`
	FileMaxRows    int64          `json:"file_max_rows"`
	FileMaxBytes   int64          `json:"file_max_bytes"`
	BatchLimit     int64          `json:"batch_limit"`
	MaxDecimals    int            `json:"max_decimals"`
	Flatten        int            `json:"flatten"`
	FieldsPerRec   int            `json:"fields_per_rec"`
	Jmespath       string         `json:"jmespath"`
	Sheet          string         `json:"sheet"`
	ColumnCasing   ColumnCasing   `json:"column_casing"`
	TargetType     dbio.Type      `json:"target_type"`
	DeleteFile     bool           `json:"delete"` // whether to delete before writing
	BoolAsInt      bool           `json:"-"`
	Columns        Columns        `json:"columns"` // list of column types. Can be partial list! likely is!
	Transforms     TransformMap
	Map            map[string]string `json:"-"`
}

func DefaultStreamConfig added in v1.1.15

func DefaultStreamConfig() StreamConfig

func LoaderStreamConfig added in v1.2.22

func LoaderStreamConfig(header bool) StreamConfig

func (*StreamConfig) ToMap added in v1.2.22

func (sc *StreamConfig) ToMap() map[string]string
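
A sketch of building a stream configuration: start from the defaults, adjust a few fields, and flatten it to a map (for example, to feed StreamProcessor.SetConfig). The field values are illustrative only, and the map keys are assumed to mirror the json tags.

package example

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func streamConfigExample() {
	cfg := iop.DefaultStreamConfig()
	cfg.Header = true
	cfg.Delimiter = "|"
	cfg.FileMaxRows = 500_000

	m := cfg.ToMap()
	fmt.Println(m["delimiter"]) // key name assumed from the json tag

	// feed the flattened config into a stream processor
	sp := iop.NewStreamProcessor()
	sp.SetConfig(m)
	_ = sp
}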

type StreamProcessor

type StreamProcessor struct {
	N      uint64
	Config StreamConfig
	// contains filtered or unexported fields
}

StreamProcessor processes rows and values

func NewStreamProcessor

func NewStreamProcessor() *StreamProcessor

NewStreamProcessor returns a new StreamProcessor

func (*StreamProcessor) CastRow

func (sp *StreamProcessor) CastRow(row []any, columns Columns) []any

CastRow casts each value of a row. Slows down processing about 40%?

func (*StreamProcessor) CastToBool added in v1.2.12

func (sp *StreamProcessor) CastToBool(i any) (b bool, err error)

CastToBool converts interface to bool

func (*StreamProcessor) CastToString

func (sp *StreamProcessor) CastToString(val any) (valString string)

func (*StreamProcessor) CastToStringCSV added in v1.4.17

func (sp *StreamProcessor) CastToStringCSV(i int, val any, valType ...ColumnType) string

CastToStringCSV casts to string, used for csv writing. Slows processing down 5% with upstream CastRow or 35% without upstream CastRow

func (*StreamProcessor) CastToStringE added in v1.4.17

func (sp *StreamProcessor) CastToStringE(val any) (valString string, err error)

func (*StreamProcessor) CastToStringSafeMask added in v1.2.14

func (sp *StreamProcessor) CastToStringSafeMask(i int, val any, valType ...ColumnType) string

CastToStringSafeMask masks to count bytes (even safer)

func (*StreamProcessor) CastToTime

func (sp *StreamProcessor) CastToTime(i any) (t time.Time, err error)

CastToTime converts interface to time

func (*StreamProcessor) CastType

func (sp *StreamProcessor) CastType(val any, typ ColumnType) any

CastType casts the type of an interface. CastType is used to cast the interface place holders?

func (*StreamProcessor) CastVal

func (sp *StreamProcessor) CastVal(i int, val any, col *Column) any

CastVal casts values with stats collection, which degrades performance by ~10%. go test -benchmem -run='^$ github.com/slingdata-io/sling-cli/core/dbio/iop' -bench '^BenchmarkProcessVal'

func (*StreamProcessor) CastValWithoutStats

func (sp *StreamProcessor) CastValWithoutStats(i int, val any, typ ColumnType) any

CastValWithoutStats casts the value without counting stats

func (*StreamProcessor) CheckType added in v1.4.18

func (sp *StreamProcessor) CheckType(v any) (typ ColumnType)

CheckType returns the type of an interface

func (*StreamProcessor) ColStats added in v1.2.10

func (sp *StreamProcessor) ColStats() map[int]*ColumnStats

func (*StreamProcessor) CountDigits added in v1.4.20

func (sp *StreamProcessor) CountDigits(number string) (precision, scale int)

func (*StreamProcessor) GetType

func (sp *StreamProcessor) GetType(val any) (typ ColumnType)

GetType returns the type of an interface

func (*StreamProcessor) ParseString

func (sp *StreamProcessor) ParseString(s string, jj ...int) (val any)

ParseString returns an interface
string: "varchar"
integer: "integer"
decimal: "decimal"
date: "date"
datetime: "timestamp"
timestamp: "timestamp"
text: "text"
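
A sketch of value parsing and casting with a StreamProcessor, following the type mapping listed above. Exact results depend on the processor's configuration (e.g. datetime formats), so the expected outputs are only indicative.

package example

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func parseExample() {
	sp := iop.NewStreamProcessor()

	for _, s := range []string{"1055", "152.33", "2023-04-01", "hello"} {
		val := sp.ParseString(s) // best-effort parse into a typed value
		fmt.Printf("%v -> %T (%v)\n", val, val, sp.GetType(val))
	}

	// TruncateDecimalString keeps up to the given scale without converting
	fmt.Println(sp.TruncateDecimalString("3.14159", 2)) // expected: 3.14
}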

func (*StreamProcessor) ParseTime

func (sp *StreamProcessor) ParseTime(i any) (t time.Time, err error)

ParseTime parses a date string and returns time.Time

func (*StreamProcessor) ParseVal

func (sp *StreamProcessor) ParseVal(val any) any

ParseVal parses the value into its appropriate type

func (*StreamProcessor) ProcessRow

func (sp *StreamProcessor) ProcessRow(row []any) []any

ProcessRow processes a row

func (*StreamProcessor) ProcessVal

func (sp *StreamProcessor) ProcessVal(val any) any

ProcessVal processes a value

func (*StreamProcessor) ResetConfig added in v1.1.15

func (sp *StreamProcessor) ResetConfig()

func (*StreamProcessor) SetConfig

func (sp *StreamProcessor) SetConfig(configMap map[string]string)

SetConfig sets the data.Sp.config values

func (*StreamProcessor) TruncateDecimalString added in v1.4.20

func (sp *StreamProcessor) TruncateDecimalString(number string, decCount int) (newNumber string)

TruncateDecimalString returns up to the specified scale, without converting

type StringColumnTyping added in v1.4.5

type StringColumnTyping struct {
	LengthFactor int    `json:"length_factor,omitempty" yaml:"length_factor,omitempty"`
	MinLength    int    `json:"min_length,omitempty" yaml:"min_length,omitempty"`
	MaxLength    int    `json:"max_length,omitempty" yaml:"max_length,omitempty"`
	UseMax       bool   `json:"use_max,omitempty" yaml:"use_max,omitempty"`
	Note         string `json:"note,omitempty" yaml:"note,omitempty"`
}

StringColumnTyping contains string type mapping configurations

func (*StringColumnTyping) Apply added in v1.4.5

func (sct *StringColumnTyping) Apply(length, max int) (newLength int)

type TimeLevel added in v1.3.6

type TimeLevel string

const (
	TimeLevelYear      TimeLevel = "YYYY"
	TimeLevelYearShort TimeLevel = "YY"
	TimeLevelMonthName TimeLevel = "MMM"
	TimeLevelMonth     TimeLevel = "MM"
	TimeLevelDay       TimeLevel = "DD"
	TimeLevelDayOfYear TimeLevel = "DDD"
	TimeLevelHour24    TimeLevel = "HH"
	TimeLevelHour12    TimeLevel = "hh"
	TimeLevelMinute    TimeLevel = "mm"
	TimeLevelSecond    TimeLevel = "ss"
)

func ExtractISO8601DateFields added in v1.3.6

func ExtractISO8601DateFields(path string) (list []TimeLevel)

ExtractISO8601DateFields returns a list of found date fields

func (TimeLevel) AsPartitionLevel added in v1.3.6

func (tl TimeLevel) AsPartitionLevel() PartitionLevel

type Transform added in v1.2.2

type Transform interface {
	Evaluate(row []any) (newRow []any, err error)
	Casted() bool
}

type TransformLegacy added in v1.4.17

type TransformLegacy struct {
	Name       string
	Func       func(*StreamProcessor, ...any) (any, error)
	FuncString func(*StreamProcessor, string) (string, error)
	FuncTime   func(*StreamProcessor, *time.Time) error
}

type TransformLegacyList added in v1.4.17

type TransformLegacyList []TransformLegacy

func (TransformLegacyList) HasTransform added in v1.4.17

type Transformers added in v1.2.2

type Transformers struct {
	Accent            transform.Transformer
	DecodeUTF8        transform.Transformer
	DecodeUTF8BOM     transform.Transformer
	DecodeUTF16       transform.Transformer
	DecodeISO8859_1   transform.Transformer
	DecodeISO8859_5   transform.Transformer
	DecodeISO8859_15  transform.Transformer
	DecodeWindows1250 transform.Transformer
	DecodeWindows1252 transform.Transformer
	EncodeUTF8        transform.Transformer
	EncodeUTF8BOM     transform.Transformer
	EncodeUTF16       transform.Transformer
	EncodeISO8859_1   transform.Transformer
	EncodeISO8859_5   transform.Transformer
	EncodeISO8859_15  transform.Transformer
	EncodeWindows1250 transform.Transformer
	EncodeWindows1252 transform.Transformer
}

func NewTransformers added in v1.2.2

func NewTransformers() Transformers

type ZStandardCompressor

type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*ZStandardCompressor) Compress

func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader

Compress uses zstandard to compress

func (*ZStandardCompressor) Decompress

func (cp *ZStandardCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses zstandard to decompress if it is zstandard. Otherwise returns the same reader

func (*ZStandardCompressor) Suffix

func (cp *ZStandardCompressor) Suffix() string

Source Files

View all Source files
