iop package
README
Input-Process-Output (ipo)
Documentation
Index
- Variables
- func AppendToBuilder(builder array.Builder, col *Column, val interface{})
- func ApplySelect(fields []string, selectExprs []string) (newFields []string, err error)
- func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)
- func CleanHeaderRow(header []string) []string
- func CleanName(name string) (newName string)
- func CloseQueues()
- func ColumnsToArrowSchema(columns Columns) *arrow.Schema
- func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)
- func CreateDummyFields(numCols int) (cols []string)
- func DecodeJSONIfBase64(jsonBody string) (string, error)
- func ExtractPartitionTimeValue(mask, path string) (timestamp time.Time, err error)
- func FormatValue(val any, columnType ColumnType, connType dbio.Type) (newVal string)
- func GeneratePartURIsFromRange(mask, updateKey string, start, end time.Time) (uris []string, err error)
- func GetISO8601DateMap(t time.Time) map[string]any
- func GetLowestPartTimeUnit(mask string) (time.Duration, error)
- func GetPartitionDateMap(partKeyPrefix string, timestamp time.Time) map[string]any
- func GetValueFromArrowArray(arr arrow.Array, idx int) any
- func IsDummy(columns []Column) bool
- func Iso8601ToGoLayout(dateFormat string) (goDateFormat string)
- func MakeAwsConfig(ctx context.Context, props map[string]string) (cfg aws.Config, err error)
- func MakeDecNumScale(scale int) *big.Rat
- func MakeRowsChan() chan []any
- func MatchedPartitionMask(mask, path string) (matches bool)
- func NewJSONStream(ds *Datastream, decoder decoderLike, flatten int, jmespath string) *jsonStream
- func NewXMLDecoder(reader io.Reader) *xmlDecoder
- func OpenTunnelSSH(tgtHost string, tgtPort int, tunnelURL, privateKey, passphrase string) (localPort int, err error)
- func Row(vals ...any) []any
- func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)
- func SetSampleSize()
- func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) []byte
- func StripSQLComments(sql string) (string, error)
- func Unzip(src string, dest string) (nodes []map[string]any, err error)
- type ArrowReader
- type ArrowWriter
- type Avro
- type Batch
- func (b *Batch) AddTransform(transf func(row []any) []any)
- func (b *Batch) Close()
- func (b *Batch) ColumnsChanged() bool
- func (b *Batch) Ds() *Datastream
- func (b *Batch) ID() string
- func (b *Batch) IsFirst() bool
- func (b *Batch) Push(row []any)
- func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)
- type BatchReader
- type CSV
- func (c *CSV) InferSchema() error
- func (c *CSV) NewReader() (*io.PipeReader, error)
- func (c *CSV) Read() (data Dataset, err error)
- func (c *CSV) ReadStream() (ds *Datastream, err error)
- func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
- func (c *CSV) Sample(n int) (Dataset, error)
- func (c *CSV) SetFields(fields []string)
- func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
- type Column
- func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) (err error)
- func (col *Column) GetNativeType(t dbio.Type, ct ColumnTyping) (nativeType string, err error)
- func (col *Column) GoType() reflect.Type
- func (col *Column) HasNulls() bool
- func (col *Column) HasNullsPlus1() bool
- func (col *Column) IsBinary() bool
- func (col *Column) IsBool() bool
- func (col *Column) IsDate() bool
- func (col *Column) IsDatetime() bool
- func (col *Column) IsDecimal() bool
- func (col *Column) IsFloat() bool
- func (col *Column) IsInteger() bool
- func (col *Column) IsKeyType(keyType KeyType) bool
- func (col *Column) IsNumber() bool
- func (col *Column) IsString() bool
- func (col *Column) IsUnique() bool
- func (col *Column) Key() string
- func (col *Column) SetConstraint()
- func (col *Column) SetLengthPrecisionScale()
- func (col *Column) SetMetadata(key string, value string)
- type ColumnCasing
- type ColumnConstraint
- type ColumnStats
- type ColumnType
- func (ct ColumnType) IsBinary() bool
- func (ct ColumnType) IsBool() bool
- func (ct ColumnType) IsDate() bool
- func (ct ColumnType) IsDatetime() bool
- func (ct ColumnType) IsDecimal() bool
- func (ct ColumnType) IsFloat() bool
- func (ct ColumnType) IsInteger() bool
- func (ct ColumnType) IsJSON() bool
- func (ct ColumnType) IsNumber() bool
- func (ct ColumnType) IsString() bool
- func (ct ColumnType) IsTime() bool
- func (ct ColumnType) IsValid() bool
- type ColumnTyping
- type Columns
- func (cols Columns) Clone() (newCols Columns)
- func (cols Columns) Coerce(castCols Columns, hasHeader bool, casing ColumnCasing, tgtType dbio.Type) (newCols Columns)
- func (cols Columns) Data(includeParent bool) (fields []string, rows [][]any)
- func (cols Columns) Dataset() Dataset
- func (cols Columns) DbTypes(args ...bool) []string
- func (cols Columns) FieldMap(toLower bool) map[string]int
- func (cols Columns) GetColumn(name string) *Column
- func (cols Columns) GetKeys(keyType KeyType) Columns
- func (cols Columns) GetMissing(newCols ...Column) (missing Columns)
- func (cols Columns) IsDifferent(newCols Columns) bool
- func (cols Columns) IsDummy() bool
- func (cols Columns) IsSimilarTo(otherCols Columns) bool
- func (cols Columns) JSON(includeParent bool) (output string)
- func (cols Columns) Keys() []string
- func (cols Columns) MakeRec(row []any) map[string]any
- func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)
- func (cols Columns) Map() map[string]*Column
- func (cols Columns) Merge(newCols Columns, overwrite bool) (col2 Columns, added schemaChg, changed []schemaChg)
- func (cols Columns) Names(args ...bool) []string
- func (cols Columns) PrettyTable(includeParent bool) (output string)
- func (cols Columns) SetKeys(keyType KeyType, colNames ...string) (err error)
- func (cols Columns) SetMetadata(key, value string, colNames ...string) (err error)
- func (cols Columns) Sourced() (sourced bool)
- func (cols Columns) Types(args ...bool) []string
- func (cols Columns) ValidateNames(tgtType dbio.Type) (newCols Columns)
- func (cols Columns) WithoutMeta() (newCols Columns)
- type Compressor
- type CompressorType
- type ConstraintEvalFunc
- type CsvDuckDb
- type Dataflow
- func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
- func (df *Dataflow) AddEgressBytes(bytes uint64)
- func (df *Dataflow) BufferDataset() Dataset
- func (df *Dataflow) Bytes() (inBytes, outBytes uint64)
- func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
- func (df *Dataflow) CleanUp()
- func (df *Dataflow) Close()
- func (df *Dataflow) CloseCurrentBatches()
- func (df *Dataflow) Collect() (data Dataset, err error)
- func (df *Dataflow) Count() (cnt uint64)
- func (df *Dataflow) Defer(f func())
- func (df *Dataflow) DsTotalBytes() (bytes uint64)
- func (df *Dataflow) Err() (err error)
- func (df *Dataflow) IsClosed() bool
- func (df *Dataflow) IsEmpty() bool
- func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
- func (df *Dataflow) MergeColumns(columns []Column, inferred bool) (processOk bool)
- func (df *Dataflow) Pause(exceptDs ...string) bool
- func (df *Dataflow) PropagateColum(colIndex int)
- func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
- func (df *Dataflow) SetBatchLimit(limit int64)
- func (df *Dataflow) SetConfig(cfg StreamConfig)
- func (df *Dataflow) SetEmpty()
- func (df *Dataflow) SetReady()
- func (df *Dataflow) Size() int
- func (df *Dataflow) StreamConfig() (cfg StreamConfig)
- func (df *Dataflow) SyncColumns()
- func (df *Dataflow) SyncStats()
- func (df *Dataflow) Unpause(exceptDs ...string)
- func (df *Dataflow) WaitClosed()
- func (df *Dataflow) WaitReady() error
- type Dataset
- func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (data *Dataset) Append(row ...[]any)
- func (data *Dataset) ColValues(col int) []any
- func (data *Dataset) ColValuesStr(col int) []string
- func (data *Dataset) FirstRow() []any
- func (data *Dataset) FirstVal() any
- func (data *Dataset) GetFields(lower ...bool) []string
- func (data *Dataset) InferColumnTypes()
- func (data *Dataset) Pick(colNames ...string) (nData Dataset)
- func (data *Dataset) PrettyTable(fields ...string) (output string)
- func (data *Dataset) Print(limit int)
- func (data *Dataset) Records(lower ...bool) []map[string]any
- func (data *Dataset) RecordsCasted(lower ...bool) []map[string]any
- func (data *Dataset) RecordsString(lower ...bool) []map[string]string
- func (data *Dataset) SetFields(fields []string)
- func (data *Dataset) Sort(args ...any)
- func (data *Dataset) Stream(Props ...map[string]string) *Datastream
- func (data *Dataset) ToJSONMap() map[string]any
- func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)
- type Datastream
- func MergeDataflow(df *Dataflow) (dsN *Datastream)
- func NewDatastream(columns Columns) (ds *Datastream)
- func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
- func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
- func ReadCsvStream(path string) (ds *Datastream, err error)
- func (ds *Datastream) AddBytes(b int64)
- func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (ds *Datastream) CastRowToString(row []any) []string
- func (ds *Datastream) CastToStringSafeMask(row []any) []string
- func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
- func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
- func (ds *Datastream) Close()
- func (ds *Datastream) Collect(limit int) (Dataset, error)
- func (ds *Datastream) ConsumeArrowReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeArrowReaderSeeker(reader *os.File) (err error)
- func (ds *Datastream) ConsumeArrowReaderStream(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
- func (ds *Datastream) ConsumeCsvReaderDuckDb(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error)
- func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
- func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
- func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig) (err error)
- func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
- func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
- func (ds *Datastream) Defer(f func())
- func (ds *Datastream) Df() *Dataflow
- func (ds *Datastream) Err() (err error)
- func (ds *Datastream) GetConfig() (configMap map[string]string)
- func (ds *Datastream) GetFields(args ...bool) []string
- func (ds *Datastream) IsClosed() bool
- func (ds *Datastream) LatestBatch() *Batch
- func (ds *Datastream) Limited(limit ...int) bool
- func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
- func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
- func (ds *Datastream) NewArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewBatch(columns Columns) *Batch
- func (ds *Datastream) NewCsvBufferReader(sc StreamConfig) *bytes.Reader
- func (ds *Datastream) NewCsvBufferReaderChnl(sc StreamConfig) (readerChn chan *bytes.Reader)
- func (ds *Datastream) NewCsvBytesChnl(sc StreamConfig) (dataChn chan *[]byte)
- func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader
- func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewExcelReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
- func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewParquetArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) NewParquetReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
- func (ds *Datastream) Pause()
- func (ds *Datastream) Push(row []any)
- func (ds *Datastream) Records() <-chan map[string]any
- func (ds *Datastream) Rows() chan []any
- func (ds *Datastream) SetConfig(configMap map[string]string)
- func (ds *Datastream) SetEmpty()
- func (ds *Datastream) SetFields(fields []string)
- func (ds *Datastream) SetFileURI()
- func (ds *Datastream) SetIterator(it *Iterator)
- func (ds *Datastream) SetMetadata(jsonStr string)
- func (ds *Datastream) SetReady()
- func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
- func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
- func (ds *Datastream) Start() (err error)
- func (ds *Datastream) TryPause() bool
- func (ds *Datastream) Unpause()
- func (ds *Datastream) WaitClosed()
- func (ds *Datastream) WaitReady() error
- type DecimalColumnTyping
- type DeltaReader
- type DuckDb
- func (duck *DuckDb) AddExtension(extension string)
- func (duck *DuckDb) AddSecret(secret DuckDbSecret)
- func (duck *DuckDb) CheckExtension(extension string) (bool, error)
- func (duck *DuckDb) Close() error
- func (duck *DuckDb) DataflowToHttpStream(df *Dataflow, sc StreamConfig) (streamPartChn chan HttpStreamPart, err error)
- func (duck *DuckDb) DefaultCsvConfig() (config StreamConfig)
- func (duck *DuckDb) Describe(query string) (columns Columns, err error)
- func (duck *DuckDb) EnsureBinDuckDB(version string) (binPath string, err error)
- func (duck *DuckDb) Exec(sql string, args ...any) (result sql.Result, err error)
- func (duck *DuckDb) ExecContext(ctx context.Context, sql string, args ...any) (result sql.Result, err error)
- func (duck *DuckDb) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
- func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options DuckDbCopyOptions) (sql string, err error)
- func (duck *DuckDb) GenerateCsvColumns(columns Columns) (colStr string)
- func (duck *DuckDb) GetProp(key string) string
- func (duck *DuckDb) GetScannerFunc(format dbio.FileType) (scanFunc string)
- func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig) (sql string)
- func (duck *DuckDb) Open(timeOut ...int) (err error)
- func (duck *DuckDb) PrepareFsSecretAndURI(uri string) string
- func (duck *DuckDb) Props() map[string]string
- func (duck *DuckDb) Query(sql string, options ...map[string]any) (data Dataset, err error)
- func (duck *DuckDb) QueryContext(ctx context.Context, sql string, options ...map[string]any) (data Dataset, err error)
- func (duck *DuckDb) Quote(col string) (qName string)
- func (duck *DuckDb) SetProp(key string, value string)
- func (duck *DuckDb) Stream(sql string, options ...map[string]any) (ds *Datastream, err error)
- func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...map[string]any) (ds *Datastream, err error)
- func (duck *DuckDb) SubmitSQL(sql string, showChanges bool) (err error)
- type DuckDbCopyOptions
- type DuckDbSecret
- type DuckDbSecretType
- type Encoding
- type Evaluator
- func (e *Evaluator) Check(expr string) (err error)
- func (e *Evaluator) ExtractVars(expr string) []string
- func (e *Evaluator) FillMissingKeys(stateMap map[string]any, varsToCheck []string) map[string]any
- func (e *Evaluator) FindMatches(inputStr string) (expressions []string, err error)
- func (e *Evaluator) RenderAny(input any, extras ...map[string]any) (output any, err error)
- func (e *Evaluator) RenderPayload(val any, extras ...map[string]any) (newVal any, err error)
- func (e *Evaluator) RenderString(val any, extras ...map[string]any) (newVal string, err error)
- type Excel
- func (xls *Excel) GetDataset(sheet string) (data Dataset)
- func (xls *Excel) GetDatasetFromRange(sheet, cellRange string) (data Dataset, err error)
- func (xls *Excel) RefreshSheets() (err error)
- func (xls *Excel) TitleToNumber(s string) int
- func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- func (xls *Excel) WriteToFile(path string) (err error)
- func (xls *Excel) WriteToWriter(w io.Writer) (err error)
- type FileStreamConfig
- type GoogleSheet
- func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
- func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
- func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
- func (ggs *GoogleSheet) RefreshSheets() (err error)
- func (ggs *GoogleSheet) URL() string
- func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- type GzipCompressor
- type HttpStreamPart
- type IcebergReader
- type Iterator
- type JsonColumnTyping
- type KeyType
- type KeyValue
- type Metadata
- type NoneCompressor
- type Parquet
- type ParquetArrowReader
- type ParquetArrowWriter
- type ParquetDuckDb
- type ParquetWriter
- type PartitionLevel
- type Queue
- type ReaderReady
- type RecNode
- func (rn *RecNode) Compression() compress.Codec
- func (rn *RecNode) Encoding() encoding.Encoding
- func (rn *RecNode) Fields() []parquet.Field
- func (rn *RecNode) GoType() reflect.Type
- func (rn *RecNode) ID() int
- func (rn *RecNode) Leaf() bool
- func (rn *RecNode) Optional() bool
- func (rn *RecNode) Repeated() bool
- func (rn *RecNode) Required() bool
- func (rn *RecNode) String() string
- func (rn *RecNode) Type() parquet.Type
- type Record
- type SAS
- type SSHClient
- func (s *SSHClient) Close()
- func (s *SSHClient) Connect() (err error)
- func (s *SSHClient) GetOutput() (stdout string, stderr string)
- func (s *SSHClient) NewSession() (*ssh.Session, error)
- func (s *SSHClient) OpenPortForward() (localPort int, err error)
- func (s *SSHClient) RunAsProcess() (localPort int, err error)
- func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)
- type Selector
- type Shaper
- type SnappyCompressor
- type StreamConfig
- type StreamProcessor
- func (sp *StreamProcessor) CastRow(row []any, columns Columns) []any
- func (sp *StreamProcessor) CastToBool(i any) (b bool, err error)
- func (sp *StreamProcessor) CastToString(val any) (valString string)
- func (sp *StreamProcessor) CastToStringCSV(i int, val any, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToStringE(val any) (valString string, err error)
- func (sp *StreamProcessor) CastToStringSafeMask(i int, val any, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToTime(i any) (t time.Time, err error)
- func (sp *StreamProcessor) CastType(val any, typ ColumnType) any
- func (sp *StreamProcessor) CastVal(i int, val any, col *Column) any
- func (sp *StreamProcessor) CastValWithoutStats(i int, val any, typ ColumnType) any
- func (sp *StreamProcessor) CheckType(v any) (typ ColumnType)
- func (sp *StreamProcessor) ColStats() map[int]*ColumnStats
- func (sp *StreamProcessor) CountDigits(number string) (precision, scale int)
- func (sp *StreamProcessor) GetType(val any) (typ ColumnType)
- func (sp *StreamProcessor) ParseString(s string, jj ...int) (val any)
- func (sp *StreamProcessor) ParseTime(i any) (t time.Time, err error)
- func (sp *StreamProcessor) ParseVal(val any) any
- func (sp *StreamProcessor) ProcessRow(row []any) []any
- func (sp *StreamProcessor) ProcessVal(val any) any
- func (sp *StreamProcessor) ResetConfig()
- func (sp *StreamProcessor) SetConfig(configMap map[string]string)
- func (sp *StreamProcessor) TruncateDecimalString(number string, decCount int) (newNumber string)
- type StringColumnTyping
- type TimeLevel
- type Transform
- type TransformLegacy
- type TransformLegacyList
- type Transformers
- type ZStandardCompressor
Constants
This section is empty.
Variables
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)
var (
	DuckDbVersion      = "1.4.2"
	DuckDbVersionMD    = "1.4.2"
	DuckDbUseTempFile  = false
	DuckDbURISeparator = "|-|+|"
)
var (
	TransformsLegacyMap  = map[string]TransformLegacy{}
	GlobalFunctionMap    map[string]goval.ExpressionFunction
	GetTransformFunction = func(string) goval.ExpressionFunction { return nil }
	FunctionToTransform  = func(name string, f goval.ExpressionFunction, params ...any) TransformLegacy {
		return TransformLegacy{
			Name: name,
			Func: func(sp *StreamProcessor, vals ...any) (any, error) {
				if len(params) > 0 {
					vals = append(vals, params...)
				}
				val, err := f(vals...)
				return val, err
			},
		}
	}
	LocalConnections = cmap.New[map[string]any]()
)
var (
	TransformDecodeLatin1 = TransformLegacy{Name: EncodingLatin1.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_1, val); return newVal, err }}
	TransformDecodeLatin5 = TransformLegacy{Name: EncodingLatin5.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_5, val); return newVal, err }}
	TransformDecodeLatin9 = TransformLegacy{Name: EncodingLatin9.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeISO8859_15, val); return newVal, err }}
	TransformDecodeUtf8 = TransformLegacy{Name: EncodingUtf8.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8, val); return newVal, err }}
	TransformDecodeUtf8Bom = TransformLegacy{Name: EncodingUtf8Bom.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF8BOM, val); return newVal, err }}
	TransformDecodeUtf16 = TransformLegacy{Name: EncodingUtf16.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeUTF16, val); return newVal, err }}
	TransformDecodeWindows1250 = TransformLegacy{Name: EncodingWindows1250.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1250, val); return newVal, err }}
	TransformDecodeWindows1252 = TransformLegacy{Name: EncodingWindows1252.DecodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.DecodeWindows1252, val); return newVal, err }}
	TransformDuckdbListToText = TransformLegacy{Name: "duckdb_list_to_text", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.duckDbListAsText(val), nil }}
	TransformEncodeLatin1 = TransformLegacy{Name: EncodingLatin1.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_1, val); return newVal, err }}
	TransformEncodeLatin5 = TransformLegacy{Name: EncodingLatin5.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_5, val); return newVal, err }}
	TransformEncodeLatin9 = TransformLegacy{Name: EncodingLatin9.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeISO8859_15, val); return newVal, err }}
	TransformEncodeUtf8 = TransformLegacy{Name: EncodingUtf8.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) {
		return fmt.Sprintf("%q", val), nil
		newVal, _, err := transform.String(sp.transformers.EncodeUTF8, val)
		return newVal, err
	}}
	TransformEncodeUtf8Bom = TransformLegacy{Name: EncodingUtf8Bom.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF8BOM, val); return newVal, err }}
	TransformEncodeUtf16 = TransformLegacy{Name: EncodingUtf16.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeUTF16, val); return newVal, err }}
	TransformEncodeWindows1250 = TransformLegacy{Name: EncodingWindows1250.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1250, val); return newVal, err }}
	TransformEncodeWindows1252 = TransformLegacy{Name: EncodingWindows1252.EncodeString(), FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.EncodeWindows1252, val); return newVal, err }}
	TransformHashMd5 = TransformLegacy{Name: "hash_md5", FuncString: func(sp *StreamProcessor, val string) (string, error) { return g.MD5(val), nil }}
	TransformHashSha256 = TransformLegacy{Name: "hash_sha256", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA256(val), nil }}
	TransformHashSha512 = TransformLegacy{Name: "hash_sha512", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.SHA512(val), nil }}
	TransformParseBit = TransformLegacy{Name: "parse_bit", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseBit(sp, val) }}
	TransformBinaryToDecimal = TransformLegacy{Name: "binary_to_decimal", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.BinaryToDecimal(sp, val) }}
	TransformBinaryToHex = TransformLegacy{Name: "binary_to_hex", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.BinaryToHex(val), nil }}
	TransformParseFix = TransformLegacy{Name: "parse_fix", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseFIX(sp, val) }}
	TransformParseUuid = TransformLegacy{Name: "parse_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseUUID(sp, val) }}
	TransformParseMsUuid = TransformLegacy{Name: "parse_ms_uuid", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ParseMsUUID(sp, val) }}
	TransformReplace0x00 = TransformLegacy{Name: "replace_0x00", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.Replace0x00(sp, val) }}
	TransformReplaceAccents = TransformLegacy{Name: "replace_accents", FuncString: func(sp *StreamProcessor, val string) (string, error) { newVal, _, err := transform.String(sp.transformers.Accent, val); return newVal, err }}
	TransformReplaceNonPrintable = TransformLegacy{Name: "replace_non_printable", FuncString: func(sp *StreamProcessor, val string) (string, error) { return Transforms.ReplaceNonPrintable(val), nil }}
	TransformTrimSpace = TransformLegacy{Name: "trim_space", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil }}
	TransformLower = TransformLegacy{Name: "lower", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.ToLower(val), nil }}
	TransformUpper = TransformLegacy{Name: "upper", FuncString: func(sp *StreamProcessor, val string) (string, error) { return strings.ToUpper(val), nil }}

	// used as lookup, cannot return null since is not pointer
	TransformEmptyAsNull = TransformLegacy{Name: "empty_as_null", Func: func(sp *StreamProcessor, vals ...any) (any, error) {
		if len(vals) == 0 {
			return nil, nil
		}
		if vals[0] == "" {
			return nil, nil
		}
		return vals[0], nil
	}}
)
var AllCompressorType = []struct {
	Value  CompressorType
	TSName string
}{
	{AutoCompressorType, "AutoCompressorType"},
	{NoneCompressorType, "NoneCompressorType"},
	{ZipCompressorType, "ZipCompressorType"},
	{GzipCompressorType, "GzipCompressorType"},
	{SnappyCompressorType, "SnappyCompressorType"},
	{ZStandardCompressorType, "ZStandardCompressorType"},
}
var KeyTypes = []KeyType{AggregateKey, ClusterKey, DuplicateKey, HashKey, IndexKey, PartitionKey, PrimaryKey, SortKey, UniqueKey, UpdateKey}
var NewTransform = func(t []map[string]string, _ *StreamProcessor) Transform { return nil }
var ParseStageTransforms = func(payload any) ([]map[string]string, error) {
	return nil, g.Error("please use the official sling-cli release for using transforms")
}
var PartitionLevelsAscending = []PartitionLevel{PartitionLevelSecond, PartitionLevelMinute, PartitionLevelHour, PartitionLevelDay, PartitionLevelWeek, PartitionLevelMonth, PartitionLevelYearMonth, PartitionLevelYear}
var PartitionLevelsDescending = []PartitionLevel{PartitionLevelYear, PartitionLevelYearMonth, PartitionLevelMonth, PartitionLevelWeek, PartitionLevelDay, PartitionLevelHour, PartitionLevelMinute, PartitionLevelSecond}
var TimeLevelAscending = []TimeLevel{TimeLevelSecond, TimeLevelMinute, TimeLevelHour12, TimeLevelHour24, TimeLevelDayOfYear, TimeLevelDay, TimeLevelMonth, TimeLevelMonthName, TimeLevelYearShort, TimeLevelYear}
var TimeLevelDescending = []TimeLevel{TimeLevelYear, TimeLevelYearShort, TimeLevelMonthName, TimeLevelMonth, TimeLevelDay, TimeLevelDayOfYear, TimeLevelHour24, TimeLevelHour12, TimeLevelMinute, TimeLevelSecond}
var Transforms transformsNS

Functions
func AppendToBuilder (added in v1.4.10)
func ApplySelect (added in v1.5.1)
ApplySelect applies select expressions to filter, rename, and reorder fields. Select syntax:
- "field" -> include field
- "-field" -> exclude field
- "field as new" -> rename field to new
- "*" -> include all fields
- "prefix*" -> include fields starting with prefix
- "*suffix" -> include fields ending with suffix
- "-prefix*" -> exclude fields starting with prefix
- "-*suffix" -> exclude fields ending with suffix
Field matching is case-insensitive. Returns a new fields slice with the selected/renamed fields in the specified order, as in the sketch below.
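For orientation, here is a small usage sketch (not from the package docs). It assumes the import path github.com/slingdata-io/sling-cli/core/dbio/iop, and that an include-all expression can be combined with an exclusion as listed above:

package main

import (
	"fmt"

	"github.com/slingdata-io/sling-cli/core/dbio/iop" // assumed import path
)

func main() {
	fields := []string{"id", "first_name", "last_name", "created_at", "tmp_flag"}

	// keep id, rename first_name, and keep every created_* field
	selected, err := iop.ApplySelect(fields, []string{"id", "first_name as fname", "created_*"})
	if err != nil {
		panic(err)
	}
	fmt.Println(selected)

	// keep everything except fields starting with tmp_
	kept, _ := iop.ApplySelect(fields, []string{"*", "-tmp_*"})
	fmt.Println(kept)
}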
func AutoDecompress
AutoDecompress auto-detects the compression and decompresses the reader; otherwise it returns the same reader.
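A minimal sketch of wrapping a possibly compressed file (uses "io", "os", and the assumed iop import path above; the file name is only an example):

func openMaybeCompressed(path string) (io.Reader, func() error, error) {
	f, err := os.Open(path) // e.g. "data.csv.gz"
	if err != nil {
		return nil, nil, err
	}
	// AutoDecompress detects the compression and returns a decompressing
	// reader, or the original reader if the data is not compressed.
	r, err := iop.AutoDecompress(f)
	if err != nil {
		f.Close()
		return nil, nil, err
	}
	return r, f.Close, nil
}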
func CleanHeaderRow
CleanHeaderRow cleans incompatible characters from the header row.
func CloseQueues (added in v1.4.23)
func CloseQueues()
func ColumnsToArrowSchema (added in v1.4.10)
func CompareColumns
CompareColumns compares two sets of columns to see if they are similar.
func CreateDummyFields
CreateDummyFields creates dummy columns for CSVs with no header row.
func DecodeJSONIfBase64 (added in v1.5.1)
DecodeJSONIfBase64 detects if the JSON body is base64-encoded and decodes it.
func ExtractPartitionTimeValue (added in v1.3.6)
ExtractPartitionTimeValue extracts the partition time value from the path. With mask `data/{YYYY}/{MM}/{DD}` and path `data/2024/12/21`, the returned timestamp should be 2024-12-21.
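Restating the example above as a code fragment (a sketch, meant to live inside a function returning error; uses "fmt", "time", and the assumed iop import path):

ts, err := iop.ExtractPartitionTimeValue("data/{YYYY}/{MM}/{DD}", "data/2024/12/21")
if err != nil {
	return err
}
fmt.Println(ts.Format("2006-01-02")) // expected: 2024-12-21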
func FormatValue (added in v1.2.21)
func FormatValue(val any, columnType ColumnType, connType dbio.Type) (newVal string)
FormatValue formats the value as a SQL expression (adds quotes).
func GeneratePartURIsFromRange (added in v1.3.6)
func GeneratePartURIsFromRange(mask, updateKey string, start, end time.Time) (uris []string, err error)
GeneratePartURIsFromRange generates all the possible URIs given a start/end range. Must first determine the lowest time resolution.
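A rough sketch of expanding a day-level mask over a three-day window (the "updated_at" update key is hypothetical; the exact semantics of updateKey are not documented here):

start := time.Date(2024, 12, 20, 0, 0, 0, 0, time.UTC)
end := time.Date(2024, 12, 22, 0, 0, 0, 0, time.UTC)
uris, err := iop.GeneratePartURIsFromRange("data/{YYYY}/{MM}/{DD}", "updated_at", start, end)
if err != nil {
	return err
}
fmt.Println(uris) // one URI per day, per the lowest resolution in the mask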
func GetISO8601DateMap
GetISO8601DateMap returns a map of date parts for string formatting.
func GetLowestPartTimeUnit (added in v1.3.6)
GetLowestPartTimeUnit loops through possible TimeLevel or PartitionLevel values and returns the lowest time.Duration unit.
func GetPartitionDateMap (added in v1.3.6)
func GetValueFromArrowArray (added in v1.4.10)
GetValueFromArrowArray extracts a value from an arrow array at the given index
func Iso8601ToGoLayout
https://www.w3.org/QA/Tips/iso-date
https://www.w3.org/TR/NOTE-datetime
https://www.iso.org/iso-8601-date-and-time-format.html
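As a rough sketch of intent (the exact ISO-8601 tokens accepted are an assumption here, not documented above):

// Convert an ISO-8601 style format into a Go time layout, then parse with it.
layout := iop.Iso8601ToGoLayout("YYYY-MM-DD") // assumed to yield "2006-01-02"
t, err := time.Parse(layout, "2024-12-21")
if err != nil {
	return err
}
fmt.Println(t.Year())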
func MakeAwsConfig (added in v1.4.11)
func MakeDecNumScale (added in v1.1.6)
func MakeRowsChan
func MakeRowsChan() chan []any
MakeRowsChan returns a buffered channel with default size
func MatchedPartitionMask (added in v1.3.6)
MatchedPartitionMask determines if the mask and the path have the same partition structure.
func NewJSONStream
func NewJSONStream(ds *Datastream, decoder decoderLike, flatten int, jmespath string) *jsonStream
func NewXMLDecoder (added in v1.4.20)
func OpenTunnelSSH (added in v1.2.15)
func ScanCarrRet
ScanCarrRet removes the \r runes that are without a \n right after.
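ScanCarrRet has the same signature as bufio.SplitFunc, so one way to use it (a sketch; uses "bufio", "fmt", "strings", and the assumed iop import path) is as the split function of a bufio.Scanner:

scanner := bufio.NewScanner(strings.NewReader("a\rb\r\nc\n"))
scanner.Split(iop.ScanCarrRet) // drops \r runes that are not followed by \n
for scanner.Scan() {
	fmt.Printf("%q\n", scanner.Text())
}
if err := scanner.Err(); err != nil {
	return err
}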
func SetSampleSize (added in v1.4.21)
func SetSampleSize()
func StringToDecimalByteArray (added in v1.1.6)
func StripSQLComments (added in v1.4.6)
StripSQLComments removes all SQL comments (-- or /* */) from the provided SQL string
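A small sketch (uses "fmt" and the assumed iop import path):

sql := "select id, -- primary key\n  name /* display name */\nfrom users"
clean, err := iop.StripSQLComments(sql)
if err != nil {
	return err
}
fmt.Println(clean) // same statement with both comment styles removed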
Types
type ArrowReader (added in v1.4.10)
type ArrowReader struct {
	Path          string
	IpcReader     *ipc.Reader
	IpcFileReader *ipc.FileReader
	Data          *Dataset
	Context       *g.Context
	Memory        memory.Allocator
	// contains filtered or unexported fields
}
ArrowReader is an arrow reader object using arrow v18
func NewArrowFileReader (added in v1.4.17)
func NewArrowFileReader(reader *os.File, selected []string) (a *ArrowReader, err error)
func NewArrowReader (added in v1.4.10)
func NewArrowReader(reader io.Reader, selected []string) (a *ArrowReader, err error)
func (*ArrowReader) Columns (added in v1.4.10)
func (a *ArrowReader) Columns() Columns
type ArrowWriter (added in v1.4.10)
type ArrowWriter struct {
	Writer *ipc.FileWriter
	// contains filtered or unexported fields
}
ArrowWriter is an arrow writer object using arrow v18
func NewArrowWriter (added in v1.4.10)
func NewArrowWriter(w io.Writer, columns Columns) (a *ArrowWriter, err error)
func (*ArrowWriter) Close (added in v1.4.10)
func (a *ArrowWriter) Close() error
func (*ArrowWriter) Columns (added in v1.4.10)
func (a *ArrowWriter) Columns() Columns
func (*ArrowWriter) WriteRow (added in v1.4.10)
func (a *ArrowWriter) WriteRow(row []any) error
type Avro
type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}
Avro is an Avro object
func NewAvroStream
func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)
type Batch
type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}

func (*Batch) AddTransform
func (*Batch) ColumnsChanged
func (*Batch) Ds
func (b *Batch) Ds() *Datastream
type CSV
type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       string
	Escape          string
	Quote           string
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}
CSV is a csv object
func (*CSV) ReadStream
func (c *CSV) ReadStream() (ds *Datastream, err error)
ReadStream returns the read CSV stream with Line 1 as header
func (*CSV) ReadStreamContext (added in v1.1.15)
func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
ReadStreamContext returns the read CSV stream with Line 1 as header
func (*CSV) WriteStream
func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
WriteStream to CSV file
type Column
type Column struct {
	Position    int               `json:"position"`
	Name        string            `json:"name"`
	Type        ColumnType        `json:"type"`
	DbType      string            `json:"db_type,omitempty"`
	DbPrecision int               `json:"db_precision,omitempty"`
	DbScale     int               `json:"db_scale,omitempty"`
	Sourced     bool              `json:"-"` // whether col was sourced/inferred from a typed source
	Stats       ColumnStats       `json:"stats,omitempty"`
	Table       string            `json:"table,omitempty"`
	Schema      string            `json:"schema,omitempty"`
	Database    string            `json:"database,omitempty"`
	Description string            `json:"description,omitempty"`
	FileURI     string            `json:"file_uri,omitempty"`
	Constraint  *ColumnConstraint `json:"constraint,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}
Column represents a schemata column
func InferFromStats
InferFromStats uses the stats to infer data types.
func (*Column) EvaluateConstraint (added in v1.2.16)
func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) (err error)
EvaluateConstraint evaluates a value against the constraint function
func (*Column) GetNativeType (added in v1.2.16)
GetNativeType returns the native column type from generic
func (*Column) HasNullsPlus1 (added in v1.1.7)
HasNullsPlus1 denotes when a column is all nulls plus 1 non-null
func (*Column) IsDatetime
IsDatetime returns whether the column is a datetime object
func (*Column) SetConstraint (added in v1.2.16)
func (col *Column) SetConstraint()
func (*Column) SetLengthPrecisionScale
func (col *Column) SetLengthPrecisionScale()
SetLengthPrecisionScale parses length, precision, scale.
func (*Column) SetMetadata
type ColumnCasing (added in v1.2.22)
type ColumnCasing string
ColumnCasing is the casing method to use
const (
	// see https://github.com/slingdata-io/sling-cli/issues/538
	NormalizeColumnCasing ColumnCasing = "normalize" // normalize to target, leaves mixed cases columns as is
	SourceColumnCasing    ColumnCasing = "source"    // keeps source column name casing. The default.
	TargetColumnCasing    ColumnCasing = "target"    // converts casing according to target database. Lower-case for files.
	SnakeColumnCasing     ColumnCasing = "snake"     // converts snake casing according to target database. Lower-case for files.
	UpperColumnCasing     ColumnCasing = "upper"     // make it upper case
	LowerColumnCasing     ColumnCasing = "lower"     // make it lower case
	CamelColumnCasing     ColumnCasing = "camel"     // converts to camelCase
)
func (*ColumnCasing) Apply (added in v1.2.22)
func (cc *ColumnCasing) Apply(name string, tgtConnType dbio.Type) string
Apply applies column casing to the provided name. If cc is nil or SourceColumnCasing, it returns the original value.
func (*ColumnCasing) ApplyColumns (added in v1.4.25)
func (cc *ColumnCasing) ApplyColumns(cols Columns, tgtType dbio.Type) (newCols Columns)
ApplyColumns applies the column casing to the provided columns.
func (*ColumnCasing) Equals (added in v1.2.22)
func (cc *ColumnCasing) Equals(val ColumnCasing) bool
Equals evaluates equality for column casing (pointer safe)
func (*ColumnCasing) IsEmpty (added in v1.2.22)
func (cc *ColumnCasing) IsEmpty() bool
IsEmpty returns true if nil or blank.
type ColumnConstraint (added in v1.2.16)
type ColumnConstraint struct {
	Expression string             `json:"expression,omitempty"`
	Errors     []string           `json:"errors,omitempty"`
	FailCnt    uint64             `json:"fail_cnt,omitempty"`
	EvalFunc   ConstraintEvalFunc `json:"-"`
}

type ColumnStats
type ColumnStats struct {
	MinLen       int    `json:"min_len,omitempty"`
	MaxLen       int    `json:"max_len,omitempty"`
	MaxDecLen    int    `json:"max_dec_len,omitempty"`
	Min          int64  `json:"min"`
	Max          int64  `json:"max"`
	NullCnt      int64  `json:"null_cnt"`
	IntCnt       int64  `json:"int_cnt,omitempty"`
	DecCnt       int64  `json:"dec_cnt,omitempty"`
	BoolCnt      int64  `json:"bool_cnt,omitempty"`
	JsonCnt      int64  `json:"json_cnt,omitempty"`
	StringCnt    int64  `json:"string_cnt,omitempty"`
	DateCnt      int64  `json:"date_cnt,omitempty"`
	DateTimeCnt  int64  `json:"datetime_cnt,omitempty"`
	DateTimeZCnt int64  `json:"datetimez_cnt,omitempty"`
	TotalCnt     int64  `json:"total_cnt"`
	UniqCnt      int64  `json:"uniq_cnt"`
	Checksum     uint64 `json:"checksum"`
	LastVal      any    `json:"-"` // last non-empty value. useful for state incremental
}
ColumnStats holds statistics for a column
func (*ColumnStats) DistinctPercent
func (cs *ColumnStats) DistinctPercent() float64
func (*ColumnStats) DuplicateCount
func (cs *ColumnStats) DuplicateCount() int64
func (*ColumnStats) DuplicatePercent
func (cs *ColumnStats) DuplicatePercent() float64
type ColumnType
type ColumnType string
const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	UUIDType       ColumnType = "uuid"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)
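The ColumnType values above can be checked with the predicate methods documented below; a small sketch (uses "fmt" and the assumed iop import path):

ct := iop.DecimalType
fmt.Println(ct.IsDecimal()) // true
fmt.Println(ct.IsNumber())  // true: a decimal or an integer counts as a number
fmt.Println(ct.IsString())  // false
fmt.Println(iop.ColumnType("mystery").IsValid())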
func NativeTypeToGeneral (added in v1.2.19)
func NativeTypeToGeneral(name, dbType string, connType dbio.Type) (colType ColumnType)
func (ColumnType) IsBinary (added in v1.2.3)
func (ct ColumnType) IsBinary() bool
IsBinary returns whether the column is a binary
func (ColumnType) IsBool
func (ct ColumnType) IsBool() bool
IsBool returns whether the column is a boolean
func (ColumnType) IsDate (added in v1.1.8)
func (ct ColumnType) IsDate() bool
IsDate returns whether the column is a date object
func (ColumnType) IsDatetime
func (ct ColumnType) IsDatetime() bool
IsDatetime returns whether the column is a datetime object
func (ColumnType) IsDecimal
func (ct ColumnType) IsDecimal() bool
IsDecimal returns whether the column is a decimal
func (ColumnType) IsFloat (added in v1.1.14)
func (ct ColumnType) IsFloat() bool
IsFloat returns whether the column is a float
func (ColumnType) IsInteger
func (ct ColumnType) IsInteger() bool
IsInteger returns whether the column is an integer
func (ColumnType) IsNumber
func (ct ColumnType) IsNumber() bool
IsNumber returns whether the column is a decimal or an integer
func (ColumnType) IsString
func (ct ColumnType) IsString() bool
IsString returns whether the column is a string
func (ColumnType) IsTime (added in v1.4.17)
func (ct ColumnType) IsTime() bool
IsTime returns whether the column is a time object
func (ColumnType) IsValid
func (ct ColumnType) IsValid() bool
IsValid returns whether the column has a valid type
type ColumnTyping (added in v1.4.5)
type ColumnTyping struct {
	String  *StringColumnTyping  `json:"string,omitempty" yaml:"string,omitempty"`
	Decimal *DecimalColumnTyping `json:"decimal,omitempty" yaml:"decimal,omitempty"`
	JSON    *JsonColumnTyping    `json:"json,omitempty" yaml:"json,omitempty"`
}
ColumnTyping contains type-specific mapping configurations
func (*ColumnTyping) MaxDecimals (added in v1.4.19)
func (ct *ColumnTyping) MaxDecimals() int
type Columns
type Columns []Column
Columns represent many columns
func ArrowSchemaToColumns (added in v1.4.10)
ArrowSchemaToColumns converts arrow schema to Columns
func NewColumns
NewColumnsFromFields creates Columns from fields
func NewColumnsFromFields
NewColumnsFromFields creates Columns from fields
func (Columns) Coerce
func (cols Columns) Coerce(castCols Columns, hasHeader bool, casing ColumnCasing, tgtType dbio.Type) (newCols Columns)
Coerce casts columns into specified types
func (Columns) FieldMap
FieldMap returns the fields map of indexes. When `toLower` is true, field keys are lower cased.
func (Columns) GetMissing (added in v1.1.8)
GetMissing returns the missing columns from newCols
func (Columns) IsDifferent
func (Columns) IsSimilarTo
IsSimilarTo returns true if it has the same number of columns and contains the same columns, which may be in a different order.
func (Columns) MakeShaper
func (Columns) PrettyTable (added in v1.1.8)
PrettyTable returns a text pretty table
func (Columns) SetMetadata (added in v1.2.15)
SetMetadata sets metadata for columns
func (Columns) ValidateNames (added in v1.4.5)
ValidateNames truncates the column name if it exceeds the max column length.
func (Columns) WithoutMeta (added in v1.2.2)
WithoutMeta returns the columns without the metadata columns.
type Compressor
type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}
Compressor implements different kinds of compression
func NewCompressor
func NewCompressor(cpType CompressorType) Compressor
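A round-trip sketch using the Compressor interface above (gzip chosen arbitrarily; uses "fmt", "io", "strings", and the assumed iop import path):

cp := iop.NewCompressor(iop.GzipCompressorType)

compressed := cp.Compress(strings.NewReader("hello, compression"))
plain, err := cp.Decompress(compressed)
if err != nil {
	return err
}
out, err := io.ReadAll(plain)
if err != nil {
	return err
}
fmt.Println(string(out), cp.Suffix()) // original text, plus the compressor's file suffix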
type CompressorType
type CompressorType string
CompressorType is a string enum for the Compressor type
const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "auto"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "none"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "zip"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "gzip"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "snappy"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "zstd"
)
func CompressorTypePtr
func CompressorTypePtr(v CompressorType) *CompressorType
CompressorTypePtr returns a pointer to the CompressorType value passed in.
func (CompressorType) Normalize (added in v1.3.4)
func (ct CompressorType) Normalize() *CompressorType
Normalize converts to lowercase
func (CompressorType) String (added in v1.3.4)
func (ct CompressorType) String() string
String converts to lowercase
type ConstraintEvalFunc (added in v1.2.16)
type CsvDuckDb (added in v1.2.19)
func NewCsvReaderDuckDb (added in v1.2.19)
func NewCsvReaderDuckDb(uri string, sc *StreamConfig, props ...string) (*CsvDuckDb, error)
func (*CsvDuckDb) MakeQuery (added in v1.2.19)
func (r *CsvDuckDb) MakeQuery(fsc FileStreamConfig) string
type Dataflow
type Dataflow struct {
	Columns         Columns
	Buffer          [][]interface{}
	StreamCh        chan *Datastream
	Streams         []*Datastream
	Context         *g.Context
	Limit           uint64
	EgressBytes     uint64
	Ready           bool
	Inferred        bool
	FsURL           string
	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error
	StreamMap       map[string]*Datastream
	SchemaVersion   int // for column type version
	// contains filtered or unexported fields
}
Dataflow is a collection of concurrent Datastreams
func MakeDataFlow
func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)
MakeDataFlow creates a dataflow from datastreams
func NewDataflowContext (added in v1.1.15)
func (*Dataflow) AddColumns
func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
AddColumns adds the given columns
func (*Dataflow) AddEgressBytes (added in v1.2.2)
AddEgressBytes adds egress bytes
func (*Dataflow) BufferDataset (added in v1.2.25)
BufferDataset returns the buffer as a dataset
func (*Dataflow) ChangeColumn
func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
ChangeColumn applies a column type change
func (*Dataflow) CloseCurrentBatches
func (df *Dataflow) CloseCurrentBatches()
func (*Dataflow) Defer
func (df *Dataflow) Defer(f func())
Defer runs a given function at close of the Dataflow
func (*Dataflow) DsTotalBytes
func (*Dataflow) MakeStreamCh
func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
MakeStreamCh determines whether to merge all the streams into one or keep them separate. If data is small per stream, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.
func (*Dataflow) MergeColumns (added in v1.1.15)
MergeColumns merges the given columns
func (*Dataflow) PropagateColum (added in v1.4.24)
PropagateColum propagates the dataflow column properties to underlying datastreams
func (*Dataflow) PushStreamChan
func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
func (*Dataflow) SetBatchLimit (added in v1.2.11)
SetBatchLimit sets the ds.Batch.Limit
func (*Dataflow) SetConfig (added in v1.1.15)
func (df *Dataflow) SetConfig(cfg StreamConfig)
SetConfig sets the Sp config
func (*Dataflow) SetEmpty
func (df *Dataflow) SetEmpty()
SetEmpty sets all underlying datastreams empty
func (*Dataflow) StreamConfig (added in v1.2.15)
func (df *Dataflow) StreamConfig() (cfg StreamConfig)
StreamConfig gets the first Sp config
func (*Dataflow) SyncColumns
func (df *Dataflow) SyncColumns()
SyncColumns is a workaround to sync the ds.Columns to the df.Columns
func (*Dataflow) SyncStats
func (df *Dataflow) SyncStats()
SyncStats syncs the stream processor stats, aggregated, to the df.Columns
func (*Dataflow) WaitClosed
func (df *Dataflow) WaitClosed()
WaitClosed waits until the dataflow is closed. Hack to make sure all streams are pushed.
type Dataset
type Dataset struct {
	Result        *sqlx.Rows       `json:"-"`
	Columns       Columns          `json:"columns"`
	Rows          [][]any          `json:"rows"`
	SQL           string           `json:"sql"`
	Duration      float64          `json:"duration"`
	Sp            *StreamProcessor `json:"-"`
	Inferred      bool             `json:"inferred"`
	SafeInference bool             `json:"safe_inference"`
	NoDebug       bool             `json:"no_debug"`
}
Dataset is a query returned dataset
func NewDatasetFromMap
NewDatasetFromMap returns a new dataset
func NewExcelDataset (added in v1.2.2)
func (*Dataset) AddColumns
AddColumns adds the columns
func (*Dataset) ColValuesStr
ColValuesStr returns the values of one column as an array of strings
func (*Dataset) InferColumnTypes
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
func (*Dataset) PrettyTable (added in v1.1.8)
func (*Dataset) RecordsCasted
RecordsCasted returns rows of maps of casted values
func (*Dataset) RecordsString
RecordsString returns rows of maps of string values
func (*Dataset) Sort
Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` will sort col0, col2 ascending.
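Restating those examples as code (a sketch, given a populated iop.Dataset named data):

data.Sort(0, 2, 3, false) // sort by col0, col2, col3, descending
data.Sort(0, 2, true)     // sort by col0, col2, ascending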
func (*Dataset) Stream
func (data *Dataset) Stream(Props ...map[string]string) *Datastream
Stream returns a datastream of the dataset
type Datastream
type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         atomic.Uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool
	ID            string
	Metadata      Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}
Datastream is a stream of rows
func MergeDataflow
func MergeDataflow(df *Dataflow) (dsN *Datastream)
MergeDataflow merges the dataflow streams into one
func NewDatastream
func NewDatastream(columns Columns) (ds *Datastream)
NewDatastream returns a new datastream
func NewDatastreamContext
func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
NewDatastreamContext returns a new datastream
func NewDatastreamIt
func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
NewDatastreamIt returns a new datastream with an iterator function
func ReadCsvStream
func ReadCsvStream(path string) (ds *Datastream, err error)
ReadCsvStream reads CSV and returns a datastream
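A minimal sketch tying ReadCsvStream and Collect together (the path is only an example; uses "fmt" and the assumed iop import path noted earlier):

ds, err := iop.ReadCsvStream("/tmp/users.csv")
if err != nil {
	return err
}
data, err := ds.Collect(0) // limit 0 = read all rows
if err != nil {
	return err
}
fmt.Println(data.Columns.Names())
data.Print(5)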
func (*Datastream) AddColumns
func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
AddColumns adds the columns
func (*Datastream) CastRowToString
func (ds *Datastream) CastRowToString(row []any) []string
CastRowToString returns the row as string casted
func (*Datastream) CastToStringSafeMask (added in v1.2.14)
func (ds *Datastream) CastToStringSafeMask(row []any) []string
CastToStringSafeMask returns the row as string mask casted (even safer)
func (*Datastream) ChangeColumn
func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
ChangeColumn applies a column type change
func (*Datastream) Chunk
func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
Chunk splits the datastream into chunk datastreams (in sequence)
func (*Datastream) Collect
func (ds *Datastream) Collect(limit int) (Dataset, error)
Collect reads a stream and returns a dataset. A limit of 0 is unlimited.
func (*Datastream) ConsumeArrowReader (added in v1.4.10)
func (ds *Datastream) ConsumeArrowReader(reader io.Reader) (err error)
func (*Datastream) ConsumeArrowReaderSeeker (added in v1.4.10)
func (ds *Datastream) ConsumeArrowReaderSeeker(reader *os.File) (err error)
ConsumeArrowReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeArrowReaderStream (added in v1.4.17)
func (ds *Datastream) ConsumeArrowReaderStream(reader io.Reader) (err error)
ConsumeArrowReaderStream uses the provided reader to stream rows
func (*Datastream) ConsumeAvroReader
func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
ConsumeAvroReader uses the provided reader to stream rows
func (*Datastream) ConsumeAvroReaderSeeker
func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeAvroReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeCsvReader
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
ConsumeCsvReader uses the provided reader to stream rows
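A rough sketch of feeding an io.Reader into a datastream and ranging over its rows (uses "fmt", "os", and the assumed iop import path); whether extra setup such as ds.Start is needed may differ in practice:

f, err := os.Open("/tmp/users.csv") // example path
if err != nil {
	return err
}
defer f.Close()

ds := iop.NewDatastream(iop.Columns{})
if err := ds.ConsumeCsvReader(f); err != nil {
	return err
}
for row := range ds.Rows() {
	fmt.Println(row)
}
if err := ds.Err(); err != nil {
	return err
}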
func (*Datastream) ConsumeCsvReaderChl (added in v1.2.4)
func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
ConsumeCsvReaderChl reads a channel of readers. Should be safe to use with header top row.
func (*Datastream) ConsumeCsvReaderDuckDb (added in v1.2.19)
func (ds *Datastream) ConsumeCsvReaderDuckDb(uri string, sc FileStreamConfig) (err error)
ConsumeCsvReaderDuckDb uses the provided reader to stream rows
func (*Datastream) ConsumeDeltaReader (added in v1.2.16)
func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err error)
ConsumeDeltaReader uses the provided reader to stream rows
func (*Datastream) ConsumeExcelReader (added in v1.2.2)
ConsumeExcelReader uses the provided reader to stream rows
func (*Datastream) ConsumeExcelReaderSeeker (added in v1.2.2)
func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
ConsumeExcelReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeIcebergReader (added in v1.2.16)
func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err error)
ConsumeIcebergReader uses the provided reader to stream rows
func (*Datastream) ConsumeJsonReader
func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON rec as one string value so the payload can be processed downstream.
func (*Datastream) ConsumeJsonReaderChl (added in v1.2.6)
func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
func (*Datastream) ConsumeParquetReader
func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
ConsumeParquetReader uses the provided reader to stream rows
func (*Datastream) ConsumeParquetReaderDuckDb (added in v1.2.16)
func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig) (err error)
ConsumeParquetReaderDuckDb uses the provided reader to stream rows
func (*Datastream) ConsumeParquetReaderSeeker
func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
ConsumeParquetReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeSASReader
func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
ConsumeSASReader uses the provided reader to stream rows
func (*Datastream) ConsumeSASReaderSeeker
func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeSASReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeXmlReader
func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
ConsumeXmlReader uses the provided reader to stream XML. This will put each XML rec as one string value so the payload can be processed downstream.
func (*Datastream) Defer
func (ds *Datastream) Defer(f func())
Defer runs a given function at close of the Datastream
func (*Datastream) Df
func (ds *Datastream) Df() *Dataflow
func (*Datastream) GetConfig
func (ds *Datastream) GetConfig() (configMap map[string]string)
GetConfig gets the config
func (*Datastream) GetFields
func (ds *Datastream) GetFields(args ...bool) []string
GetFields returns the fields of the Data
func (*Datastream) LatestBatch
func (ds *Datastream) LatestBatch() *Batch
func (*Datastream) Limited (added in v1.2.4)
func (ds *Datastream) Limited(limit ...int) bool
func (*Datastream) Map
func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
Map applies the provided function to every row and returns the result
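A sketch of a row transform (given an existing *iop.Datastream named ds; assumes the first column holds strings; uses "strings"):

upperDs := ds.Map(ds.Columns, func(row []any) []any {
	if s, ok := row[0].(string); ok {
		row[0] = strings.ToUpper(s) // upper-case the first column
	}
	return row
})
_ = upperDs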
func (*Datastream) MapParallel
func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.
func (*Datastream) NewArrowReaderChnl (added in v1.4.10)
func (ds *Datastream) NewArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewArrowReaderChnl provides a channel of readers as the limit is reached; each channel flows as fast as the consumer consumes
func (*Datastream) NewBatch
func (ds *Datastream) NewBatch(columns Columns) *Batch
NewBatch creates a new batch with fixed columns. Should be used each time the column type changes, or columns are added.
func (*Datastream) NewCsvBufferReader
func (ds *Datastream) NewCsvBufferReader(sc StreamConfig) *bytes.Reader
NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvBufferReaderChnl
func (ds *Datastream) NewCsvBufferReaderChnl(sc StreamConfig) (readerChn chan *bytes.Reader)
NewCsvBufferReaderChnl provides a channel of readers as the limit is reached; data is read in memory, whereas NewCsvReaderChnl does not hold in memory
func (*Datastream) NewCsvBytesChnl
func (ds *Datastream) NewCsvBytesChnl(sc StreamConfig) (dataChn chan *[]byte)
NewCsvBytesChnl returns a channel yielding chunks of CSV bytes
func (*Datastream)NewCsvReader¶
func (ds *Datastream) NewCsvReader(scStreamConfig) *io.PipeReader
NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
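For example, a sketch of streaming a datastream out to a CSV file via NewCsvReader, using DefaultStreamConfig (documented further below) for the stream settings:

	sc := iop.DefaultStreamConfig()
	sc.Header = true // emit a header row; field documented under StreamConfig

	file, err := os.Create("/tmp/out.csv")
	if err != nil {
		return err
	}
	defer file.Close()

	// limit == 0 in the config means all rows are read
	if _, err = io.Copy(file, ds.NewCsvReader(sc)); err != nil {
		return err
	}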
func (*Datastream) NewCsvReaderChnl¶
func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) NewExcelReaderChnl¶ added in v1.2.2
func (ds *Datastream) NewExcelReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
func (*Datastream) NewIterator¶
func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
func (*Datastream) NewJsonLinesReaderChnl¶
func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) NewJsonReaderChnl¶
func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.PipeReader)
func (*Datastream) NewParquetArrowReaderChnl¶ added in v1.1.7
func (ds *Datastream) NewParquetArrowReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewParquetArrowReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes. WARN: Not using this one since it doesn't write Decimals properly.
func (*Datastream) NewParquetReaderChnl¶
func (ds *Datastream) NewParquetReaderChnl(sc StreamConfig) (readerChn chan *BatchReader)
NewParquetReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.
func (*Datastream) Pause¶
func (ds *Datastream) Pause()
func (*Datastream) Records¶
func (ds *Datastream) Records() <-chan map[string]any
Records returns rows of maps
func (*Datastream) Rows¶
func (ds *Datastream) Rows() chan []any
func (*Datastream) SetConfig¶
func (ds *Datastream) SetConfig(configMap map[string]string)
SetConfig sets the ds.config values
func (*Datastream) SetEmpty¶
func (ds *Datastream) SetEmpty()
SetEmpty sets the ds.Rows channel as empty
func (*Datastream) SetFields¶
func (ds *Datastream) SetFields(fields []string)
SetFields sets the fields/columns of the Datastream
func (*Datastream) SetFileURI¶ added in v1.1.15
func (ds *Datastream) SetFileURI()
SetFileURI sets the FileURI of the columns of the Datastream
func (*Datastream) SetIterator¶ added in v1.1.14
func (ds *Datastream) SetIterator(it *Iterator)
func (*Datastream) SetMetadata¶
func (ds *Datastream) SetMetadata(jsonStr string)
func (*Datastream) Shape¶
func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
Shape changes the column types as needed, to the provided columns var. It will cast the already wrongly casted rows, and not recast the correctly casted rows.
func (*Datastream) Split¶
func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
Split splits the datastream into parallel datastreams
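A sketch of fanning a stream out with Split and consuming the resulting sub-streams concurrently:

	parts := ds.Split(4) // four parallel datastreams fed from ds
	var wg sync.WaitGroup
	for _, part := range parts {
		wg.Add(1)
		go func(p *iop.Datastream) {
			defer wg.Done()
			for row := range p.Rows() {
				_ = row // consume each sub-stream concurrently
			}
		}(part)
	}
	wg.Wait()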
func (*Datastream) Start¶
func (ds *Datastream) Start() (err error)
Start generates the stream. Should cycle the Iter Func until done.
func (*Datastream) TryPause¶
func (ds *Datastream) TryPause() bool
func (*Datastream) WaitClosed¶
func (ds *Datastream) WaitClosed()
WaitClosed waits until the dataflow is closed. Hack to make sure all streams are pushed.
func (*Datastream) WaitReady¶
func (ds *Datastream) WaitReady() error
WaitReady waits until datastream is ready
type DecimalColumnTyping¶ added in v1.4.5
type DecimalColumnTyping struct {
	MinPrecision *int   `json:"min_precision,omitempty" yaml:"min_precision,omitempty"` // Total number of digits
	MaxPrecision int    `json:"max_precision,omitempty" yaml:"max_precision,omitempty"` // Total number of digits
	MinScale     *int   `json:"min_scale,omitempty" yaml:"min_scale,omitempty"`         // Number of digits after decimal point
	MaxScale     int    `json:"max_scale,omitempty" yaml:"max_scale,omitempty"`
	CastAs       string `json:"cast_as,omitempty" yaml:"cast_as,omitempty"`
}
DecimalColumnTyping contains decimal type mapping configurations
func (*DecimalColumnTyping) Apply¶ added in v1.4.5
func (dct *DecimalColumnTyping) Apply(col Column) (precision, scale int)
type DeltaReader¶ added in v1.2.16
func NewDeltaReader¶ added in v1.2.16
func NewDeltaReader(uri string, props ...string) (*DeltaReader, error)
func (*DeltaReader) Close¶ added in v1.2.16
func (r *DeltaReader) Close() error
func (*DeltaReader) Columns¶ added in v1.2.16
func (r *DeltaReader) Columns() (Columns, error)
func (*DeltaReader) MakeQuery¶ added in v1.2.19
func (r *DeltaReader) MakeQuery(sc FileStreamConfig) string
type DuckDb¶ added in v1.2.16
DuckDb is a Duck DB compute layer
func NewDuckDb¶ added in v1.2.16
NewDuckDb creates a new DuckDb instance with the given context and properties
func (*DuckDb) AddExtension¶ added in v1.2.16
AddExtension adds an extension to the DuckDb instance if it's not already present
func (*DuckDb) AddSecret¶ added in v1.4.11
func (duck *DuckDb) AddSecret(secret DuckDbSecret)
AddSecret registers a new secret in the session (also adds the needed extension)
func (*DuckDb) CheckExtension¶ added in v1.4.18
CheckExtension checks if an extension is installed in DuckDB
func (*DuckDb) DataflowToHttpStream¶ added in v1.3.6
func (duck *DuckDb) DataflowToHttpStream(df *Dataflow, sc StreamConfig) (streamPartChn chan HttpStreamPart, err error)
func (*DuckDb) DefaultCsvConfig¶ added in v1.3.6
func (duck *DuckDb) DefaultCsvConfig() (config StreamConfig)
func (*DuckDb) EnsureBinDuckDB¶ added in v1.4.23
EnsureBinDuckDB ensures the duckdb binary exists; if missing, downloads and uses it
func (*DuckDb) ExecContext¶ added in v1.2.16
func (duck *DuckDb) ExecContext(ctx context.Context, sql string, args ...any) (result sql.Result, err error)
ExecContext executes a SQL query with context and returns the result
func (*DuckDb) ExecMultiContext¶ added in v1.2.16
func (duck *DuckDb) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
ExecMultiContext executes multiple SQL queries with context and returns the result
func (*DuckDb) GenerateCopyStatement¶ added in v1.2.25
func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options DuckDbCopyOptions) (sql string, err error)
func (*DuckDb) GenerateCsvColumns¶ added in v1.3.6
func (*DuckDb) GetScannerFunc¶ added in v1.2.19
func (*DuckDb) MakeScanQuery¶ added in v1.2.19
func (*DuckDb) PrepareFsSecretAndURI¶ added in v1.2.16
PrepareFsSecretAndURI prepares the secret configuration from the fs_props and modifies the URI if necessary for different storage types (S3, Google Cloud Storage, Azure Blob Storage). It returns the modified URI string.
The function handles the following storage types:
- Local files: removes the "file://" prefix
- S3: configures AWS credentials and handles Cloudflare R2 storage
- Google Cloud Storage: sets up GCS credentials
- Azure Blob Storage: configures the Azure connection string or account name
It uses the DuckDb instance's properties to populate the secret configuration.
func (*DuckDb) QueryContext¶ added in v1.2.16
func (duck *DuckDb) QueryContext(ctx context.Context, sql string, options ...map[string]any) (data Dataset, err error)
QueryContext runs a SQL query with context, returns a `Dataset`
func (*DuckDb) StreamContext¶ added in v1.2.16
func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...map[string]any) (ds *Datastream, err error)
StreamContext runs a SQL query with context, returns a `Datastream`
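Putting the DuckDb pieces together, a sketch of a materialized query and a streamed query; the NewDuckDb call follows its description above but its exact signature is not shown here, so treat it as an assumption, as is the Dataset.Rows field:

	ctx := context.Background()
	duck := iop.NewDuckDb(ctx) // assumed: a context plus optional properties

	data, err := duck.QueryContext(ctx, "select 1 as n, 'a' as s") // materialized Dataset
	if err != nil {
		return err
	}
	fmt.Println(len(data.Rows)) // Dataset field name assumed

	ds, err := duck.StreamContext(ctx, "select * from range(1000000)") // streaming Datastream
	if err != nil {
		return err
	}
	for row := range ds.Rows() {
		_ = row
	}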
type DuckDbCopyOptions¶ added in v1.2.25
type DuckDbCopyOptions struct {
	Format             dbio.FileType
	Compression        CompressorType
	PartitionFields    []PartitionLevel // part_year, part_month, part_day, etc.
	PartitionKey       string
	WritePartitionCols bool
	FileSizeBytes      int64
}
type DuckDbSecret¶ added in v1.4.11
type DuckDbSecret struct {
	Type  DuckDbSecretType  `json:"type"`
	Name  string            `json:"name"`
	Props map[string]string `json:"props"`
}
func NewDuckDbSecret¶ added in v1.4.11
func NewDuckDbSecret(name string, secretType DuckDbSecretType, props map[string]string) DuckDbSecret
func (*DuckDbSecret) AddProp¶ added in v1.4.11
func (dds *DuckDbSecret) AddProp(key, value string)
func (*DuckDbSecret) Render¶ added in v1.4.11
func (dds *DuckDbSecret) Render() string
type DuckDbSecretType¶ added in v1.4.11
type DuckDbSecretType string
const (
	DuckDbSecretTypeUnknown DuckDbSecretType = ""
	DuckDbSecretTypeS3      DuckDbSecretType = "s3"
	DuckDbSecretTypeR2      DuckDbSecretType = "r2"
	DuckDbSecretTypeGCS     DuckDbSecretType = "gcs"
	DuckDbSecretTypeAzure   DuckDbSecretType = "azure"
	DuckDbSecretTypeIceberg DuckDbSecretType = "iceberg"
)
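For instance, a sketch of registering an S3 secret on a DuckDb session (reusing a duck instance as in the sketch above); the property keys passed in are illustrative, not taken from this page:

	secret := iop.NewDuckDbSecret("my_s3", iop.DuckDbSecretTypeS3, map[string]string{
		"key_id": "AKIA...",  // illustrative property names
		"secret": "********",
		"region": "us-east-1",
	})
	secret.AddProp("endpoint", "s3.amazonaws.com")
	duck.AddSecret(secret) // per AddSecret's doc, this also loads the needed extension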
type Encoding¶ added in v1.4.13
type Encoding string
var (
	EncodingLatin1      Encoding = "latin1"
	EncodingLatin5      Encoding = "latin5"
	EncodingLatin9      Encoding = "latin9"
	EncodingUtf8        Encoding = "utf8"
	EncodingUtf8Bom     Encoding = "utf8_bom"
	EncodingUtf16       Encoding = "utf16"
	EncodingWindows1250 Encoding = "windows1250"
	EncodingWindows1252 Encoding = "windows1252"

	Encodings = []Encoding{EncodingLatin1, EncodingLatin5, EncodingLatin9, EncodingUtf8, EncodingUtf8Bom, EncodingUtf16, EncodingWindows1250, EncodingWindows1252}
)
func (Encoding) DecodeString¶ added in v1.4.17
func (Encoding) EncodeString¶ added in v1.4.17
type Evaluator¶ added in v1.4.14
type Evaluator struct {
	Eval            *goval.Evaluator
	State           map[string]any
	NoComputeKey    string
	VarPrefixes     []string
	KeepMissingExpr bool // allows us to leave any missing sub-expression intact
	AllowNoPrefix   bool
	IgnoreSyntaxErr bool
}
func NewEvaluator¶ added in v1.4.14
func (*Evaluator) ExtractVars¶ added in v1.4.14
ExtractVars identifies variable references in a string expression, ignoring those inside double quotes. It can recognize patterns like env.VAR, state.VAR, secrets.VAR, and auth.VAR. When AllowNoPrefix is true, it also captures unprefixed variables like MY_VAR, some_value, etc.
func (*Evaluator) FillMissingKeys¶ added in v1.4.25
func (*Evaluator) FindMatches¶ added in v1.5.1
FindMatches parses the input string and extracts expressions within curly braces, properly handling nested brackets and quoted strings. Returns an error if brackets are unbalanced.
func (*Evaluator) RenderPayload¶ added in v1.4.14
type Excel¶ added in v1.2.2
type Excel struct {
	File   *excelize.File
	Sheets []string
	Path   string
	// contains filtered or unexported fields
}
Excel represents an Excel object pointing to its file
func NewExcelFromFile¶ added in v1.2.2
NewExcelFromFile returns a new Excel instance from a local file
func NewExcelFromReader¶ added in v1.2.2
NewExcelFromReader returns a new Excel instance from a reader
func (*Excel) GetDataset¶ added in v1.2.2
GetDataset returns a dataset of the provided sheet
func (*Excel) GetDatasetFromRange¶ added in v1.2.2
GetDatasetFromRange returns a dataset of the provided sheet / range. cellRange example: `$AH$13:$AI$20` or `AH13:AI20` or `A:E`
func (*Excel) RefreshSheets¶ added in v1.2.2
RefreshSheets refreshes the sheet index data
func (*Excel) TitleToNumber¶ added in v1.4.24
TitleToNumber provides a function to convert an Excel sheet column title to int (this function doesn't do value check currently). For example, convert AK and ak to column title 36:
	excelize.TitleToNumber("AK")
	excelize.TitleToNumber("ak")
func (*Excel) WriteSheet¶ added in v1.2.2
func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be: `new`, `append` or `overwrite`. Default is `new`
func (*Excel) WriteToFile¶ added in v1.2.2
WriteToFile writes to a file
type FileStreamConfig¶ added in v1.2.19
type FileStreamConfig struct {
	Limit            int               `json:"limit"`
	Select           []string          `json:"select"`
	SQL              string            `json:"sql"`
	Format           dbio.FileType     `json:"format"`
	IncrementalKey   string            `json:"incremental_key"`
	IncrementalValue string            `json:"incremental_value"`
	FileSelect       []string          `json:"file_select"`     // a list of files to include.
	DuckDBFilename   bool              `json:"duckdb_filename"` // stream URL
	Props            map[string]string `json:"props"`
}
func (*FileStreamConfig) GetProp¶ added in v1.3.5
func (sc *FileStreamConfig) GetProp(key string) string
func (*FileStreamConfig) SetProp¶ added in v1.3.5
func (sc *FileStreamConfig) SetProp(key, val string)
func (*FileStreamConfig) ShouldUseDuckDB¶ added in v1.2.19
func (sc *FileStreamConfig) ShouldUseDuckDB() bool
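A sketch of building a FileStreamConfig and checking whether the DuckDB path applies; the property key is illustrative:

	fsc := iop.FileStreamConfig{
		Limit:  1000,
		Select: []string{"id", "name"},
		SQL:    "select * from stream where id > 10",
		Props:  map[string]string{},
	}
	fsc.SetProp("my_key", "my_value") // illustrative key
	if fsc.ShouldUseDuckDB() {
		// e.g. pass fsc to ConsumeParquetReaderDuckDb or to a reader's MakeQuery
	}
	_ = fsc.GetProp("my_key")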
type GoogleSheet¶ added in v1.2.2
type GoogleSheet struct {
	Sheets        []string
	SpreadsheetID string
	// contains filtered or unexported fields
}
GoogleSheet represents a Google Sheet object
func NewGoogleSheet¶ added in v1.2.2
func NewGoogleSheet(props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheet is a blank spreadsheet. title is the new spreadsheet title
func NewGoogleSheetFromURL¶ added in v1.2.2
func NewGoogleSheetFromURL(urlStr string, props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheetFromURL returns a new GoogleSheet instance from a provided url
func (*GoogleSheet) DeleteSheet¶ added in v1.2.2
func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
func (*GoogleSheet) GetDataset¶ added in v1.2.2
func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
GetDataset returns a dataset of the sheet
func (*GoogleSheet) GetDatasetFromRange¶ added in v1.2.2
func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
GetDatasetFromRange returns a dataset from the specified range
func (*GoogleSheet) RefreshSheets¶ added in v1.2.2
func (ggs *GoogleSheet) RefreshSheets() (err error)
RefreshSheets refreshes sheets data
func (*GoogleSheet) URL¶ added in v1.2.2
func (ggs *GoogleSheet) URL() string
func (*GoogleSheet) WriteSheet¶ added in v1.2.2
func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be: `new`, `append` or `overwrite`. Default is `new`
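Putting the GoogleSheet calls together (a sketch; how credentials are passed through the props variadic is an assumption and is elided here, and the Dataset.Rows field is likewise assumed):

	ggs, err := iop.NewGoogleSheetFromURL("https://docs.google.com/spreadsheets/d/...") // credential props elided
	if err != nil {
		return err
	}
	data, err := ggs.GetDataset("Sheet1")
	if err != nil {
		return err
	}
	fmt.Println(len(data.Rows)) // Dataset field name assumed

	// write a datastream into another sheet, replacing its contents
	if err = ggs.WriteSheet("output", ds, "overwrite"); err != nil {
		return err
	}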
type GzipCompressor¶
type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*GzipCompressor) Compress¶
func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*GzipCompressor) Decompress¶
Decompress uses gzip to decompress if it is gzip. Otherwise returns the same reader
func (*GzipCompressor) Suffix¶
func (cp *GzipCompressor) Suffix() string
type HttpStreamPart¶ added in v1.3.6
type IcebergReader¶ added in v1.2.16
func NewIcebergReader¶ added in v1.2.16
func NewIcebergReader(uri string, props ...string) (*IcebergReader, error)
func (*IcebergReader) Close¶ added in v1.2.16
func (i *IcebergReader) Close() error
func (*IcebergReader) Columns¶ added in v1.2.16
func (r *IcebergReader) Columns() (Columns, error)
func (*IcebergReader) MakeQuery¶ added in v1.2.19
func (r *IcebergReader) MakeQuery(sc FileStreamConfig) string
type Iterator¶
type Iterator struct {
	Row          []any
	Reprocess    chan []any
	IsCasted     bool
	RowIsCasted  bool
	Counter      uint64
	StreamRowNum uint64
	Context      *g.Context
	Closed       bool
	// contains filtered or unexported fields
}
Iterator is the row provider for a datastream
func (*Iterator) BelowEqualIncrementalVal¶ added in v1.2.10
BelowEqualIncrementalVal evaluates the incremental value against the incrementalCol. This is used when the stream is a file with incremental mode (unable to filter at source like a database). it.incrementalVal and it.incrementalColI need to be set.
func (*Iterator) Ds¶
func (it *Iterator) Ds() *Datastream
type JsonColumnTyping¶ added in v1.4.7
type JsonColumnTyping struct {
	AsText bool `json:"as_text,omitempty" yaml:"as_text,omitempty"`
}
JsonColumnTyping contains json type mapping configurations
func (*JsonColumnTyping) Apply¶ added in v1.4.7
func (jct *JsonColumnTyping) Apply(col *Column)
type KeyType¶
type KeyType string
const (
	AggregateKey    KeyType = "aggregate"
	ClusterKey      KeyType = "cluster"
	DistributionKey KeyType = "distribution"
	DuplicateKey    KeyType = "duplicate"
	HashKey         KeyType = "hash"
	IndexKey        KeyType = "index"
	PartitionKey    KeyType = "partition"
	PrimaryKey      KeyType = "primary"
	SortKey         KeyType = "sort"
	UniqueKey       KeyType = "unique"
	UpdateKey       KeyType = "update"
)
func (KeyType) MetadataKey¶ added in v1.2.15
type Metadata¶
type NoneCompressor¶
type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*NoneCompressor) Decompress¶
func (*NoneCompressor) Suffix¶
func (cp *NoneCompressor) Suffix() string
type Parquet¶
type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}
Parquet is a parquet object
func NewParquetReader¶ added in v1.1.7
type ParquetArrowReader¶ added in v1.1.6
type ParquetArrowReader struct {
	Path    string
	Reader  *pqarrow.FileReader
	File    *os.File
	Data    *Dataset
	Context *g.Context
	Memory  memory.Allocator
	// contains filtered or unexported fields
}
ParquetArrowReader is a parquet reader object using arrow v18
func NewParquetArrowReader¶ added in v1.1.7
func NewParquetArrowReader(reader *os.File, selected []string) (p *ParquetArrowReader, err error)
func (*ParquetArrowReader) Columns¶ added in v1.1.6
func (p *ParquetArrowReader) Columns() Columns
type ParquetArrowWriter¶ added in v1.1.6
type ParquetArrowWriter struct {
	Writer *pqarrow.FileWriter
	// contains filtered or unexported fields
}
func NewParquetArrowWriter¶ added in v1.1.6
func NewParquetArrowWriter(w io.Writer, columns Columns, codec compress.Compression) (p *ParquetArrowWriter, err error)
func (*ParquetArrowWriter) Close¶ added in v1.1.6
func (p *ParquetArrowWriter) Close() error
func (*ParquetArrowWriter) Columns¶ added in v1.1.6
func (p *ParquetArrowWriter) Columns() Columns
func (*ParquetArrowWriter) WriteRow¶ added in v1.1.6
func (p *ParquetArrowWriter) WriteRow(row []any) error
type ParquetDuckDb¶ added in v1.2.16
func NewParquetReaderDuckDb¶ added in v1.2.16
func NewParquetReaderDuckDb(uri string, props ...string) (*ParquetDuckDb, error)
func (*ParquetDuckDb) Close¶ added in v1.2.16
func (r *ParquetDuckDb) Close() error
func (*ParquetDuckDb) Columns¶ added in v1.2.16
func (r *ParquetDuckDb) Columns() (Columns, error)
func (*ParquetDuckDb) MakeQuery¶ added in v1.2.19
func (r *ParquetDuckDb) MakeQuery(sc FileStreamConfig) string
type ParquetWriter¶ added in v1.1.7
type ParquetWriter struct {
	Writer    *parquet.Writer
	WriterMap *parquet.GenericWriter[map[string]any]
	// contains filtered or unexported fields
}
func NewParquetWriter¶ added in v1.1.7
func NewParquetWriterMap¶ added in v1.2.12
func (*ParquetWriter) Close¶ added in v1.1.7
func (pw *ParquetWriter) Close() error
func (*ParquetWriter) WriteRec¶ added in v1.2.12
func (pw *ParquetWriter) WriteRec(row []any) error
func (*ParquetWriter) WriteRow¶ added in v1.1.7
func (pw *ParquetWriter) WriteRow(row []any) error
type PartitionLevel¶ added in v1.3.4
type PartitionLevel string
const (
	PartitionLevelSecond    PartitionLevel = "second"
	PartitionLevelMinute    PartitionLevel = "minute"
	PartitionLevelHour      PartitionLevel = "hour"
	PartitionLevelDay       PartitionLevel = "day"
	PartitionLevelWeek      PartitionLevel = "week"
	PartitionLevelYearMonth PartitionLevel = "year_month"
	PartitionLevelMonth     PartitionLevel = "month"
	PartitionLevelYear      PartitionLevel = "year"
)
func ExtractPartitionFields¶ added in v1.3.4
func ExtractPartitionFields(path string) (levels []PartitionLevel)
ExtractPartitionFields extracts the partition fields from the given path
func GetLowestPartTimeLevel¶ added in v1.3.6
func GetLowestPartTimeLevel(url string) (PartitionLevel, error)
func (PartitionLevel) IsValid¶ added in v1.3.4
func (level PartitionLevel) IsValid() bool
func (PartitionLevel) Str¶ added in v1.3.6
func (level PartitionLevel) Str() string
func (PartitionLevel) TruncateTime¶ added in v1.3.4
type Queue¶ added in v1.4.11
type Queue struct {
	Path   string        `json:"path"`
	File   *os.File      `json:"-"`
	Reader *bufio.Reader `json:"-"`
	Writer *bufio.Writer `json:"-"`
	// contains filtered or unexported fields
}
type ReaderReady¶ added in v1.2.6
type RecNode¶
type RecNode struct {
	// contains filtered or unexported fields
}
func NewRecNode¶
func (*RecNode) Compression¶
type SAS¶
type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}
SAS is a sas7bdat object
func NewSASStream¶
func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)
type SSHClient¶
type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}
SSHClient is a client to connect to an ssh server, with the main goal of forwarding ports
func (*SSHClient) NewSession¶ added in v1.5.1
NewSession creates a new SSH session
func (*SSHClient) OpenPortForward¶
OpenPortForward forwards the port as specified
func (*SSHClient) RunAsProcess¶
RunAsProcess uses a separate process, which enables the use of public key auth. https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key
type Selector¶ added in v1.5.1
Selector provides efficient field selection, exclusion, and renaming. Use NewSelector to build from select expressions, then Apply for each field.
func NewSelector¶ added in v1.5.1
func NewSelector(selectExprs []string, casing ColumnCasing) *Selector
NewSelector creates a Selector from select expressions. All field matching is case-insensitive.
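A sketch of constructing a Selector; the select-expression syntax shown is illustrative, the zero ColumnCasing value is used since the casing constants are not listed in this excerpt, and Apply's exact signature is likewise not shown, so it is only referenced in a comment:

	exprs := []string{"id", "name", "-internal_field"} // illustrative expressions
	var casing iop.ColumnCasing                        // zero value as a stand-in
	sel := iop.NewSelector(exprs, casing)
	_ = sel // call sel.Apply(...) per incoming field to select, exclude, or rename it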
type SnappyCompressor¶
type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*SnappyCompressor) Compress¶
func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader
Compress uses snappy to compress
func (*SnappyCompressor) Decompress¶
Decompress uses snappy to decompress if it is snappy. Otherwise returns the same reader
func (*SnappyCompressor) Suffix¶
func (cp *SnappyCompressor) Suffix() string
type StreamConfig¶ added in v1.1.15
type StreamConfig struct {
	EmptyAsNull    bool           `json:"empty_as_null"`
	Header         bool           `json:"header"`
	Compression    CompressorType `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE
	NullIf         string         `json:"null_if"`
	NullAs         string         `json:"null_as"`
	DatetimeFormat string         `json:"datetime_format"`
	SkipBlankLines bool           `json:"skip_blank_lines"`
	Format         dbio.FileType  `json:"format"`
	Delimiter      string         `json:"delimiter"`
	Escape         string         `json:"escape"`
	Quote          string         `json:"quote"`
	FileMaxRows    int64          `json:"file_max_rows"`
	FileMaxBytes   int64          `json:"file_max_bytes"`
	BatchLimit     int64          `json:"batch_limit"`
	MaxDecimals    int            `json:"max_decimals"`
	Flatten        int            `json:"flatten"`
	FieldsPerRec   int            `json:"fields_per_rec"`
	Jmespath       string         `json:"jmespath"`
	Sheet          string         `json:"sheet"`
	ColumnCasing   ColumnCasing   `json:"column_casing"`
	TargetType     dbio.Type      `json:"target_type"`
	DeleteFile     bool           `json:"delete"`  // whether to delete before writing
	BoolAsInt      bool           `json:"-"`
	Columns        Columns        `json:"columns"` // list of column types. Can be partial list! likely is!
	Transforms TransformMap map[string]string `json:"-"`
}
func DefaultStreamConfig¶ added in v1.1.15
func DefaultStreamConfig() StreamConfig
func LoaderStreamConfig¶ added in v1.2.22
func LoaderStreamConfig(header bool) StreamConfig
func (*StreamConfig) ToMap¶ added in v1.2.22
func (sc *StreamConfig) ToMap() map[string]string
type StreamProcessor¶
type StreamProcessor struct {
	N      uint64
	Config StreamConfig
	// contains filtered or unexported fields
}
StreamProcessor processes rows and values
func NewStreamProcessor¶
func NewStreamProcessor() *StreamProcessor
NewStreamProcessor returns a new StreamProcessor
func (*StreamProcessor) CastRow¶
func (sp *StreamProcessor) CastRow(row []any, columns Columns) []any
CastRow casts each value of a row. Slows down processing by about 40%?
func (*StreamProcessor) CastToBool¶ added in v1.2.12
func (sp *StreamProcessor) CastToBool(i any) (b bool, err error)
CastToBool converts interface to bool
func (*StreamProcessor) CastToString¶
func (sp *StreamProcessor) CastToString(val any) (valString string)
func (*StreamProcessor) CastToStringCSV¶ added in v1.4.17
func (sp *StreamProcessor) CastToStringCSV(i int, val any, valType ...ColumnType) string
CastToStringCSV casts to string; used for csv writing. Slows processing down 5% with upstream CastRow, or 35% without upstream CastRow.
func (*StreamProcessor) CastToStringE¶ added in v1.4.17
func (sp *StreamProcessor) CastToStringE(val any) (valString string, err error)
func (*StreamProcessor) CastToStringSafeMask¶ added in v1.2.14
func (sp *StreamProcessor) CastToStringSafeMask(i int, val any, valType ...ColumnType) string
CastToStringSafeMask casts to masks to count bytes (even safer)
func (*StreamProcessor) CastToTime¶
func (sp *StreamProcessor) CastToTime(i any) (t time.Time, err error)
CastToTime converts interface to time
func (*StreamProcessor) CastType¶
func (sp *StreamProcessor) CastType(val any, typ ColumnType) any
CastType casts the type of an interface. CastType is used to cast the interface place holders?
func (*StreamProcessor) CastVal¶
func (sp *StreamProcessor) CastVal(i int, val any, col *Column) any
CastVal casts values with stats collection, which degrades performance by ~10%:
	go test -benchmem -run='^$' github.com/slingdata-io/sling-cli/core/dbio/iop -bench '^BenchmarkProcessVal'
func (*StreamProcessor) CastValWithoutStats¶
func (sp *StreamProcessor) CastValWithoutStats(i int, val any, typ ColumnType) any
CastValWithoutStats casts the value without counting stats
func (*StreamProcessor) CheckType¶ added in v1.4.18
func (sp *StreamProcessor) CheckType(v any) (typ ColumnType)
CheckType returns the type of an interface
func (*StreamProcessor) ColStats¶ added in v1.2.10
func (sp *StreamProcessor) ColStats() map[int]*ColumnStats
func (*StreamProcessor) CountDigits¶ added in v1.4.20
func (sp *StreamProcessor) CountDigits(number string) (precision, scale int)
func (*StreamProcessor) GetType¶
func (sp *StreamProcessor) GetType(val any) (typ ColumnType)
GetType returns the type of an interface
func (*StreamProcessor) ParseString¶
func (sp *StreamProcessor) ParseString(s string, jj ...int) (val any)
ParseString returns an interface:
	string: "varchar"
	integer: "integer"
	decimal: "decimal"
	date: "date"
	datetime: "timestamp"
	timestamp: "timestamp"
	text: "text"
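A short sketch of probing raw string values with ParseString (the comments describe the expected kind of result, not exact types):

	sp := iop.NewStreamProcessor()
	fmt.Printf("%T\n", sp.ParseString("123"))        // an integer value
	fmt.Printf("%T\n", sp.ParseString("123.45"))     // a decimal / floating value
	fmt.Printf("%T\n", sp.ParseString("2021-01-01")) // a date / timestamp value
	fmt.Printf("%T\n", sp.ParseString("hello"))      // remains a string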
func (*StreamProcessor) ParseTime¶
func (sp *StreamProcessor) ParseTime(i any) (t time.Time, err error)
ParseTime parses a date string and returns time.Time
func (*StreamProcessor) ParseVal¶
func (sp *StreamProcessor) ParseVal(val any) any
ParseVal parses the value into its appropriate type
func (*StreamProcessor) ProcessRow¶
func (sp *StreamProcessor) ProcessRow(row []any) []any
ProcessRow processes a row
func (*StreamProcessor) ProcessVal¶
func (sp *StreamProcessor) ProcessVal(val any) any
ProcessVal processes a value
func (*StreamProcessor) ResetConfig¶ added in v1.1.15
func (sp *StreamProcessor) ResetConfig()
func (*StreamProcessor) SetConfig¶
func (sp *StreamProcessor) SetConfig(configMap map[string]string)
SetConfig sets the data.Sp.config values
func (*StreamProcessor) TruncateDecimalString¶ added in v1.4.20
func (sp *StreamProcessor) TruncateDecimalString(number string, decCount int) (newNumber string)
TruncateDecimalString returns up to the specified scale, without converting
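For example (a sketch):

	sp := iop.NewStreamProcessor()
	// keep at most 2 digits after the decimal point, without numeric conversion
	fmt.Println(sp.TruncateDecimalString("123.456789", 2))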
type StringColumnTyping¶ added in v1.4.5
type StringColumnTyping struct {
	LengthFactor int    `json:"length_factor,omitempty" yaml:"length_factor,omitempty"`
	MinLength    int    `json:"min_length,omitempty" yaml:"min_length,omitempty"`
	MaxLength    int    `json:"max_length,omitempty" yaml:"max_length,omitempty"`
	UseMax       bool   `json:"use_max,omitempty" yaml:"use_max,omitempty"`
	Note         string `json:"note,omitempty" yaml:"note,omitempty"`
}
StringColumnTyping contains string type mapping configurations
func (*StringColumnTyping) Apply¶ added in v1.4.5
func (sct *StringColumnTyping) Apply(length, max int) (newLength int)
type TimeLevel¶ added in v1.3.6
type TimeLevel string
const (
	TimeLevelYear      TimeLevel = "YYYY"
	TimeLevelYearShort TimeLevel = "YY"
	TimeLevelMonthName TimeLevel = "MMM"
	TimeLevelMonth     TimeLevel = "MM"
	TimeLevelDay       TimeLevel = "DD"
	TimeLevelDayOfYear TimeLevel = "DDD"
	TimeLevelHour24    TimeLevel = "HH"
	TimeLevelHour12    TimeLevel = "hh"
	TimeLevelMinute    TimeLevel = "mm"
	TimeLevelSecond    TimeLevel = "ss"
)
func ExtractISO8601DateFields¶ added in v1.3.6
ExtractISO8601DateFields returns a list of found date fields
func (TimeLevel) AsPartitionLevel¶ added in v1.3.6
func (tl TimeLevel) AsPartitionLevel() PartitionLevel
type TransformLegacy¶ added in v1.4.17
type TransformLegacy struct {
	Name       string
	Func       func(*StreamProcessor, ...any) (any, error)
	FuncString func(*StreamProcessor, string) (string, error)
	FuncTime   func(*StreamProcessor, *time.Time) error
}
type TransformLegacyList¶ added in v1.4.17
type TransformLegacyList []TransformLegacy
func (TransformLegacyList) HasTransform¶ added in v1.4.17
func (tl TransformLegacyList) HasTransform(t TransformLegacy) bool
type Transformers¶ added in v1.2.2
type Transformers struct {
	Accent            transform.Transformer
	DecodeUTF8        transform.Transformer
	DecodeUTF8BOM     transform.Transformer
	DecodeUTF16       transform.Transformer
	DecodeISO8859_1   transform.Transformer
	DecodeISO8859_5   transform.Transformer
	DecodeISO8859_15  transform.Transformer
	DecodeWindows1250 transform.Transformer
	DecodeWindows1252 transform.Transformer
	EncodeUTF8        transform.Transformer
	EncodeUTF8BOM     transform.Transformer
	EncodeUTF16       transform.Transformer
	EncodeISO8859_1   transform.Transformer
	EncodeISO8859_5   transform.Transformer
	EncodeISO8859_15  transform.Transformer
	EncodeWindows1250 transform.Transformer
	EncodeWindows1252 transform.Transformer
}
func NewTransformers¶ added in v1.2.2
func NewTransformers() Transformers
type ZStandardCompressor¶
type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}
func (*ZStandardCompressor) Compress¶
func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader
Compress uses zstandard to compress
func (*ZStandardCompressor) Decompress¶
Decompress uses zstandard to decompress if it is zstandard. Otherwise returns the same reader
func (*ZStandardCompressor) Suffix¶
func (cp *ZStandardCompressor) Suffix() string
Source Files¶
- arrow.go
- avro.go
- aws.go
- compression.go
- csv.go
- csv_duckdb.go
- dataflow.go
- dataset.go
- datastream.go
- datastream_batch.go
- datatype.go
- delta.go
- duckdb.go
- iceberg.go
- json.go
- parquet.go
- parquet_arrow.go
- parquet_duckdb.go
- partition.go
- queue.go
- sas7bdat.go
- sheet.go
- sheet_excel.go
- sheet_google.go
- ssh.go
- stream_processor.go
- transforms.go
- xml.go