Go micro batch library


fillmore-labs/microbatch


Micro-batching is a technique often used in stream processing to achieve near real-time computation while reducing the overhead compared to single record processing. It balances latency versus throughput and enables simplified parallelization while optimizing resource utilization.

See also the definition in the Hazelcast Glossary and the explanation by Jakob Jenkov. Popular examples are Spark Structured Streaming and Apache Kafka. It is also used in other contexts, like the Facebook DataLoader.

Usage

Try the example at the Go Playground.

Implement Job and JobResult

    type (
        Job struct {
            ID      string `json:"id"`
            Request string `json:"body"`
        }

        JobResult struct {
            ID       string `json:"id"`
            Response string `json:"body"`
            Error    string `json:"error"`
        }

        Jobs       []*Job
        JobResults []*JobResult

        RemoteError struct{ msg string }
    )

    func (q *Job) JobID() string        { return q.ID }
    func (r *JobResult) JobID() string  { return r.ID }
    func (e RemoteError) Error() string { return e.msg }

    // unwrap unwraps a JobResult to payload and error.
    func unwrap(r *JobResult, err error) (string, error) {
        if err != nil {
            return "", err
        }

        if r.Error != "" {
            return "", RemoteError{r.Error}
        }

        return r.Response, nil
    }

Implement the Batch Processor

    func processJobs(jobs Jobs) (JobResults, error) {
        results := make(JobResults, 0, len(jobs))
        for _, job := range jobs {
            result := &JobResult{
                ID:       job.ID,
                Response: "Processed job " + job.ID,
            }
            results = append(results, result)
        }

        return results, nil
    }
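The JobResult type above carries an Error field that unwrap turns into a RemoteError, so a batch processor can report a failure for a single job without failing the whole batch. The following is a minimal sketch of that idea using only the standard library. The function name processWithErrors and the rule that an empty request counts as an error are assumptions made for illustration; they are not part of the library or the original example.

```go
package main

import (
	"fmt"
	"strings"
)

// Job and JobResult mirror the types from the usage example.
type Job struct {
	ID      string `json:"id"`
	Request string `json:"body"`
}

type JobResult struct {
	ID       string `json:"id"`
	Response string `json:"body"`
	Error    string `json:"error"`
}

// processWithErrors is a hypothetical variant of processJobs.
// An empty request is reported as a per-job error in the result,
// while the batch as a whole still succeeds (nil error).
func processWithErrors(jobs []*Job) ([]*JobResult, error) {
	results := make([]*JobResult, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{ID: job.ID}
		if strings.TrimSpace(job.Request) == "" {
			result.Error = "empty request" // assumed validation rule
		} else {
			result.Response = "Processed job " + job.ID
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	results, _ := processWithErrors([]*Job{{ID: "1", Request: "ping"}, {ID: "2"}})
	for _, r := range results {
		fmt.Printf("%s: response=%q error=%q\n", r.ID, r.Response, r.Error)
	}
}
```

Returning a non-nil error from the processor is then reserved for failures that affect every job in the batch, such as an unreachable backend.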

Use the Batcher

    const (
        batchSize        = 3
        maxBatchDuration = 10 * time.Millisecond
        iterations       = 5
    )

    // Initialize
    batcher := microbatch.NewBatcher(
        processJobs,
        (*Job).JobID,
        (*JobResult).JobID,
        microbatch.WithSize(batchSize),
        microbatch.WithTimeout(maxBatchDuration),
    )

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    for i := 1; i <= iterations; i++ {
        future := batcher.Submit(&Job{ID: strconv.Itoa(i)})

        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            result, err := unwrap(future.Await(ctx))
            if err == nil {
                fmt.Println(result)
            } else {
                fmt.Printf("Error executing job %d: %v\n", i, err)
            }
        }(i)
    }

    // Shut down
    batcher.Send()
    wg.Wait()

Design

The package is designed to handle request batching efficiently, with a focus on code testability and modular architecture. The codebase is organized into two packages: the public microbatch.Batcher structure and an internal helper, processor.Processor.

Motivation

The primary design goal is to enhance code testability, enabling unit testing of individual components in isolation, with less emphasis on immediate performance gains.

While an alternative approach might involve constructing the correlation map during batch collection for performance reasons, the current design prioritizes testability and separation of concerns. In this context, the batcher remains independent of correlation IDs, focusing solely on batch size and timing decisions. The responsibility of correlating requests and responses is encapsulated within the processor, contributing to a cleaner and more modular architecture.

Component Description

By maintaining a modular structure and addressing concurrency issues, the codebase is designed to achieve good testability while still maintaining high performance and offering flexibility for future optimizations. The deliberate use of channels and immutability contributes to a more straightforward and reliable execution.

Public Interface (microbatch.Batcher)

The public interface is the entry point for users interacting with the batching functionality. It is designed to be thread-safe, allowing safe invocation from any goroutine and simplifying usage. The batcher is responsible for managing queued requests and initiating batch processing. It maintains a list of queued requests and spawns a processor when a complete batch is formed or the maximum collection time is reached. The processor takes ownership of the queued requests, correlating individual requests and responses.
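The size-or-timeout decision described above can be sketched with a channel and a timer. This is an illustrative, standard-library-only example and not the library's actual implementation; the function collect, its parameters and the flush callback are hypothetical names chosen for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// collect groups items from in into batches of at most size elements.
// A partial batch is flushed when timeout has elapsed since its first
// item arrived, or when in is closed.
func collect(in <-chan string, size int, timeout time.Duration, flush func([]string)) {
	var (
		batch   []string
		timer   *time.Timer
		expired <-chan time.Time // nil (never ready) while no batch is pending
	)

	send := func() {
		if timer != nil {
			timer.Stop()
			timer, expired = nil, nil
		}
		if len(batch) > 0 {
			flush(batch)
			batch = nil // flush now owns the slice; start a fresh one
		}
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				send() // input closed: flush what is left and stop
				return
			}
			if len(batch) == 0 {
				// First item of a new batch starts the collection timer.
				timer = time.NewTimer(timeout)
				expired = timer.C
			}
			batch = append(batch, item)
			if len(batch) >= size {
				send()
			}
		case <-expired:
			send()
		}
	}
}

func main() {
	in := make(chan string)
	done := make(chan struct{})

	go func() {
		defer close(done)
		collect(in, 3, 10*time.Millisecond, func(b []string) {
			fmt.Println("batch:", b)
		})
	}()

	for i := 1; i <= 5; i++ {
		in <- fmt.Sprint(i)
	}
	close(in)
	<-done
}
```

Because a single goroutine owns the pending batch and hands each completed slice to flush, no locking is needed in this sketch; callers only communicate through the channel.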

Processor (processor.Processor)

The processor wraps the user-supplied processor, handling the correlation of requests and responses. Once constructed, the fields are accessed read-only, ensuring immutability. This enables multiple processors to operate in parallel without conflicts. By encapsulating the responsibility of correlation, the processor contributes to a modular and clean architecture, promoting separation of concerns.
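Correlating requests and responses by ID can be sketched as follows. This is a simplified illustration under stated assumptions, not the internal processor.Processor: the types request, response and outcome, the function correlate and the error errNoResult are hypothetical, and duplicate IDs are not handled.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical request and response types, each carrying a correlation ID.
type request struct{ ID, Body string }
type response struct{ ID, Body string }

// outcome is what the submitter of a single request eventually receives.
type outcome struct {
	Res response
	Err error
}

var errNoResult = errors.New("no result returned for job")

// correlate runs a batch through the user-supplied function and matches
// each response to its request by ID. Requests without a matching
// response get errNoResult; a batch-level error is passed to every request.
func correlate(batch []request, run func([]request) ([]response, error)) map[string]outcome {
	out := make(map[string]outcome, len(batch))

	results, err := run(batch)
	if err != nil {
		for _, req := range batch {
			out[req.ID] = outcome{Err: err}
		}
		return out
	}

	// Index responses by ID so their order does not matter.
	byID := make(map[string]response, len(results))
	for _, res := range results {
		byID[res.ID] = res
	}

	for _, req := range batch {
		if res, ok := byID[req.ID]; ok {
			out[req.ID] = outcome{Res: res}
		} else {
			out[req.ID] = outcome{Err: errNoResult}
		}
	}

	return out
}

func main() {
	batch := []request{{"1", "a"}, {"2", "b"}, {"3", "c"}}

	// This stand-in processor answers out of order and skips job 2.
	run := func(_ []request) ([]response, error) {
		return []response{{"3", "done c"}, {"1", "done a"}}, nil
	}

	out := correlate(batch, run)
	for _, req := range batch {
		o := out[req.ID]
		fmt.Printf("job %s: %q, err=%v\n", req.ID, o.Res.Body, o.Err)
	}
}
```

The function keeps no state between calls and only reads its inputs, which is the property that lets several batches be processed in parallel without conflicts.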
