beam
Documentation¶
Overview¶
Package beam is an implementation of the Apache Beam (https://beam.apache.org) programming model in Go. Beam provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
For more on the Beam model see: https://beam.apache.org/documentation/programming-guide
For design choices this implementation makes see: https://s.apache.org/beam-go-sdk-design-rfc
Example (GettingStarted)¶
package main

import (
    "context"
    "fmt"
    "regexp"
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
)

func main() {
    // In order to start creating the pipeline for execution, a Pipeline object is needed.
    p := beam.NewPipeline()
    s := p.Root()
    // The pipeline object encapsulates all the data and steps in your processing task.
    // It is the basis for creating the pipeline's data sets as PCollections and its operations
    // as transforms.

    // The PCollection abstraction represents a potentially distributed,
    // multi-element data set. You can think of a PCollection as “pipeline” data;
    // Beam transforms use PCollection objects as inputs and outputs. As such, if
    // you want to work with data in your pipeline, it must be in the form of a
    // PCollection.

    // Transformations are applied in a scoped fashion to the pipeline. The scope
    // can be obtained from the pipeline object.

    // Start by reading text from input files, and receiving a PCollection.
    lines := textio.Read(s, "protocol://path/file*.txt")

    // Transforms are added to the pipeline so they are part of the work to be
    // executed. Since this transform has no PCollection as an input, it is
    // considered a 'root transform'.

    // A pipeline can have multiple root transforms.
    moreLines := textio.Read(s, "protocol://other/path/file*.txt")

    // Further transforms can be applied, creating an arbitrary, acyclic graph.
    // Subsequent transforms (and the intermediate PCollections they produce) are
    // attached to the same pipeline.
    all := beam.Flatten(s, lines, moreLines)
    wordRegexp := regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
    words := beam.ParDo(s, func(line string, emit func(string)) {
        for _, word := range wordRegexp.FindAllString(line, -1) {
            emit(word)
        }
    }, all)
    formatted := beam.ParDo(s, strings.ToUpper, words)
    textio.Write(s, "protocol://output/path", formatted)

    // Applying a transform adds it to the pipeline, rather than executing it
    // immediately. Once the whole pipeline of transforms is constructed, the
    // pipeline can be executed by a PipelineRunner. The direct runner executes the
    // transforms directly, sequentially, in this one process, which is useful for
    // unit tests and simple experiments:
    if _, err := direct.Execute(context.Background(), p); err != nil {
        fmt.Printf("Pipeline failed: %v", err)
    }
}
Example (MetricsDeclaredAnywhere)¶
package main

import (
    "context"
    "regexp"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
)

func ctxWithPtransformID(id string) context.Context {
    ctx := context.Background()
    ctx = metrics.SetBundleID(ctx, "exampleBundle")
    ctx = metrics.SetPTransformID(ctx, id)
    return ctx
}

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
    // Metrics can be declared outside DoFns, and used inside.
    outside := beam.NewCounter("example.namespace", "count")
    extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
        // They can be defined at time of use within a DoFn, if necessary.
        inside := beam.NewDistribution("example.namespace", "characters")
        for _, word := range wordRE.FindAllString(line, -1) {
            emit(word)
            outside.Inc(ctx, 1)
            inside.Update(ctx, int64(len(word)))
        }
    }
    ctx := ctxWithPtransformID("example")
    extractWordsDofn(ctx, "this has six words in it", func(string) {})
    extractWordsDofn(ctx, "this has seven words in it, see?", func(string) {})

    metrics.DumpToOutFromContext(ctx)
}

Output:

PTransformID: "example"
    example.namespace.characters - count: 13 sum: 43 min: 2 max: 5
    example.namespace.count - value: 13
Example (MetricsReusable)¶
package main

import (
    "context"
    "regexp"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
    // Metric proxies can be used in multiple DoFns
    c := beam.NewCounter("example.reusable", "count")

    extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
        for _, word := range wordRE.FindAllString(line, -1) {
            emit(word)
            c.Inc(ctx, 1)
        }
    }

    extractRunesDofn := func(ctx context.Context, line string, emit func(rune)) {
        for _, r := range line {
            emit(r)
            c.Inc(ctx, 1)
        }
    }
    ctx = metrics.SetBundleID(ctx, "exampleBundle")
    extractWordsDofn(metrics.SetPTransformID(ctx, "extract1"), "this has six words in it", func(string) {})
    extractRunesDofn(metrics.SetPTransformID(ctx, "extract2"), "seven thousand", func(rune) {})

    metrics.DumpToOutFromContext(ctx)
}

Output:

PTransformID: "extract1"
    example.reusable.count - value: 6
PTransformID: "extract2"
    example.reusable.count - value: 14
Index¶
- Variables
- func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, ...) map[string]PCollection
- func CrossLanguagePayload(pl any) []byte
- func ExternalTagged(s Scope, urn string, payload []byte, namedInputs map[string]PCollection, ...) map[string]PCollection
- func Init()
- func Initialized() bool
- func MustTaggedN(ret map[string]PCollection, err error) map[string]PCollection
- func NewPipelineWithRoot() (*Pipeline, Scope)
- func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)
- func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)
- func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)
- func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)
- func ParDo5(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection)
- func ParDo6(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection)
- func ParDo7(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, ...)
- func RegisterCoder(t reflect.Type, encoder, decoder any)
- func RegisterDoFn(dofn any)
- func RegisterFunction(fn any)
- func RegisterInit(hook func())
- func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) (PipelineResult, error))
- func RegisterSchemaProvider(rt reflect.Type, provider any)
- func RegisterSchemaProviderWithURN(rt reflect.Type, provider any, urn string)
- func RegisterType(t reflect.Type)
- func TryCrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, ...) (map[string]PCollection, error)
- func TryExternalTagged(s Scope, urn string, payload []byte, namedInputs map[string]PCollection, ...) (map[string]PCollection, error)
- func UnnamedInput(col PCollection) map[string]PCollection
- func UnnamedOutput(t FullType) map[string]FullType
- func UnnamedOutputTag() string
- func ValidateKVType(col PCollection) (typex.FullType, typex.FullType)
- func ValidateNonCompositeType(col PCollection) typex.FullType
- type BundleFinalization
- type Coder
- type Counter
- type Distribution
- type ElementDecoder
- type ElementEncoder
- type EncodedCoder
- type EncodedFunc
- type EncodedType
- type EventTime
- type FullType
- type Gauge
- type MetricResult
- type Option
- type PCollection
- func AddFixedKey(s Scope, col PCollection) PCollection
- func CoGroupByKey(s Scope, cols ...PCollection) PCollection
- func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
- func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
- func Create(s Scope, values ...any) PCollection
- func CreateList(s Scope, list any) PCollection
- func DropKey(s Scope, col PCollection) PCollection
- func DropValue(s Scope, col PCollection) PCollection
- func Explode(s Scope, col PCollection) PCollection
- func External(s Scope, urn string, payload []byte, in []PCollection, out []FullType, ...) []PCollection
- func Flatten(s Scope, cols ...PCollection) PCollection
- func GroupByKey(s Scope, a PCollection) PCollection
- func Impulse(s Scope) PCollection
- func ImpulseValue(s Scope, value []byte) PCollection
- func Must(a PCollection, err error) PCollection
- func MustN(list []PCollection, err error) []PCollection
- func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection
- func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection
- func Partition(s Scope, n int, fn any, col PCollection) []PCollection
- func Reshuffle(s Scope, col PCollection) PCollection
- func Seq(s Scope, col PCollection, dofns ...any) PCollection
- func SwapKV(s Scope, col PCollection) PCollection
- func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error)
- func TryCombine(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)
- func TryCombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)
- func TryCreate(s Scope, values ...any) (PCollection, error)
- func TryCreateList(s Scope, list any) (PCollection, error)
- func TryExternal(s Scope, urn string, payload []byte, in []PCollection, out []FullType, ...) ([]PCollection, error)
- func TryFlatten(s Scope, cols ...PCollection) (PCollection, error)
- func TryGroupByKey(s Scope, a PCollection) (PCollection, error)
- func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection, error)
- func TryReshuffle(s Scope, col PCollection) (PCollection, error)
- func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error)
- func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection
- type PaneInfo
- type Pipeline
- type PipelineResult
- type SchemaProvider
- type Scope
- type SideInput
- type T
- type TypeDefinition
- type U
- type V
- type W
- type Window
- type WindowIntoOption
- type X
- type Y
- type Z
Examples¶
Constants¶
This section is empty.
Variables¶
var (
    TType = typex.TType
    UType = typex.UType
    VType = typex.VType
    WType = typex.WType
    XType = typex.XType
    YType = typex.YType
    ZType = typex.ZType
)
These are the reflect.Type instances of the universal types, which are used when binding actual types to "generic" DoFns that use Universal Types.
var EnableSchemas bool = true
EnableSchemas is a temporary configuration variable to use Beam Schema encoding by default instead of JSON. Before it is removed, it will be set to true by default and then eventually removed.
Only users who rely on default JSON marshalling behaviour should set this explicitly, and file an issue on the BEAM repo so the issue may be resolved. https://github.com/apache/beam/issues/new/choose
var EventTimeType = typex.EventTimeType
EventTimeType is the reflect.Type of EventTime.
var PipelineOptions = runtime.GlobalOptions
PipelineOptions are global options for the active pipeline. Options can be defined any time before execution and are re-created by the harness on remote execution workers. Global options should be used sparingly.
Functions¶
func CrossLanguage¶
func CrossLanguage(
    s Scope,
    urn string,
    payload []byte,
    expansionAddr string,
    namedInputs map[string]PCollection,
    namedOutputTypes map[string]FullType,
) map[string]PCollection
CrossLanguage is a low-level transform for executing cross-language transforms written in other SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory. CrossLanguage is useful for executing cross-language transforms which do not have any existing IO wrappers.
Usage requires an address for an expansion service accessible during pipeline construction, a URN identifying the desired transform, an optional payload with configuration information, and input and output names. It outputs a map of named output PCollections.
For more information on expansion services and other aspects of cross-language transforms in general, refer to the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
Payload¶
Payloads are configuration data that some cross-language transforms require for expansion. Consult the documentation of the transform in the source SDK to find out what payload data it requires. If no payload is required, pass in nil.
CrossLanguage accepts payloads as a []byte containing an encoded ExternalConfigurationPayload protobuf. The helper function beam.CrossLanguagePayload is the recommended way to easily encode a standard Go struct for use as a payload.
Inputs and Outputs¶
Like most transforms, any input PCollections must be provided. Unlike most transforms, output types must be provided because Go cannot infer output types from external transforms.
Inputs and outputs to a cross-language transform may be either named or unnamed. Named inputs/outputs are used when there are more than one input/output, and are provided as maps with names as keys. Unnamed inputs/outputs are used when there is only one, and a map can be quickly constructed with the UnnamedInput and UnnamedOutput methods.
An example of defining named inputs and outputs:
namedInputs := map[string]beam.PCollection{"pcol1": pcol1, "pcol2": pcol2}
namedOutputTypes := map[string]typex.FullType{
    "main": typex.New(reflectx.String),
    "side": typex.New(reflectx.Int64),
}
CrossLanguage outputs a map of PCollections with associated names. These names will match those from provided named outputs. If the beam.UnnamedOutput method was used, the PCollection can be retrieved with beam.UnnamedOutputTag().
An example of retrieving named outputs from a call to CrossLanguage:
outputs := beam.CrossLanguage(...)
mainPcol := outputs["main"]
sidePcol := outputs["side"]
Example¶
This example shows using CrossLanguage to execute the Prefix cross-language transform using an expansion service running on localhost:8099. Prefix requires a payload containing a prefix to prepend to every input string.
type prefixPayload struct {
    Data string
}
encodedPl := beam.CrossLanguagePayload(prefixPayload{Data: "foo"})
urn := "beam:transforms:xlang:test:prefix"
expansionAddr := "localhost:8099"
outputType := beam.UnnamedOutput(typex.New(reflectx.String))
input := beam.UnnamedInput(inputPcol)
outs := beam.CrossLanguage(s, urn, encodedPl, expansionAddr, input, outputType)
outPcol := outs[beam.UnnamedOutputTag()]
Alternative Expansion Handlers¶
The xlangx.RegisterHandler function can be used to register alternative expansion handlers to a namespace, for use with this function. This allows for custom handling of expansion addresses or starting up expansion services automatically beneath the CrossLanguage call.
In addition, urns can be bound to specific expansion addresses, using xlangx.RegisterOverrideForUrn. This allows for testing specific overrides, or other custom implementations to be used instead.
To ignore overrides regardless of URN, wrapping the expansion address in a call to xlangx.Require will force expansion using the given address.
func CrossLanguagePayload¶
func CrossLanguagePayload(pl any) []byte
CrossLanguagePayload encodes a native Go struct into a payload for cross-language transforms. Payloads are []byte encoded ExternalConfigurationPayload protobufs. In order to fill the contents of the protobuf, the provided struct will be converted to a row encoded representation with an accompanying schema, so the input struct must be compatible with schemas.
See https://beam.apache.org/documentation/programming-guide/#schemas for basic information on schemas, and pkg/beam/core/runtime/graphx/schema for details on schemas in the Go SDK.
Example:
type stringPayload struct {
    Data string
}
encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"})
func ExternalTagged¶
func ExternalTagged(
    s Scope,
    urn string,
    payload []byte,
    namedInputs map[string]PCollection,
    namedOutputTypes map[string]FullType,
    bounded bool,
) map[string]PCollection
ExternalTagged defines an external PTransform, and allows re-specifying the tags for the input and output PCollections. The interpretation of this primitive is runner specific. The runner is responsible for parsing the payload based on the URN provided to implement the behavior of the operation. Transform libraries should expose an API that captures the user's intent and serialize the payload as a byte slice that the runner will deserialize.
Use ExternalTagged if the runner will need to associate the PTransform's local PCollection tags with values in the payload. Otherwise, prefer External.
func Init¶
func Init()
Init is the hook that all user code must call after flags processing and other static initialization, for now.
func MustTaggedN¶
func MustTaggedN(ret map[string]PCollection, err error) map[string]PCollection
MustTaggedN returns the input, but panics if err != nil.
func NewPipelineWithRoot¶
func NewPipelineWithRoot() (*Pipeline, Scope)
NewPipelineWithRoot creates a new empty pipeline and its root scope.
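A minimal construction sketch using this helper; the file path is a placeholder, and beamx.Run (from the x/beamx convenience package) is one of several ways to execute the pipeline:
package main

import (
    "context"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
    beam.Init()

    // NewPipelineWithRoot is shorthand for beam.NewPipeline() followed by p.Root().
    p, s := beam.NewPipelineWithRoot()
    textio.Read(s, "protocol://path/file*.txt")

    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf("pipeline failed: %v", err)
    }
}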
func ParDo0¶
func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)
ParDo0 inserts a ParDo with zero output transform into the pipeline.
func ParDo2¶
func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)
ParDo2 inserts a ParDo with 2 outputs into the pipeline.
func ParDo3¶
func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)
ParDo3 inserts a ParDo with 3 outputs into the pipeline.
func ParDo4¶
func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)
ParDo4 inserts a ParDo with 4 outputs into the pipeline.
func ParDo5¶
func ParDo5(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection)
ParDo5 inserts a ParDo with 5 outputs into the pipeline.
func ParDo6¶
func ParDo6(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection)
ParDo6 inserts a ParDo with 6 outputs into the pipeline.
func ParDo7¶
func ParDo7(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection)
ParDo7 inserts a ParDo with 7 outputs into the pipeline.
func RegisterCoder¶
func RegisterCoder(t reflect.Type, encoder, decoder any)
RegisterCoder registers a user defined coder for a given type, and will be used if there is no existing beam coder for that type. Must be called prior to beam.Init(), preferably in an init() function.
The coder used for a given type follows this ordering:
- Coders for Known Beam types.
- Coders registered for specific types
- Coders registered for interface types
- Default coder (JSON)
Coders for interface types are iterated over to check if a type satisfies them, and the most recent one registered will be used.
Repeated registrations of the same type override prior ones.
RegisterCoder additionally registers the type, and coder functions as per RegisterType and RegisterFunction to avoid redundant calls.
Supported Encoder Signatures
func(T) []byte
func(reflect.Type, T) []byte
func(T) ([]byte, error)
func(reflect.Type, T) ([]byte, error)
Supported Decoder Signatures
func([]byte) T
func(reflect.Type, []byte) T
func([]byte) (T, error)
func(reflect.Type, []byte) (T, error)
where T is the matching user type.
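As a minimal sketch, a custom coder registration for a hypothetical user type (using JSON as the byte representation, and the func(T) ([]byte, error) / func([]byte) (T, error) signatures listed above) might look like:
package main

import (
    "encoding/json"
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// MyRecord is a hypothetical user type we want a custom coder for.
type MyRecord struct {
    ID   int64
    Name string
}

// encodeMyRecord matches the supported encoder signature func(T) ([]byte, error).
func encodeMyRecord(r MyRecord) ([]byte, error) {
    return json.Marshal(r)
}

// decodeMyRecord matches the supported decoder signature func([]byte) (T, error).
func decodeMyRecord(b []byte) (MyRecord, error) {
    var r MyRecord
    err := json.Unmarshal(b, &r)
    return r, err
}

func init() {
    // Register the coder before beam.Init() so it is available on workers.
    beam.RegisterCoder(reflect.TypeOf((*MyRecord)(nil)).Elem(), encodeMyRecord, decodeMyRecord)
}

func main() {}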
func RegisterDoFn¶
func RegisterDoFn(dofn any)
RegisterDoFn is a convenience function to handle registering a DoFn and all related types. Use this instead of calling RegisterType or RegisterFunction. Like all the Register* functions, RegisterDoFn should be called in `init()` only.
In particular, it will call RegisterFunction for functional DoFns, and RegisterType for the parameter and return types for that function. Structural DoFns will have RegisterType called for itself and the parameter and return types.
RegisterDoFn will panic if the argument type is not a DoFn.
Usage:
func init() {
    beam.RegisterDoFn(FunctionalDoFn)
    beam.RegisterDoFn(reflect.TypeOf((*StructuralDoFn)(nil)).Elem())
}
func RegisterFunction¶
func RegisterFunction(fn any)
RegisterFunction allows function registration. It is beneficial for performance and is needed for functions -- such as custom coders -- serialized during unit tests, where the underlying symbol table is not available. It should be called in `init()` only.
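A small sketch of the registration pattern, assuming a hypothetical top-level (non-anonymous) function used as a DoFn or coder:
package main

import (
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// extractWords is a hypothetical top-level function used as a DoFn.
func extractWords(line string, emit func(string)) {
    for _, w := range strings.Fields(line) {
        emit(w)
    }
}

func init() {
    // Register the function so its symbol can be resolved on remote workers.
    beam.RegisterFunction(extractWords)
}

func main() {}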
func RegisterInit¶
func RegisterInit(hook func())
RegisterInit registers an Init hook. Hooks are expected to be able to figure out whether they apply on their own, notably if invoked in a remote execution environment. They are all executed regardless of the runner.
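An illustrative sketch of such a hook; the environment variable and behavior are hypothetical, chosen only to show a hook that decides on its own whether it applies:
package main

import (
    "log"
    "os"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

func init() {
    // The hook runs on every runner, locally and on remote workers,
    // and checks its own precondition before doing anything.
    beam.RegisterInit(func() {
        if dir := os.Getenv("MY_CACHE_DIR"); dir != "" {
            log.Println("warming cache from", dir)
        }
    })
}

func main() {
    beam.Init()
}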
func RegisterRunner¶
func RegisterRunner(name string, fn func(ctx context.Context, p *Pipeline) (PipelineResult, error))
RegisterRunner associates the name with the supplied runner, making it available to execute a pipeline via Run.
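A sketch under the signature above; "localtest" is a hypothetical runner name, and the function simply delegates to the direct runner:
package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
)

func init() {
    // Registering under a custom name lets pipelines select this runner at execution time.
    beam.RegisterRunner("localtest", func(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) {
        return direct.Execute(ctx, p)
    })
}

func main() {}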
func RegisterSchemaProvider¶
func RegisterSchemaProvider(rt reflect.Type, provider any)
RegisterSchemaProvider allows pipeline authors to provide special handling to convert types to schema representations, when those types are used as fields in types being encoded as schema rows.
At present, the only supported provider interface is SchemaProvider, though this may change in the future.
Providers only need to support a limited set of types for conversion, specifically a single struct type or a pointer to struct type, or an interface type, which they are registered with.
Providers have three tasks with respect to a given supported logical type:
- Producing schema representative types for their logical types.
- Producing schema encoders for values of that type, writing beam schema encoded bytes for a value, matching the schema representative type.
- Producing schema decoders for values of that type, reading beam schema encoded bytes, and producing a value of that type.
Representative Schema types must be structs with only exported fields.
A provider should be thread safe, but it's not required that a produced encoder or decoder is thread safe, since a separate encoder or decoder will be used for simultaneously executed bundles.
If the supported type is an interface, that interface must have a non-empty method set. That is, it cannot be the empty interface.
RegisterSchemaProvider must be called before beam.Init(), and conventionally is called in a package init() function.
Example¶
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
    "bytes"
    "fmt"
    "io"
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
    "github.com/google/go-cmp/cmp"
)

// RegisterSchemaProvider must be called before beam.Init, and conventionally in a package init block.
func init() {
    beam.RegisterSchemaProvider(reflect.TypeOf((*Alphabet)(nil)).Elem(), &AlphabetProvider{})
    // TODO(BEAM-9615): Registerying a self encoding type causes a cycle. Needs resolving.
    // beam.RegisterType(reflect.TypeOf((*Cyrillic)(nil)))
    beam.RegisterType(reflect.TypeOf((*Latin)(nil)))
    beam.RegisterType(reflect.TypeOf((*Ελληνικά)(nil)))
}

type Alphabet interface {
    alphabet() string
}

type Cyrillic struct {
    A, B int
}

func (*Cyrillic) alphabet() string {
    return "Cyrillic"
}

type Latin struct {
    // Unexported fields are not serializable by beam schemas by default
    // so we need to handle this ourselves.
    c uint64
    d *float32
}

func (*Latin) alphabet() string {
    return "Latin"
}

type Ελληνικά struct {
    q string
    G func() string
}

func (*Ελληνικά) alphabet() string {
    return "Ελληνικά"
}

// AlphabetProvider provides encodings for types that implement the Alphabet interface.
type AlphabetProvider struct {
    enc *coder.RowEncoderBuilder
    dec *coder.RowDecoderBuilder
}

var (
    typeCyrillic = reflect.TypeOf((*Cyrillic)(nil))
    typeLatin    = reflect.TypeOf((*Latin)(nil))
    typeΕλληνικά = reflect.TypeOf((*Ελληνικά)(nil))
)

func (p *AlphabetProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
    // FromLogicalType produces schema representative types, which match the encoders
    // and decoders that this function generates for this type.
    // While this example uses statically assigned schema representative types, it's
    // possible to generate the returned reflect.Type dynamically instead, using the
    // reflect package.
    switch rt {
    // The Cyrillic type is able to be encoded by default, so we simply use it directly
    // as it's own representative type.
    case typeCyrillic:
        return typeCyrillic, nil
    case typeLatin:
        // The Latin type only has unexported fields, so we need to make the equivalent
        // have exported fields.
        return reflect.TypeOf((*struct {
            C uint64
            D *float32
        })(nil)).Elem(), nil
    case typeΕλληνικά:
        return reflect.TypeOf((*struct{ Q string })(nil)).Elem(), nil
    }
    return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}

// BuildEncoder returns beam schema encoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) {
    switch rt {
    case typeCyrillic:
        if p.enc == nil {
            p.enc = &coder.RowEncoderBuilder{}
        }
        // Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type.
        return p.enc.Build(rt)
    case typeLatin:
        return func(iface any, w io.Writer) error {
            v := iface.(*Latin)
            // Beam Schema Rows have a header that indicates which fields, if any, are nil.
            if err := coder.WriteRowHeader(2, func(i int) bool {
                if i == 1 {
                    return v.d == nil
                }
                return false
            }, w); err != nil {
                return err
            }
            // Afterwards, each field is encoded using the appropriate helper.
            if err := coder.EncodeVarUint64(v.c, w); err != nil {
                return err
            }
            // Nil fields have nothing written for them other than the header.
            if v.d != nil {
                if err := coder.EncodeDouble(float64(*v.d), w); err != nil {
                    return err
                }
            }
            return nil
        }, nil
    case typeΕλληνικά:
        return func(iface any, w io.Writer) error {
            // Since the representation for Ελληνικά never has nil fields
            // we can use the simple header helper.
            if err := coder.WriteSimpleRowHeader(1, w); err != nil {
                return err
            }
            v := iface.(*Ελληνικά)
            if err := coder.EncodeStringUTF8(v.q, w); err != nil {
                return fmt.Errorf("decoding string field A: %v", err)
            }
            return nil
        }, nil
    }
    return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}

// BuildDecoder returns beam schema decoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) {
    switch rt {
    case typeCyrillic:
        if p.dec == nil {
            p.dec = &coder.RowDecoderBuilder{}
        }
        // Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type.
        return p.dec.Build(rt)
    case typeLatin:
        return func(r io.Reader) (any, error) {
            // Since the d field can be nil, we use the header get the nil bits.
            n, nils, err := coder.ReadRowHeader(r)
            if err != nil {
                return nil, err
            }
            // Header returns the number of fields, so we check if it has what we
            // expect. This allows schemas to evolve if necessary.
            if n != 2 {
                return nil, fmt.Errorf("expected 2 fields, but got %v", n)
            }
            c, err := coder.DecodeVarUint64(r)
            if err != nil {
                return nil, err
            }
            // Check if the field is nil before trying to decode a value for it.
            var d *float32
            if !coder.IsFieldNil(nils, 1) {
                f, err := coder.DecodeDouble(r)
                if err != nil {
                    return nil, err
                }
                f32 := float32(f)
                d = &f32
            }
            return &Latin{
                c: c,
                d: d,
            }, nil
        }, nil
    case typeΕλληνικά:
        return func(r io.Reader) (any, error) {
            // Since the representation for Ελληνικά never has nil fields
            // we can use the simple header helper. Returns an error if
            // something unexpected occurs.
            if err := coder.ReadSimpleRowHeader(1, r); err != nil {
                return nil, err
            }
            q, err := coder.DecodeStringUTF8(r)
            if err != nil {
                return nil, fmt.Errorf("decoding string field A: %v", err)
            }
            return &Ελληνικά{
                q: q,
            }, nil
        }, nil
    }
    return nil, nil
}

// Schema providers work on fields of schema encoded types.
type translation struct {
    C *Cyrillic
    L *Latin
    E *Ελληνικά
}

func main() {
    f := float32(42.789)
    want := translation{
        C: &Cyrillic{A: 123, B: 456},
        L: &Latin{c: 789, d: &f},
        E: &Ελληνικά{q: "testing"},
    }
    rt := reflect.TypeOf((*translation)(nil)).Elem()
    enc, err := coder.RowEncoderForStruct(rt)
    if err != nil {
        panic(err)
    }
    dec, err := coder.RowDecoderForStruct(rt)
    if err != nil {
        panic(err)
    }
    var buf bytes.Buffer
    if err := enc(want, &buf); err != nil {
        panic(err)
    }
    got, err := dec(&buf)
    if err != nil {
        panic(err)
    }
    if d := cmp.Diff(want, got,
        cmp.AllowUnexported(Latin{}, Ελληνικά{})); d != "" {
        fmt.Printf("diff in schema encoding translation: (-want,+got)\n%v\n", d)
    } else {
        fmt.Println("No diffs!")
    }
}

Output:

No diffs!
func RegisterSchemaProviderWithURN¶ added in v2.43.0
RegisterSchemaProviderWithURN is for internal use only. Users are recommended to use beam.RegisterSchemaProvider() instead. RegisterSchemaProviderWithURN registers a new schema provider for a new logical type defined in pkg/beam/model/pipeline_v1/schema.pb.go
RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally is called in a package init() function.
func RegisterType¶
func RegisterType(t reflect.Type)
RegisterType inserts "external" types into a global type registry to bypass serialization and preserve full method information. It should be called in `init()` only. TODO(wcn): the canonical definition of "external" is in v1.proto. We need user facing copy for this important concept.
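A small sketch of the usual registration pattern, with a hypothetical element type; the reflect.TypeOf((*T)(nil)).Elem() idiom is the same one used elsewhere on this page:
package main

import (
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// WordStat is a hypothetical element type used in a pipeline.
type WordStat struct {
    Word  string
    Count int
}

func init() {
    // Register the type so it round-trips across workers without losing
    // its identity and methods.
    beam.RegisterType(reflect.TypeOf((*WordStat)(nil)).Elem())
}

func main() {}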
func TryCrossLanguage¶
func TryCrossLanguage(
    s Scope,
    urn string,
    payload []byte,
    expansionAddr string,
    namedInputs map[string]PCollection,
    namedOutputTypes map[string]FullType,
) (map[string]PCollection, error)
TryCrossLanguage coordinates the core functions required to execute the cross-language transform. See CrossLanguage for user documentation.
func TryExternalTagged¶
func TryExternalTagged(
    s Scope,
    urn string,
    payload []byte,
    namedInputs map[string]PCollection,
    namedOutputTypes map[string]FullType,
    bounded bool,
) (map[string]PCollection, error)
TryExternalTagged attempts to perform the work of ExternalTagged, returning an error indicating why the operation failed.
func UnnamedInput¶
func UnnamedInput(col PCollection) map[string]PCollection
UnnamedInput is a helper function for passing single unnamed inputs to beam.CrossLanguage.
Example:
beam.CrossLanguage(s, urn, payload, addr, UnnamedInput(input), outputs)
func UnnamedOutput¶
func UnnamedOutput(t FullType) map[string]FullType
UnnamedOutput is a helper function for passing single unnamed output types to beam.CrossLanguage. The associated output can be accessed with beam.UnnamedOutputTag.
Example:
resultMap := beam.CrossLanguage(s, urn, payload, addr, inputs, UnnamedOutput(output))
result := resultMap[beam.UnnamedOutputTag()]
func UnnamedOutputTag¶ added in v2.34.0
func UnnamedOutputTag() string
UnnamedOutputTag provides the output tag used for an output passed to beam.UnnamedOutput. Needed to retrieve the unnamed output PCollection from the result of beam.CrossLanguage.
func ValidateKVType¶
func ValidateKVType(col PCollection) (typex.FullType, typex.FullType)
ValidateKVType panics if the type of the PCollection is not KV<A,B>. It returns (A,B).
func ValidateNonCompositeType¶
func ValidateNonCompositeType(col PCollection) typex.FullType
ValidateNonCompositeType panics if the type of the PCollection is not a composite type. It returns the type.
Types¶
type BundleFinalization¶ added in v2.39.0
type BundleFinalization = typex.BundleFinalization
BundleFinalization represents the parameter used to register callbacks to be run once the runner has durably persisted output for a bundle. See typex.BundleFinalization for more details.
type Coder¶
type Coder struct {
    // contains filtered or unexported fields
}
Coder defines how to encode and decode values of type 'A' into byte streams. Coders are attached to PCollections of the same type. For PCollections consumed by GBK, the attached coders are required to be deterministic.
func DecodeCoder¶
DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.
type Counter¶
Counter is a metric that can be incremented and decremented, and is aggregated by the sum.
Counters are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.
func NewCounter¶
NewCounter returns the Counter with the given namespace and name.
func (Counter) Dec¶
Dec decrements the counter within by the given amount. The context must be provided by the framework, or the value will not be recorded.
Example¶
package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
    c := beam.NewCounter("example", "size")
    c.Dec(ctx, int64(len("foobar")))
}
func (Counter) Inc¶
Inc increments the counter within by the given amount. The context must be provided by the framework, or the value will not be recorded.
Example¶
package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
    c := beam.NewCounter("example", "size")
    c.Inc(ctx, int64(len("foobar")))
}
type Distribution¶
type Distribution struct {
    *metrics.Distribution
}
Distribution is a metric that records various statistics about the distribution of reported values.
Distributions are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.
func NewDistribution¶
func NewDistribution(namespace, name string) Distribution
NewDistribution returns the Distribution with the given namespace and name.
func (Distribution) Update¶
func (c Distribution) Update(ctx context.Context, v int64)
Update adds an observation to this distribution. The context must be provided by the framework, or the value will not be recorded.
Example¶
package main

import (
    "context"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
    t := time.Millisecond * 42
    d := beam.NewDistribution("example", "latency_micros")
    d.Update(ctx, int64(t/time.Microsecond))
}
type ElementDecoder¶
type ElementDecoder = coder.ElementDecoder
ElementDecoder encapsulates being able to decode an element from a reader.
func NewElementDecoder¶
func NewElementDecoder(t reflect.Type) ElementDecoder
NewElementDecoder returns an ElementDecoder for the given type.
type ElementEncoder¶
type ElementEncoder = coder.ElementEncoder
ElementEncoder encapsulates being able to encode an element into a writer.
func NewElementEncoder¶
func NewElementEncoder(t reflect.Type) ElementEncoder
NewElementEncoder returns a new encoding function for the given type.
type EncodedCoder¶
type EncodedCoder struct {
    // Coder is the coder to preserve across serialization.
    Coder Coder
}
EncodedCoder is a serialization wrapper around a coder for convenience.
func (EncodedCoder) MarshalJSON¶
func (w EncodedCoder) MarshalJSON() ([]byte, error)
MarshalJSON returns the JSON encoding this value.
func (*EncodedCoder) UnmarshalJSON¶
func (w *EncodedCoder) UnmarshalJSON(buf []byte) error
UnmarshalJSON sets the state of this instance from the passed in JSON.
type EncodedFunc¶
EncodedFunc is a serialization wrapper around a function for convenience.
func (EncodedFunc) MarshalJSON¶
func (w EncodedFunc) MarshalJSON() ([]byte, error)
MarshalJSON returns the JSON encoding this value.
func (*EncodedFunc) UnmarshalJSON¶
func (w *EncodedFunc) UnmarshalJSON(buf []byte) error
UnmarshalJSON sets the state of this instance from the passed in JSON.
type EncodedType¶
EncodedType is a serialization wrapper around a type for convenience.
func (EncodedType) MarshalJSON¶
func (w EncodedType) MarshalJSON() ([]byte, error)
MarshalJSON returns the JSON encoding this value.
func (*EncodedType) UnmarshalJSON¶
func (w *EncodedType) UnmarshalJSON(buf []byte) error
UnmarshalJSON sets the state of this instance from the passed in JSON.
type EventTime¶
EventTime represents the time of the event that generated an element. This is distinct from the time when an element is processed.
type FullType¶
FullType represents the tree structure of data types processed by the graph. It allows representation of composite types, such as KV<int, string> or CoGBK<int, int>, as well as "generic" such types, KV<int,T> or CoGBK<X,Y>, where the free "type variables" are the fixed universal types: T, X, etc.
type Gauge¶
Gauge is a metric that can have its new value set, and is aggregated by taking the last reported value.
Gauges are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.
func (Gauge) Set¶
Set sets the current value for this gauge. The context must be provided by the framework, or the value will not be recorded.
Example¶
package main

import (
    "context"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
    g := beam.NewGauge("example", "progress")
    g.Set(ctx, 42)
}
type MetricResult¶ added in v2.35.0
type MetricResult = metrics.SingleResult
MetricResult represents a single metric value, for use in writing predicate functions to query PipelineResults.
type Option¶
type Option interface {
    // contains filtered or unexported methods
}
Option is an optional value or context to a transformation, used at pipeline construction time. The primary use case is providing side inputs.
type PCollection¶
type PCollection struct {
    // contains filtered or unexported fields
}
PCollection is an immutable collection of values of type 'A', which must be a concrete type, such as int or KV<int,string>. A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like textio.Read), and can be passed as the inputs of other PTransforms. Some root transforms produce bounded PCollections and others produce unbounded ones.
Each element in a PCollection has an associated timestamp. Sources assign timestamps to elements when they create PCollections, and other PTransforms propagate these timestamps from their input to their output implicitly or explicitly.
Additionally, each element is assigned to a set of windows. By default, all elements are assigned into a single default window, GlobalWindow.
func AddFixedKey¶
func AddFixedKey(s Scope, col PCollection) PCollection
AddFixedKey adds a fixed key (0) to every element.
func CoGroupByKey¶
func CoGroupByKey(s Scope, cols ...PCollection) PCollection
CoGroupByKey inserts a CoGBK transform into the pipeline.
func Combine¶
func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
Combine inserts a global Combine transform into the pipeline. It expects a PCollection<T> as input where T is a concrete type. Combine supports TypeDefinition options for binding generic types in combinefn.
func CombinePerKey¶
func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection
CombinePerKey inserts a GBK and per-key Combine transform into the pipeline. It expects a PCollection<KV<K,T>>. The CombineFn may optionally take a key parameter. CombinePerKey supports TypeDefinition options for binding generic types in combinefn.
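A minimal sketch in the same style as the other examples on this page, using a hypothetical sumInts combine function and placeholder PCollections, to show the global and per-key forms side by side:
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// sumInts is a simple associative, commutative combine function.
func sumInts(a, b int) int { return a + b }

var s = beam.Scope{}

func main() {
    var prices beam.PCollection       // PCollection<int>
    var pricesByItem beam.PCollection // PCollection<KV<string,int>>

    total := beam.Combine(s, sumInts, prices)                     // Singleton PCollection<int>
    totalsPerItem := beam.CombinePerKey(s, sumInts, pricesByItem) // PCollection<KV<string,int>>
    _, _ = total, totalsPerItem
}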
func Create¶
func Create(s Scope, values ...any) PCollection
Create inserts a fixed non-empty set of values into the pipeline. The values must be of the same type 'A' and the returned PCollection is of type A.
The returned PCollections can be used as any other PCollections. The values are JSON-coded. Each runner may place limits on the sizes of the values and Create should generally only be used for small collections.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    beam.Create(s, 5, 6, 7, 8, 9)               // PCollection<int>
    beam.Create(s, []int{5, 6}, []int{7, 8, 9}) // PCollection<[]int>
    beam.Create(s, []int{5, 6, 7, 8, 9})        // PCollection<[]int>
    beam.Create(s, "a", "b", "c")               // PCollection<string>
}
func CreateList¶
func CreateList(s Scope, list any) PCollection
CreateList inserts a fixed set of values into the pipeline from a slice or array. Unlike Create this supports the creation of an empty PCollection.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    beam.CreateList(s, []int{5, 6, 7, 8, 9}) // PCollection<int>
}
func DropKey¶
func DropKey(s Scope, col PCollection) PCollection
DropKey drops the key for an input PCollection<KV<A,B>>. It returns a PCollection<B>.
func DropValue¶
func DropValue(s Scope, col PCollection) PCollection
DropValue drops the value for an input PCollection<KV<A,B>>. It returns a PCollection<A>.
func Explode¶
func Explode(s Scope, col PCollection) PCollection
Explode is a PTransform that takes a single PCollection<[]A> and returns a PCollection<A> containing all the elements for each incoming slice.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    d := beam.Create(s, []int{1, 2, 3, 4, 5}) // PCollection<[]int>
    beam.Explode(s, d)                        // PCollection<int>
}
func External¶
func External(s Scope, urn string, payload []byte, in []PCollection, out []FullType, bounded bool) []PCollection
External defines a Beam external transform. The interpretation of this primitive is runner specific. The runner is responsible for parsing the payload based on the URN provided to implement the behavior of the operation. Transform libraries should expose an API that captures the user's intent and serialize the payload as a byte slice that the runner will deserialize.
Use ExternalTagged if the runner will need to associate the PTransform's local PCollection tags with values in the payload.
func Flatten¶
func Flatten(s Scope, cols ...PCollection) PCollection
Flatten is a PTransform that takes multiple PCollections of type 'A' and returns a single PCollection of type 'A' containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.
By default, the Coder of the output PCollection is the same as the Coder of the first PCollection.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var s = beam.Scope{}

func main() {
    a := textio.Read(s, "...some file path...") // PCollection<string>
    b := textio.Read(s, "...some other file path...")
    c := textio.Read(s, "...some third file path...")

    beam.Flatten(s, a, b, c) // PCollection<String>
}
func GroupByKey¶
func GroupByKey(s Scope, a PCollection) PCollection
GroupByKey is a PTransform that takes a PCollection of type KV<A,B>, groups the values by key and windows, and returns a PCollection of type GBK<A,B> representing a map from each distinct key and window of the input PCollection to an iterable over all the values associated with that key in the input per window. Each key in the output PCollection is unique within each window.
GroupByKey is analogous to converting a multi-map into a uni-map, and related to GROUP BY in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer in the MapReduce framework.
Two keys of type A are compared for equality by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic.
By default, input and output PCollections share a key Coder and iterable values in the input and output PCollection share an element Coder.
GroupByKey is a key primitive in data-parallel processing, since it is the main way to efficiently bring associated data together into one location. It is also a key determiner of the performance of a data-parallel pipeline.
See CoGroupByKey for a way to group multiple input PCollections by a common key at once.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    type Doc struct{}
    var urlDocPairs beam.PCollection             // PCollection<KV<string, Doc>>
    urlToDocs := beam.GroupByKey(s, urlDocPairs) // PCollection<CoGBK<string, Doc>>

    // CoGBK parameters receive an iterator function with all values associated
    // with the same key.
    beam.ParDo0(s, func(key string, values func(*Doc) bool) {
        var cur Doc
        for values(&cur) {
            // ... process all docs having that url ...
        }
    }, urlToDocs) // PCollection<KV<string, []Doc>>
}
func Impulse¶
func Impulse(s Scope) PCollection
Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.
The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    beam.Impulse(s) // PCollection<[]byte>
}
func ImpulseValue¶
func ImpulseValue(s Scope, value []byte) PCollection
ImpulseValue emits the supplied byte slice into the global window. The resulting PCollection is a singleton of type []byte.
Example¶
package main

import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
    beam.ImpulseValue(s, []byte{}) // PCollection<[]byte>
}
func Must¶
func Must(a PCollection, err error) PCollection
Must returns the input, but panics if err != nil.
func MustN¶
func MustN(list []PCollection, err error) []PCollection
MustN returns the input, but panics if err != nil.
func ParDo¶
func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection
ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection. Use one of the ParDo variants for a different number of output PCollections. The PCollections do not need to have the same types.
Elements are processed independently, and possibly in parallel across distributed cloud resources. The ParDo processing style is similar to what happens inside the "Mapper" or "Reducer" class of a MapReduce-style algorithm.
DoFns¶
The function to use to process each element is specified by a DoFn, either as a single function or as a struct with methods, notably ProcessElement. The struct may also define Setup, StartBundle, FinishBundle and Teardown methods. The struct is JSON-serialized and may contain construction-time values.
Functions and types used as DoFns must be registered with beam using the beam `register` package, so they may execute on distributed workers. Functions must not be anonymous or closures, or they will fail at execution time.
Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of "bundles". These are farmed off to distributed worker machines (or locally on a local runner instance). For each bundle of input elements processing proceeds as follows:
- If a struct, a fresh instance of the argument DoFn is created on a worker from json serialization, and the Setup method is called on this instance, if present. A runner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by returning an error) will never be reused.
- The DoFn's StartBundle method, if provided, is called to initialize it.
- The DoFn's ProcessElement method is called on each of the input elements in the bundle.
- The DoFn's FinishBundle method, if provided, is called to complete its work. After FinishBundle is called, the framework will not again invoke ProcessElement or FinishBundle until a new call to StartBundle has occurred.
- If any of Setup, StartBundle, ProcessElement or FinishBundle methods return an error, the Teardown method, if provided, will be called on the DoFn instance.
- If a runner will no longer use a DoFn, the Teardown method, if provided, will be called on the discarded instance.
Each of the calls to any of the DoFn's processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in an output PCollection.
For example:
func stringLen(word string) int {
    return len(word)
}
func init() { register.Function1x1(stringLen) }

words := beam.ParDo(s, &Foo{...}, ...)
lengths := beam.ParDo(s, stringLen, words)
Each output element has the same timestamp and is in the same windows as its corresponding input element. The timestamp can be accessed and/or emitted by including an EventTime-typed parameter. The name of the function or struct is used as the DoFn name. Function literals do not have stable names and should thus not be used in production code.
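As an illustrative sketch of the struct-based (structural) DoFn form and its lifecycle methods, using a hypothetical WordFilter DoFn rather than one from the Beam examples:
package main

import (
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// WordFilter is a hypothetical structural DoFn. The exported Prefix field is
// JSON-serialized at construction time and restored on each worker.
type WordFilter struct {
    Prefix string

    // Per-bundle state, rebuilt in StartBundle and never serialized.
    seen map[string]bool
}

func (f *WordFilter) StartBundle(_ func(string)) {
    f.seen = map[string]bool{}
}

func (f *WordFilter) ProcessElement(word string, emit func(string)) {
    if strings.HasPrefix(word, f.Prefix) && !f.seen[word] {
        f.seen[word] = true
        emit(word)
    }
}

func (f *WordFilter) FinishBundle(_ func(string)) {
    f.seen = nil
}

func init() {
    // Register the DoFn and its emitter so they can be serialized and executed on workers.
    register.DoFn2x0[string, func(string)](&WordFilter{})
    register.Emitter1[string]()
}

func main() {}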
Side Inputs¶
While a ParDo processes elements from a single "main input" PCollection, it can take additional "side input" PCollections. These side inputs, together with the corresponding DoFn parameters, express styles of accessing PCollections computed by earlier pipeline operations: they are passed in to the ParDo transform using SideInput options, and their contents are accessible to each of the DoFn operations. For example:
func filterLessThanCutoff(word string, cutoff int, emit func(string)) {
    if len(word) < cutoff {
        emit(word)
    }
}
func init() { register.Function3x0(filterLessThanCutoff) }

words := ...
cutoff := ... // Singleton PCollection<int>
smallWords := beam.ParDo(s, filterLessThanCutoff, words, beam.SideInput{Input: cutoff})
Additional Outputs¶
Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specify 2 outputs. For example:
func partitionAtCutoff(word string, cutoff int, small, big func(string)) {
    if len(word) < cutoff {
        small(word)
    } else {
        big(word)
    }
}
func init() { register.Function4x0(partitionAtCutoff) }

words := ...
cutoff := ... // Singleton PCollection<int>
small, big := beam.ParDo2(s, partitionAtCutoff, words, beam.SideInput{Input: cutoff})
By default, the Coder for the elements of each output PCollection is inferred from the concrete type.
No Global Shared State¶
There are three main ways to initialize the state of a DoFn instance processing a bundle:
Define public instance variable state. This state will be automatically JSON serialized and then deserialized in the DoFn instances created for bundles. This method is good for state known when the original DoFn is created in the main program, if it's not overly large. This is not suitable for any state which must only be used for a single bundle, as DoFn's may be used to process multiple bundles.
Compute the state as a singleton PCollection and pass it in as a side input to the DoFn. This is good if the state needs to be computed by the pipeline, or if the state is very large and so is best read from file(s) rather than sent as part of the DoFn's serialized state.
Initialize the state in each DoFn instance, in a StartBundle method. This is good if the initialization doesn't depend on any information known only by the main program or computed by earlier pipeline operations, but is the same for all instances of this DoFn for all program executions, say setting up empty caches or initializing constant data.
ParDo operations are intended to be able to run in parallel across multiple worker machines. This precludes easy sharing and updating mutable state across those machines. There is no support in the Beam model for communicating and synchronizing updates to shared state across worker machines, so programs should not access any mutable global variable state in their DoFn, without understanding that the Go processes for the main program and workers will each have its own independent copy of such state, and there won't be any automatic copying of that state across processes. All information should be communicated to DoFn instances via main and side inputs and serialized state, and all output should be communicated from a DoFn instance via output PCollections, in the absence of external communication mechanisms written by user code.
Splittable DoFns¶
Splittable DoFns are DoFns that are able to split work within an element, as opposed to only at element boundaries like normal DoFns. This is useful for DoFns that emit many outputs per input element and can distribute that work among multiple workers. The most common examples of this are sources.
In order to split work within an element, splittable DoFns use the concept of restrictions, which are objects that are associated with an element and describe a portion of work on that element. For example, a restriction associated with a filename might describe what byte range within that file to process. In addition to restrictions, splittable DoFns also rely on restriction trackers to track progress and perform splits on a restriction currently being processed. See the `RTracker` interface in core/sdf/sdf.go for more details.
Splitting¶
Splitting means taking one restriction and splitting it into two or more that cover the entire input space of the original one. In other words, processing all the split restrictions should produce identical output to processing the original one.
Splitting occurs in two stages. The initial splitting occurs before any restrictions have started processing. This step is used to split large restrictions into smaller ones that can then be distributed among multiple workers for processing. Initial splitting is user-defined and optional.
Dynamic splitting occurs during the processing of a restriction in runners that have implemented it. If there are available workers, runners may split the unprocessed portion of work from a busy worker and shard it to available workers in order to better distribute work. With unsplittable DoFns this can only occur on element boundaries, but for splittable DoFns this split can land within a restriction and will require splitting that restriction.
- Note: Dataflow is currently the only runner with support for both initial and dynamic splitting. Other runners do not support dynamic splitting, and stragglers will therefore not be split during execution with liquid sharding.
Splittable DoFn Methods¶
Making a splittable DoFn requires the following methods to be implemented on a DoFn in addition to the usual DoFn requirements. In the following method signatures `elem` represents the main input elements to the DoFn, and should match the types used in ProcessElement. `restriction` represents the user-defined restriction, and can be any type as long as it is consistent throughout all the splittable DoFn methods (a minimal sketch follows the list below):
- `CreateInitialRestriction(context.Context?, elem) (restriction, error?)` CreateInitialRestriction creates an initial restriction encompassing an entire element. The restriction created stays associated with the element it describes.
- `SplitRestriction(context.Context?, elem, restriction) ([]restriction, error?)` SplitRestriction takes an element and its initial restriction, and optionally performs an initial split on it, returning a slice of all the split restrictions. If no splits are desired, the method returns a slice containing only the original restriction. This method will always be called on each newly created restriction before they are processed.
- `RestrictionSize(context.Context?, elem, restriction) (float64, error?)` RestrictionSize returns a cheap size estimation for a restriction. This size is an abstract non-negative scalar value that represents how much work a restriction takes compared to other restrictions in the same DoFn. For example, a size of 200 represents twice as much work as a size of 100, but the numbers do not represent anything on their own. Size is used by runners to estimate work for dynamic work rebalancing. Must be thread safe. Will be invoked concurrently during bundle processing due to runner initiated splitting and progress estimation.
- `CreateTracker(context.Context?, restriction) (restrictionTracker, error?)` CreateTracker creates and returns a restriction tracker (a concrete type implementing the `sdf.RTracker` interface) given a restriction. The restriction tracker is used to track progress processing a restriction, and to allow for dynamic splits. This method is called on each restriction right before processing begins.
- `ProcessElement(context.Context?, sdf.RTracker, elem, func emit(output)) (sdf.ProcessContinuation?, error?)` For splittable DoFns, ProcessElement requires a restriction tracker before inputs, and generally requires emits to be used for outputs, since restrictions will generally produce multiple outputs. For more details on processing restrictions in a splittable DoFn, see `sdf.RTracker`. ProcessElement can optionally return a `sdf.ProcessContinuation` to signal to the runner that processing should be resumed at a later time, if not all data within the restriction can be processed within the lifetime of a single bundle. The runner tries to respect the resume time, however it is not guaranteed.
A splittable DoFn can also implement the following optional method:
- `TruncateRestriction(context.Context?, sdf.RTracker, elem) (restriction, error?)` TruncateRestriction is triggered when a pipeline starts to drain on runners that support pipeline draining. It helps finish the pipeline faster by truncating the restriction. If not implemented, the default behavior for bounded restrictions is to process the remainder of the restriction, and for unbounded restrictions to process until the next SDF-initiated checkpoint or runner-initiated split occurs.
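The sketch below is a minimal, hedged example of these methods on a struct-based DoFn. It assumes a hypothetical `blob` element type and uses the SDK's offsetrange restriction and tracker; registration and the optional TruncateRestriction method are omitted, and it is not a canonical implementation.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// blob is a hypothetical element type: a named payload of known size.
type blob struct {
	Name string
	Size int64
}

// chunkFn is an illustrative splittable DoFn that emits every byte offset of a blob.
type chunkFn struct{}

// CreateInitialRestriction covers the whole element with a single restriction.
func (fn *chunkFn) CreateInitialRestriction(b blob) offsetrange.Restriction {
	return offsetrange.Restriction{Start: 0, End: b.Size}
}

// SplitRestriction performs the optional initial split into at most 16 pieces.
func (fn *chunkFn) SplitRestriction(b blob, r offsetrange.Restriction) []offsetrange.Restriction {
	return r.EvenSplits(16)
}

// RestrictionSize reports relative work as the width of the offset range.
func (fn *chunkFn) RestrictionSize(b blob, r offsetrange.Restriction) float64 {
	return r.Size()
}

// CreateTracker wraps the restriction in a thread-safe RTracker that manages
// claimed positions and dynamic splits.
func (fn *chunkFn) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(r))
}

// ProcessElement claims offsets through the tracker and emits each one it owns.
func (fn *chunkFn) ProcessElement(rt *sdf.LockRTracker, b blob, emit func(string, int64)) error {
	for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i); i++ {
		emit(b.Name, i)
	}
	return nil
}

func main() {} // In a real pipeline the DoFn would be registered and applied via beam.ParDo.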
Fault Tolerance¶
In a distributed system, things can fail: machines can crash, machines can be unable to communicate across the network, etc. While individual failures are rare, the larger the job, the greater the chance that something, somewhere, will fail. Beam runners may strive to mask such failures by retrying failed DoFn bundles. This means that a DoFn instance might process a bundle partially, then crash for some reason, then be rerun (often as a new process) on that same bundle and on the same elements as before. Sometimes two or more DoFn instances will be running on the same bundle simultaneously, with the system taking the results of the first instance to complete successfully. Consequently, the code in a DoFn needs to be written such that these duplicate (sequential or concurrent) executions do not cause problems. If the outputs of a DoFn are a pure function of its inputs, then this requirement is satisfied. However, if a DoFn's execution has external side-effects, such as performing updates to external HTTP services, then the DoFn's code needs to take care to ensure that those updates are idempotent and that concurrent updates are acceptable. This property can be difficult to achieve, so it is advisable to strive to keep DoFns as pure functions as much as possible.
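One common way to make such side effects safe under retries is to derive a deterministic key from the input so that duplicate executions overwrite the same record. The sketch below is illustrative only; `User` and `kvStorePut` are hypothetical stand-ins, not part of Beam or any real client library.

package main

import (
	"context"
	"fmt"
)

// User is an illustrative element type.
type User struct {
	ID   string
	Name string
}

// kvStorePut stands in for a call to an external key-value service; it is a
// hypothetical helper, not a Beam API.
func kvStorePut(ctx context.Context, key string, value any) error {
	fmt.Printf("PUT %s = %+v\n", key, value)
	return nil
}

// writeUserFn is a DoFn body whose side effect is idempotent: the key is a
// pure function of the input, so re-executions overwrite the same record
// instead of creating new ones.
func writeUserFn(ctx context.Context, user User) error {
	key := "user/" + user.ID
	return kvStorePut(ctx, key, user)
}

func main() {
	_ = writeUserFn(context.Background(), User{ID: "42", Name: "Ada"})
}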
Optimization¶
Beam runners may choose to apply optimizations to a pipeline before it is executed. A key optimization, fusion, relates to ParDo operations. If one ParDo operation produces a PCollection that is then consumed as the main input of another ParDo operation, the two ParDo operations will be fused together into a single ParDo operation and run in a single pass; this is "producer-consumer fusion". Similarly, if two or more ParDo operations have the same PCollection main input, they will be fused into a single ParDo that makes just one pass over the input PCollection; this is "sibling fusion".
If after fusion there are no more unfused references to a PCollection (e.g., one between a producer ParDo and a consumer ParDo), the PCollection itself is "fused away" and won't ever be written to disk, saving all the I/O and space expense of constructing it.
When Beam runners apply fusion optimization, it is essentially "free" to write ParDo operations in a very modular, composable style, each ParDo operation doing one clear task, and stringing together sequences of ParDo operations to get the desired overall effect. Such programs can be easier to understand, easier to unit-test, easier to extend and evolve, and easier to reuse in new programs. The predefined library of PTransforms that come with Beam makes heavy use of this modular, composable style, trusting to the runner to "flatten out" all the compositions into highly optimized stages.
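As a hedged illustration of that modular style, the chained ParDos below each do one small task; a fusing runner can collapse them into a single pass over the input. The scope and path setup mirror the package examples and are placeholders, not output of any particular runner.

package main

import (
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var s = beam.Scope{}

func main() {
	lines := textio.Read(s, "protocol://path/file*.txt") // PCollection<string>

	// Three small, single-purpose ParDos; producer-consumer fusion lets a
	// runner execute them as one stage without materializing intermediates.
	trimmed := beam.ParDo(s, strings.TrimSpace, lines)
	upper := beam.ParDo(s, strings.ToUpper, trimmed)
	nonEmpty := beam.ParDo(s, func(w string, emit func(string)) {
		if w != "" {
			emit(w)
		}
	}, upper)
	_ = nonEmpty
}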
See https://beam.apache.org/documentation/programming-guide/#pardo for the web documentation for ParDo.
Example (AdditionalOutputs)¶
Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specify 2 outputs.
package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	var words beam.PCollection  // PCollection<string>
	var cutoff beam.PCollection // Singleton PCollection<int>
	small, big := beam.ParDo2(s, func(word string, cutoff int, small, big func(string)) {
		if len(word) < cutoff {
			small(word)
		} else {
			big(word)
		}
	}, words, beam.SideInput{Input: cutoff})
	_, _ = small, big
}

func ParDoN ¶

func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection
ParDoN inserts a ParDo with any number of outputs into the pipeline.
func Partition ¶

func Partition(s Scope, n int, fn any, col PCollection) []PCollection
Partition takes a PCollection<T> and a PartitionFn, uses the PartitionFn to split the elements of the input PCollection into N partitions, and returns a []PCollection<T> that bundles N PCollection<T>s containing the split elements.
A PartitionFn has the signature `func(T) int`.
func lenToTen(s string) int {
	if len(s) > 9 {
		return 10
	}
	return len(s)
}

// Partition functions must be registered with Beam, and must not be closures.
func init() { register.Function1x1(lenToTen) }

// The number of partitions goes up to 11 since we can return 0 through 10.
wordsByLength := beam.Partition(s, 11, lenToTen, inputStrings)

T is permitted to be a KV.
func Reshuffle ¶

func Reshuffle(s Scope, col PCollection) PCollection
Reshuffle copies a PCollection of the same kind and using the same element coder, and maintains the same windowing information. Importantly, it allows the result PCollection to be processed with a different sharding, in a different stage than the input PCollection.
For example, if a computation needs a lot of parallelism but produces only a small amount of output data, then the computation producing the data can run with as much parallelism as needed, while the output file is written with a smaller amount of parallelism, using the following pattern:
pc := bigHairyComputationNeedingParallelism(scope) // PCollection<string>
resharded := beam.Reshuffle(scope, pc)             // PCollection<string>
Another use case is when one has a non-deterministic DoFn followed by one that performs externally-visible side effects. Inserting a Reshuffle between these DoFns ensures that retries of the second DoFn will always be the same, which is necessary to make side effects idempotent.
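A minimal sketch of that pattern follows. The random shard assignment and the printing "write" are illustrative stand-ins for a non-deterministic DoFn and an externally visible side effect; the Reshuffle pins down the random assignment so every retry of the write sees the same input.

package main

import (
	"fmt"
	"math/rand"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	var records beam.PCollection // PCollection<string>, assumed to exist

	// Non-deterministic: each retry could pick a different shard.
	assigned := beam.ParDo(s, func(r string) (int, string) {
		return rand.Intn(10), r
	}, records)

	// Reshuffle materializes the random assignment so that the downstream
	// side-effecting DoFn sees the same input on every retry.
	stable := beam.Reshuffle(s, assigned)

	// Stand-in for an externally visible write.
	beam.ParDo0(s, func(shard int, r string) {
		fmt.Printf("write %q to shard %d\n", r, shard)
	}, stable)
}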
A Reshuffle will force a break in the optimized pipeline. Consequently, this operation should be used sparingly, only after determining that the pipeline without reshuffling is broken in some way and performing an extra operation is worth the cost.
func Seq ¶

func Seq(s Scope, col PCollection, dofns ...any) PCollection

Seq is a convenience helper to chain single-input/single-output ParDos together in a sequence.
Example¶
package main

import (
	"math"
	"strconv"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var s = beam.Scope{}

func main() {
	a := textio.Read(s, "...some file path...") // PCollection<string>
	beam.Seq(s, a,
		strconv.Atoi, // string to int
		func(i int) float64 { return float64(i) }, // int to float64
		math.Signbit, // float64 to bool
	) // PCollection<bool>
}

func SwapKV ¶

func SwapKV(s Scope, col PCollection) PCollection
SwapKV swaps the key and value for an input PCollection<KV<A,B>>. It returns a PCollection<KV<B,A>>.
func TryCoGroupByKey ¶

func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error)

TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns an error on failure.
func TryCombine ¶

func TryCombine(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)

TryCombine attempts to insert a global Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.
func TryCombinePerKey ¶

func TryCombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)

TryCombinePerKey attempts to insert a per-key Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollection.
func TryCreate ¶

func TryCreate(s Scope, values ...any) (PCollection, error)

TryCreate inserts a fixed non-empty set of values into the pipeline. The values must be of the same type.
func TryCreateList ¶

func TryCreateList(s Scope, list any) (PCollection, error)

TryCreateList inserts a fixed set of values into the pipeline from a slice or array. The values must be of the same type. Unlike TryCreate this supports the creation of an empty PCollection.
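A small, hedged sketch of that distinction: an empty typed slice still carries its element type, so TryCreateList can build an empty PCollection where TryCreate would have no values to work with.

package main

import (
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

func main() {
	p := beam.NewPipeline()
	s := p.Root()

	// An empty []string still carries the element type, so TryCreateList can
	// construct an empty PCollection<string>.
	empty, err := beam.TryCreateList(s, []string{})
	if err != nil {
		fmt.Printf("TryCreateList failed: %v\n", err)
		return
	}
	_ = empty
}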
func TryExternal ¶

func TryExternal(s Scope, urn string, payload []byte, in []PCollection, out []FullType, bounded bool) ([]PCollection, error)

TryExternal attempts to perform the work of External, returning an error indicating why the operation failed.
func TryFlatten ¶

func TryFlatten(s Scope, cols ...PCollection) (PCollection, error)

TryFlatten merges incoming PCollections of type 'A' to a single PCollection of type 'A'. Returns an error indicating the set of PCollections that could not be flattened.
func TryGroupByKey ¶

func TryGroupByKey(s Scope, a PCollection) (PCollection, error)

TryGroupByKey inserts a GBK transform into the pipeline. Returns an error on failure.
func TryParDo ¶

func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection, error)

TryParDo attempts to insert a ParDo transform into the pipeline. It may fail for multiple reasons, notably that the dofn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.
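A minimal sketch of handling that error instead of letting ParDo panic; the doubling DoFn and the small input list are illustrative only.

package main

import (
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

func main() {
	p := beam.NewPipeline()
	s := p.Root()
	nums := beam.CreateList(s, []int{1, 2, 3}) // PCollection<int>

	// TryParDo reports binding problems as an error rather than panicking,
	// which can be useful when the DoFn or its types come from configuration.
	cols, err := beam.TryParDo(s, func(x int) int { return x * 2 }, nums)
	if err != nil {
		log.Fatalf("failed to add ParDo: %v", err)
	}
	doubled := cols[0] // single-output DoFn: exactly one PCollection
	_ = doubled
}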
func TryReshuffle ¶

func TryReshuffle(s Scope, col PCollection) (PCollection, error)

TryReshuffle inserts a Reshuffle into the pipeline, and returns an error if the PCollection cannot be reshuffled.
func TryWindowInto ¶

func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error)
TryWindowInto attempts to insert a WindowInto transform.
func WindowInto ¶

func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection
WindowInto applies the windowing strategy to each element.
func (PCollection) Coder ¶

func (p PCollection) Coder() Coder
Coder returns the coder for the collection. The Coder is of type 'A'.
func (PCollection) IsValid ¶

func (p PCollection) IsValid() bool

IsValid returns true iff the PCollection is valid and part of a Pipeline. Any use of an invalid PCollection will result in a panic.
func (PCollection) SetCoder ¶

func (p PCollection) SetCoder(c Coder) error

SetCoder sets the coder for the collection. The Coder must be of type 'A'.
func (PCollection) String ¶

func (p PCollection) String() string

func (PCollection) Type ¶

func (p PCollection) Type() FullType

Type returns the full type 'A' of the elements. 'A' must be a concrete type, such as int or KV<int,string>.
type PaneInfo ¶ added in v2.38.0

PaneInfo provides information about the current firing of a window pane when triggers are used.
type Pipeline ¶

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline manages a directed acyclic graph of primitive PTransforms, and the PCollections that the PTransforms consume and produce. Each Pipeline is self-contained and isolated from any other Pipeline. The Pipeline owns the PCollections and PTransforms and they can be used by that Pipeline only. Pipelines can safely be executed concurrently.
type PipelineResult ¶
PipelineResult is the result of beamx.RunWithMetrics.
type SchemaProvider ¶

type SchemaProvider interface {
	FromLogicalType(reflect.Type) (reflect.Type, error)
	BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error)
	BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error)
}

SchemaProvider specializes schema handling for complex types, including conversion to a valid schema base type.
In particular, providers are intended to handle schemas for interface types.
Separating out the acting type from the provider implementation is good.
type Scope ¶

type Scope struct {
	// contains filtered or unexported fields
}

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure. For pipeline updates, the scope chain forms a unique name. The scope chain can also be used for monitoring and visualization purposes.
func (Scope) IsValid ¶

IsValid returns true iff the Scope is valid. Any use of an invalid Scope will result in a panic.
func (Scope) Scope ¶

Scope returns a sub-scope with the given name. The name provided may be augmented to ensure uniqueness.
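A small, hedged sketch of nested scopes, which group the reads under a composite named "Ingest" for naming and monitoring purposes; the file paths are placeholders.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

func main() {
	p := beam.NewPipeline()
	s := p.Root()

	// Transforms applied with the sub-scope appear nested under "Ingest".
	ingest := s.Scope("Ingest")
	lines := textio.Read(ingest, "protocol://path/file*.txt")
	more := textio.Read(ingest, "protocol://other/path/file*.txt")

	// Later transforms can use the root scope (or further sub-scopes).
	all := beam.Flatten(s, lines, more)
	_ = all
}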
func (Scope) WithContext ¶ added in v2.53.0

WithContext creates a named subscope with an attached context for the represented composite transform. Values from that context may be extracted and added to the composite PTransform or generate a new environment for scoped transforms.

If you're not sure whether these apply to your transform, use Scope instead, and do not set a context.
type SideInput ¶

type SideInput struct {
	Input PCollection
}

SideInput provides a view of the given PCollection to the transformation.
Example¶
package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	// words and sample are PCollection<string>
	var words, sample beam.PCollection
	// analyzeFn emits values from the primary based on the singleton side input.
	analyzeFn := func(primary string, side string, emit func(string)) {}
	// Use beam.SideInput to declare that the sample PCollection is the side input.
	beam.ParDo(s, analyzeFn, words, beam.SideInput{Input: sample})
}

type T ¶

T is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.
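As a hedged illustration, universal types let one DoFn be applied to PCollections with different element types; the key-by-constant DoFn below is illustrative only and omits registration.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// addKeyFn pairs every element with a constant key. beam.X stands for the
// single concrete element type inferred from the input PCollection.
func addKeyFn(x beam.X) (int, beam.X) {
	return 1, x
}

func main() {
	p := beam.NewPipeline()
	s := p.Root()

	words := beam.Create(s, "a", "b", "c") // PCollection<string>
	nums := beam.Create(s, 1, 2, 3)        // PCollection<int>

	// The same DoFn binds X to string in one application and to int in the other.
	keyedWords := beam.ParDo(s, addKeyFn, words) // PCollection<KV<int,string>>
	keyedNums := beam.ParDo(s, addKeyFn, nums)   // PCollection<KV<int,int>>
	_, _ = keyedWords, keyedNums
}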
type TypeDefinition ¶

type TypeDefinition struct {
	// Var is the universal type defined.
	Var reflect.Type
	// T is the type it is bound to.
	T reflect.Type
}

TypeDefinition provides construction-time type information that the platform cannot infer, such as structured storage sources. These types are universal types that appear as output only. Types that are inferrable should not be conveyed via this mechanism.
type U ¶

U is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type V ¶

V is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

type W ¶

W is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.
type Window ¶

Window represents the aggregation window of this element. An element can be a part of multiple windows, based on the element's event time.
type WindowIntoOption ¶

type WindowIntoOption interface {
	// contains filtered or unexported methods
}

func AllowedLateness ¶ added in v2.34.0

func AllowedLateness(delay time.Duration) WindowIntoOption
AllowedLateness configures for how long data may arrive after the end of a window.
func PanesAccumulate ¶ added in v2.34.0

func PanesAccumulate() WindowIntoOption

PanesAccumulate applies an Accumulating AccumulationMode to the window. After a pane fires, already processed elements will accumulate and elements will be repeated in subsequent firings for the window.
func PanesDiscard ¶ added in v2.34.0

func PanesDiscard() WindowIntoOption

PanesDiscard applies a Discarding AccumulationMode to the window. After a pane fires, already processed elements will be discarded and not included in later firings for the window.
func Trigger ¶ added in v2.34.0

func Trigger(tr trigger.Trigger) WindowIntoOption
Trigger applies the given trigger to the window.
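A hedged sketch combining WindowInto with these options on an unbounded PCollection (assumed to exist as `events`); the particular trigger and durations are illustrative choices, not recommendations.

package main

import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
)

var s = beam.Scope{}

func main() {
	var events beam.PCollection // unbounded PCollection<string>, assumed to exist

	windowed := beam.WindowInto(s,
		window.NewFixedWindows(time.Minute), // 1-minute fixed windows
		events,
		// Fire a pane after elements have been buffered for 5 seconds of processing time.
		beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5*time.Second)),
		beam.PanesDiscard(),                  // do not re-emit elements from earlier panes
		beam.AllowedLateness(30*time.Second), // accept data up to 30s after the window ends
	)
	_ = windowed
}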
type X ¶

X is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.
Source Files¶
Directories¶
| Path | Synopsis |
|---|---|
Package artifact contains utilities for staging and retrieving artifacts. | Package artifact contains utilities for staging and retrieving artifacts. |
gcsproxy Package gcsproxy contains artifact staging and retrieval servers backed by GCS. | Package gcsproxy contains artifact staging and retrieval servers backed by GCS. |
Package core contains constants and other static data related to the SDK, such as the SDK Name and version. | Package core contains constants and other static data related to the SDK, such as the SDK Name and version. |
funcx Package funcx contains functions and types used to perform type analysis of Beam functions. | Package funcx contains functions and types used to perform type analysis of Beam functions. |
graph Package graph is the internal representation of the Beam execution plan. | Package graph is the internal representation of the Beam execution plan. |
graph/coder Package coder contains coder representation and utilities. | Package coder contains coder representation and utilities. |
graph/coder/testutil Package testutil contains helpers to test and validate custom Beam Schema coders. | Package testutil contains helpers to test and validate custom Beam Schema coders. |
graph/mtime Package mtime contains a millisecond representation of time. | Package mtime contains a millisecond representation of time. |
graph/window Package window contains window representation, windowing strategies and utilities. | Package window contains window representation, windowing strategies and utilities. |
graph/window/trigger Package trigger helps construct aggregation triggers with beam.WindowInto. | Package trigger helps construct aggregation triggers with beam.WindowInto. |
metrics Package metrics implements the Beam metrics API, described at http://s.apache.org/beam-metrics-api | Package metrics implements the Beam metrics API, described at http://s.apache.org/beam-metrics-api |
runtime Package runtime contains runtime hooks and utilities for pipeline options and type registration. | Package runtime contains runtime hooks and utilities for pipeline options and type registration. |
runtime/coderx Package coderx contains coders for primitive types that aren't included in the beam model. | Package coderx contains coders for primitive types that aren't included in the beam model. |
runtime/contextreg Package contextreg contains the global registrations of functions for extracting ptransform annotations or environment resource hints from context.Context attached to scopes. | Package contextreg contains the global registrations of functions for extracting ptransform annotations or environment resource hints from context.Context attached to scopes. |
runtime/exec Package exec contains runtime plan representation and execution. | Package exec contains runtime plan representation and execution. |
runtime/exec/optimized Package optimized contains type-specialized shims for faster execution. | Package optimized contains type-specialized shims for faster execution. |
runtime/genx Package genx is a convenience package to better support the code generator. | Package genx is a convenience package to better support the code generator. |
runtime/graphx Package graphx provides facilities to help with the serialization of pipelines into a serializable graph structure suitable for the worker. | Package graphx provides facilities to help with the serialization of pipelines into a serializable graph structure suitable for the worker. |
runtime/graphx/schema Package schema contains utility functions for relating Go types and Beam Schemas. | Package schema contains utility functions for relating Go types and Beam Schemas. |
runtime/harness Package harness implements the SDK side of the Beam FnAPI. | Package harness implements the SDK side of the Beam FnAPI. |
runtime/harness/init Package init contains the harness initialization code defined by the FnAPI. | Package init contains the harness initialization code defined by the FnAPI. |
runtime/harness/statecache Package statecache implements the state caching feature described by the Beam Fn API | Package statecache implements the state caching feature described by the Beam Fn API |
runtime/pipelinex Package pipelinex contains utilities for manipulating Beam proto pipelines. | Package pipelinex contains utilities for manipulating Beam proto pipelines. |
runtime/xlangx Package xlangx contains various low-level utilities needed for adding cross-language transforms to the pipeline. | Package xlangx contains various low-level utilities needed for adding cross-language transforms to the pipeline. |
runtime/xlangx/expansionx Package expansionx contains utilities for starting expansion services for cross-language transforms. | Package expansionx contains utilities for starting expansion services for cross-language transforms. |
sdf Package sdf contains interfaces used specifically for splittable DoFns. | Package sdf contains interfaces used specifically for splittable DoFns. |
state Package state contains structs for reading and manipulating pipeline state. | Package state contains structs for reading and manipulating pipeline state. |
timers Package timers contains structs for setting pipeline timers. | Package timers contains structs for setting pipeline timers. |
typex Package typex contains full type representation for PCollections and DoFns, and utilities for type checking. | Package typex contains full type representation for PCollections and DoFns, and utilities for type checking. |
util/dot Package dot produces DOT graphs from Beam graph representations. | Package dot produces DOT graphs from Beam graph representations. |
util/hooks Package hooks allows runners to tailor execution of the worker harness. | Package hooks allows runners to tailor execution of the worker harness. |
util/ioutilx Package ioutilx contains additional io utilities. | Package ioutilx contains additional io utilities. |
util/jsonx Package jsonx contains utilities for working with JSON encoded data. | Package jsonx contains utilities for working with JSON encoded data. |
util/protox Package protox contains utilities for working with protobufs. | Package protox contains utilities for working with protobufs. |
util/reflectx Package reflectx contains a set of reflection utilities and well-known types. | Package reflectx contains a set of reflection utilities and well-known types. |
util/symtab Package symtab allows reading low-level symbol information from the symbol table. | Package symtab allows reading low-level symbol information from the symbol table. |
internal | |
errors Package errors contains functionality for creating and wrapping errors with improved formatting compared to the standard Go error functionality. | Package errors contains functionality for creating and wrapping errors with improved formatting compared to the standard Go error functionality. |
io | |
avroio Package avroio contains transforms for reading and writing avro files. | Package avroio contains transforms for reading and writing avro files. |
bigqueryio Package bigqueryio provides transformations and utilities to interact with Google BigQuery. | Package bigqueryio provides transformations and utilities to interact with Google BigQuery. |
bigtableio Package bigtableio provides transformations and utilities to interact with Google Bigtable. | Package bigtableio provides transformations and utilities to interact with Google Bigtable. |
databaseio Package databaseio provides transformations and utilities to interact with generic database database/sql API. | Package databaseio provides transformations and utilities to interact with generic database database/sql API. |
datastoreio Package datastoreio provides transformations and utilities to interact with Google Datastore. | Package datastoreio provides transformations and utilities to interact with Google Datastore. |
fhirio Package fhirio provides an API for reading and writing resources to Google Cloud Healthcare Fhir stores. | Package fhirio provides an API for reading and writing resources to Google Cloud Healthcare Fhir stores. |
fileio Package fileio provides transforms for matching and reading files. | Package fileio provides transforms for matching and reading files. |
filesystem Package filesystem contains an extensible file system abstraction. | Package filesystem contains an extensible file system abstraction. |
filesystem/gcs Package gcs contains a Google Cloud Storage (GCS) implementation of the Beam file system. | Package gcs contains a Google Cloud Storage (GCS) implementation of the Beam file system. |
filesystem/local Package local contains a local file implementation of the Beam file system. | Package local contains a local file implementation of the Beam file system. |
filesystem/memfs Package memfs contains an in-memory Beam filesystem. | Package memfs contains an in-memory Beam filesystem. |
filesystem/s3 Package s3 contains an AWS S3 implementation of the Beam file system. | Package s3 contains an AWS S3 implementation of the Beam file system. |
mongodbio Package mongodbio contains transforms for reading from and writing to MongoDB. | Package mongodbio contains transforms for reading from and writing to MongoDB. |
natsio Package natsio contains transforms for interacting with NATS. | Package natsio contains transforms for interacting with NATS. |
parquetio Package parquetio contains transforms for reading and writing parquet files | Package parquetio contains transforms for reading and writing parquet files |
pubsubio Package pubsubio provides access to Pub/Sub on Dataflow streaming. | Package pubsubio provides access to Pub/Sub on Dataflow streaming. |
rtrackers/offsetrange Package offsetrange defines a restriction and restriction tracker for offset ranges. | Package offsetrange defines a restriction and restriction tracker for offset ranges. |
spannerio Package spannerio provides an API for reading and writing resources to Google Spanner datastores. | Package spannerio provides an API for reading and writing resources to Google Spanner datastores. |
synthetic Package synthetic contains transforms for creating synthetic pipelines. | Package synthetic contains transforms for creating synthetic pipelines. |
textio Package textio contains transforms for reading and writing text files. | Package textio contains transforms for reading and writing text files. |
xlang/bigqueryio Package bigqueryio contains cross-language functionality for using Google Cloud BigQuery (https://cloud.google.com/bigquery). | Package bigqueryio contains cross-language functionality for using Google Cloud BigQuery (https://cloud.google.com/bigquery). |
xlang/bigtableio Package bigtableio contains cross-language functionality for using Google Cloud Bigtable (https://cloud.google.com/bigtable). | Package bigtableio contains cross-language functionality for using Google Cloud Bigtable (https://cloud.google.com/bigtable). |
xlang/debeziumio Package debeziumio contains cross-language functionality for using Debezium (https://debezium.io/). | Package debeziumio contains cross-language functionality for using Debezium (https://debezium.io/). |
xlang/jdbcio Package jdbcio contains cross-language functionality for reading and writing data to JDBC. | Package jdbcio contains cross-language functionality for reading and writing data to JDBC. |
xlang/kafkaio Package kafkaio contains cross-language functionality for using Apache Kafka (http://kafka.apache.org/). | Package kafkaio contains cross-language functionality for using Apache Kafka (http://kafka.apache.org/). |
xlang/schemaio Package schemaio contains utilities for constructing cross-language IO wrappers meant to interface with the Java SDK's Schema IOs. | Package schemaio contains utilities for constructing cross-language IO wrappers meant to interface with the Java SDK's Schema IOs. |
Package log contains a re-targetable context-aware logging system. | Package log contains a re-targetable context-aware logging system. |
Package model contains the portable Beam model contracts. | Package model contains the portable Beam model contracts. |
options | |
gcpopts Package gcpopts contains shared options for Google Cloud Platform. | Package gcpopts contains shared options for Google Cloud Platform. |
jobopts Package jobopts contains shared options for job submission. | Package jobopts contains shared options for job submission. |
resource Package resource supports Beam resource hints to specify scoped hints or annotations to pipelines. | Package resource supports Beam resource hints to specify scoped hints or annotations to pipelines. |
Package provision contains utilities for obtaining runtime provision information -- such as pipeline options. | Package provision contains utilities for obtaining runtime provision information -- such as pipeline options. |
Package register contains functions for registering and optimizing your DoFn. | Package register contains functions for registering and optimizing your DoFn. |
Package runners defines the common "--runner" flag. | Package runners defines the common "--runner" flag. |
dataflow Package dataflow contains the Dataflow runner for submitting pipelines to Google Cloud Dataflow. | Package dataflow contains the Dataflow runner for submitting pipelines to Google Cloud Dataflow. |
dataflow/dataflowlib Package dataflowlib translates a Beam pipeline model to the Dataflow API job model, for submission to Google Cloud Dataflow. | Package dataflowlib translates a Beam pipeline model to the Dataflow API job model, for submission to Google Cloud Dataflow. |
direct Package direct contains the direct runner for running single-bundle pipelines in the current process. | Package direct contains the direct runner for running single-bundle pipelines in the current process. |
dot Package dot is a Beam runner that "runs" a pipeline by producing a DOT graph of the execution plan. | Package dot is a Beam runner that "runs" a pipeline by producing a DOT graph of the execution plan. |
flink Package flink contains the Flink runner. | Package flink contains the Flink runner. |
prism Package prism contains a local runner for running pipelines in the current process. | Package prism contains a local runner for running pipelines in the current process. |
prism/internal Package internal is where the less separable parts of the runner are put together in order to execute pipelines, and validate that beam features are implemented, and configured appropriately for the variant a pipeline is using. | Package internal is where the less separable parts of the runner are put together in order to execute pipelines, and validate that beam features are implemented, and configured appropriately for the variant a pipeline is using. |
prism/internal/config Package config defines and handles the parsing and provision of configurations for the runner. | Package config defines and handles the parsing and provision of configurations for the runner. |
prism/internal/engine Package engine handles the operational components of a runner, to track elements, watermarks, timers, triggers etc | Package engine handles the operational components of a runner, to track elements, watermarks, timers, triggers etc |
prism/internal/jobservices Package jobservices handles services necessary WRT handling jobs from SDKs. | Package jobservices handles services necessary WRT handling jobs from SDKs. |
prism/internal/urns Package urns handles extracting urns from all the protos. | Package urns handles extracting urns from all the protos. |
prism/internal/web Package web serves a web UI for Prism. | Package web serves a web UI for Prism. |
prism/internal/worker Package worker handles interactions with SDK side workers, representing the worker services, communicating with those services, and SDK environments. | Package worker handles interactions with SDK side workers, representing the worker services, communicating with those services, and SDK environments. |
samza Package samza contains the Samza runner. | Package samza contains the Samza runner. |
spark Package spark contains the Spark runner. | Package spark contains the Spark runner. |
universal Package universal contains a general-purpose runner that can submit jobs to any portable Beam runner. | Package universal contains a general-purpose runner that can submit jobs to any portable Beam runner. |
universal/extworker Package extworker provides an external worker service and related utilities. | Package extworker provides an external worker service and related utilities. |
universal/runnerlib Package runnerlib contains utilities for submitting Go pipelines to a Beam model runner. | Package runnerlib contains utilities for submitting Go pipelines to a Beam model runner. |
vet Package vet is a Beam runner that "runs" a pipeline by producing generated code to avoid symbol table lookups and reflection in pipeline execution. | Package vet is a Beam runner that "runs" a pipeline by producing generated code to avoid symbol table lookups and reflection in pipeline execution. |
vet/testpipeline Package testpipeline exports small test pipelines for testing the vet runner. | Package testpipeline exports small test pipelines for testing the vet runner. |
testing | |
passert Package passert contains verification transformations for testing pipelines. | Package passert contains verification transformations for testing pipelines. |
ptest Package ptest contains utilities for pipeline unit testing. | Package ptest contains utilities for pipeline unit testing. |
teststream Package teststream contains code configuring the TestStream primitive for use in testing code that is meant to be run on streaming data sources. | Package teststream contains code configuring the TestStream primitive for use in testing code that is meant to be run on streaming data sources. |
transforms | |
filter Package filter contains transformations for removing pipeline elements based on various conditions. | Package filter contains transformations for removing pipeline elements based on various conditions. |
periodic Package periodic contains transformations for generating periodic sequences. | Package periodic contains transformations for generating periodic sequences. |
sql Package sql contains SQL transform APIs, allowing SQL queries to be used in Beam Go pipelines. | Package sql contains SQL transform APIs, allowing SQL queries to be used in Beam Go pipelines. |
sql/sqlx Package sqlx contains "internal" SQL transform interfaces that are needed by the SQL expansion providers. | Package sqlx contains "internal" SQL transform interfaces that are needed by the SQL expansion providers. |
stats Package stats contains transforms for statistical processing. | Package stats contains transforms for statistical processing. |
top Package top contains transformations for finding the smallest (or largest) N elements based on arbitrary orderings. | Package top contains transformations for finding the smallest (or largest) N elements based on arbitrary orderings. |
xlang Package xlang contains cross-language transforms. | Package xlang contains cross-language transforms. |
xlang/dataframe Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK. | Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK. |
xlang/inference Package inference has the cross language implementation of RunInference API implemented in Python SDK. | Package inference has the cross language implementation of RunInference API implemented in Python SDK. |
xlang/python Package python contains data structures required for python external transforms in a multilanguage pipeline. | Package python contains data structures required for python external transforms in a multilanguage pipeline. |
xlang/schema Package schema has the cross language implementation for calling schema transforms in other language SDKs. | Package schema has the cross language implementation for calling schema transforms in other language SDKs. |
util | |
diagnostics Package diagnostics is a beam internal package that contains code for writing and uploading diagnostic info (e.g. | Package diagnostics is a beam internal package that contains code for writing and uploading diagnostic info (e.g. |
errorx Package errorx contains utilities for handling errors. | Package errorx contains utilities for handling errors. |
execx Package execx contains wrappers and utilities for the exec package. | Package execx contains wrappers and utilities for the exec package. |
fsx Package fsx contains utilities for working with filesystems. | Package fsx contains utilities for working with filesystems. |
gcsx Package gcsx contains utilities for working with Google Cloud Storage (GCS). | Package gcsx contains utilities for working with Google Cloud Storage (GCS). |
grpcx Package grpcx contains utilities for working with gRPC. | Package grpcx contains utilities for working with gRPC. |
harnessopts Package harnessopts defines user-facing entrypoints into Beam hooks affecting the SDK harness. | Package harnessopts defines user-facing entrypoints into Beam hooks affecting the SDK harness. |
pubsubx Package pubsubx contains utilities for working with Google PubSub. | Package pubsubx contains utilities for working with Google PubSub. |
shimx Package shimx specifies the templates for generating type assertion shims for Apache Beam Go SDK pipelines. | Package shimx specifies the templates for generating type assertion shims for Apache Beam Go SDK pipelines. |
starcgenx Package starcgenx is a Static Analysis Type Assertion shim and Registration Code Generator which provides an extractor to extract types from a package, in order to generate appropriate shims for a package so code can be generated for it. | Package starcgenx is a Static Analysis Type Assertion shim and Registration Code Generator which provides an extractor to extract types from a package, in order to generate appropriate shims for a package so code can be generated for it. |
structx Package structx provides utilities for working with structs. | Package structx provides utilities for working with structs. |
syscallx Package syscallx provides system call utilities that attempt to hide platform differences. | Package syscallx provides system call utilities that attempt to hide platform differences. |
x | |
beamx Package beamx is a convenience package for beam. | Package beamx is a convenience package for beam. |
debug Package debug contains pipeline components that may help in debugging pipeline issues. | Package debug contains pipeline components that may help in debugging pipeline issues. |
hooks/perf Package perf is to add performance measuring hooks to a runner, such as cpu, heap, or trace profiles. | Package perf is to add performance measuring hooks to a runner, such as cpu, heap, or trace profiles. |