fillmore-labs/microbatch
Micro-batching is a technique often used in stream processing to achieve near real-time computation while reducing the overhead of single-record processing. It balances latency against throughput and simplifies parallelization while optimizing resource utilization.
See also the definition in the Hazelcast Glossary and the explanation by Jakob Jenkov. Popular examples are Spark Structured Streaming and Apache Kafka. It is also used in other contexts, like the Facebook DataLoader.
Try the example at the Go Playground.
```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"fillmore-labs.com/microbatch"
)

type (
	// Job is a single request submitted to the batcher.
	Job struct {
		ID      string `json:"id"`
		Request string `json:"body"`
	}

	// JobResult is the response to a single Job, correlated by ID.
	JobResult struct {
		ID       string `json:"id"`
		Response string `json:"body"`
		Error    string `json:"error"`
	}

	Jobs       []*Job
	JobResults []*JobResult

	// RemoteError is an error reported by the batch processor for one job.
	RemoteError struct{ msg string }
)

// JobID returns the correlation ID of a request.
func (q *Job) JobID() string { return q.ID }

// JobID returns the correlation ID of a result.
func (r *JobResult) JobID() string { return r.ID }

func (e RemoteError) Error() string { return e.msg }

// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
	if err != nil {
		return "", err
	}

	if r.Error != "" {
		return "", RemoteError{r.Error}
	}

	return r.Response, nil
}

// processJobs processes a whole batch at once and returns one result per job.
func processJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:       job.ID,
			Response: "Processed job " + job.ID,
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	const (
		batchSize        = 3
		maxBatchDuration = 10 * time.Millisecond
		iterations       = 5
	)

	// Initialize
	batcher := microbatch.NewBatcher(
		processJobs,
		(*Job).JobID,
		(*JobResult).JobID,
		microbatch.WithSize(batchSize),
		microbatch.WithTimeout(maxBatchDuration),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for i := 1; i <= iterations; i++ {
		future := batcher.Submit(&Job{ID: strconv.Itoa(i)})

		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			result, err := unwrap(future.Await(ctx))
			if err == nil {
				fmt.Println(result)
			} else {
				fmt.Printf("Error executing job %d: %v\n", i, err)
			}
		}(i)
	}

	// Shut down
	batcher.Send()
	wg.Wait()
}
```
The package is designed to handle request batching efficiently, with a focus on testability and a modular architecture. The codebase is organized into two packages: the public `microbatch.Batcher` structure and an internal helper, `processor.Processor`.
The primary design goal is to enhance code testability, enabling unit testing of individual components in isolation, with less emphasis on immediate performance gains.
An alternative approach would build the correlation map while the batch is being collected, which could be faster; the current design instead prioritizes testability and separation of concerns. The batcher is therefore independent of correlation IDs and decides only when a batch is complete, based on batch size and elapsed time. Correlating requests with responses is encapsulated within the processor, which keeps the architecture cleaner and more modular.
By maintaining a modular structure and addressing concurrency issues, the codebase aims for good testability while still maintaining high performance and leaving room for future optimizations. The deliberate use of channels and immutable data makes execution more straightforward and reliable.
The public interface, `microbatch.Batcher`, is the entry point for users of the batching functionality. It is thread-safe, so it can be called from any goroutine, which simplifies usage. The batcher is responsible for managing queued requests and initiating batch processing: it maintains an array of queued requests and, when a complete batch is formed or the maximum collection time is reached, spawns a processor. The processor takes ownership of the queued requests and correlates individual requests with their responses.
The processor, `processor.Processor`, wraps the user-supplied batch processing function (such as `processJobs` in the example above) and handles the correlation of requests and responses. Once constructed, its fields are only read, never modified, so it is effectively immutable. This allows multiple processors to run in parallel without conflicts. By encapsulating the responsibility for correlation, the processor keeps the architecture modular and promotes separation of concerns.
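Why read-only fields matter for parallelism can be shown with a small sketch: each processor gets its own batch and shared, read-only references at construction time, so several can run at once without locking their own state. Only the shared `deliver` callback needs synchronization. `processor`, `run` and `runParallel` are hypothetical names, and correlation is simplified to a map keyed by job ID; this is not the package's `processor.Processor`.

```go
package main

import (
	"fmt"
	"sync"
)

// processor (hypothetical): all fields are set at construction and only read
// afterwards, so processors can run concurrently without locking.
type processor struct {
	jobs    []string                         // owned exclusively by this processor
	process func([]string) map[string]string // user-supplied batch function
	deliver func(id, result string)          // hands a result back to its requester
}

// run processes the batch and delivers one result per job. A missing result is
// delivered as "" here; a real implementation would report an error instead.
func (p processor) run() {
	results := p.process(p.jobs)
	for _, id := range p.jobs {
		p.deliver(id, results[id])
	}
}

// runParallel starts one processor per batch and collects all results.
func runParallel(batches [][]string, process func([]string) map[string]string) map[string]string {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		delivered = make(map[string]string)
	)
	deliver := func(id, result string) {
		mu.Lock() // shared by all processors, so it must synchronize
		defer mu.Unlock()
		delivered[id] = result
	}
	for _, batch := range batches {
		p := processor{jobs: batch, process: process, deliver: deliver}
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.run()
		}()
	}
	wg.Wait()
	return delivered
}

func main() {
	handle := func(jobs []string) map[string]string {
		out := make(map[string]string, len(jobs))
		for _, j := range jobs {
			out[j] = "Processed job " + j
		}
		return out
	}
	results := runParallel([][]string{{"1", "2", "3"}, {"4", "5"}}, handle)
	fmt.Println(len(results), results["4"])
}
```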
Go micro batch library