- Notifications
You must be signed in to change notification settings - Fork34
A place to keep useful golang functions and small libraries
License
mailgun/holster
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
A place to holster mailgun's golang libraries and tools
To use, run the following command:
go get github.com/mailgun/holster/v4
A drop in (almost) replacement for the systemtime
package to make scheduledevents deterministic in tests. See theclock readme for details
HttpSign is a library for signing and authenticating HTTP requests between web services.See thehttpsign readme for details
A distributed election implementation using etcd to coordinate electionsSee theetcd v3 readme for details
Errors is a fork ofhttps://github.com/pkg/errors with additionalfunctions for improving the relationship between structured logging and error handling in goSee theerrors readme for details
Waitgroup is a simplification ofsync.Waitgroup
with item and error collection included.
Running many short term routines over a collection with.Run()
import"github.com/mailgun/holster/v4/syncutil"varwg syncutil.WaitGroupfor_,item:=rangeitems {wg.Run(func(iteminterface{})error {// Do some long running thing with the itemfmt.Printf("Item: %+v\n",item.(MyItem))returnnil },item)}errs:=wg.Wait()iferrs!=nil {fmt.Printf("Errs: %+v\n",errs)}
Clean up long running routines easily with.Loop()
import"github.com/mailgun/holster/v4/syncutil"pipe:=make(chanint32,0)varwg syncutil.WaitGroupvarcountint32wg.Loop(func()bool {select {caseinc,ok:=<-pipe:// If the pipe was closed, return falseif!ok {returnfalse }atomic.AddInt32(&count,inc) }returntrue})// Feed the loop some numbers and close the pipepipe<-1pipe<-5pipe<-10close(pipe)// Wait for the loop to exitwg.Wait()
Loop.Until()
.Stop()
is called
import"github.com/mailgun/holster/v4/syncutil"varwg syncutil.WaitGroupwg.Until(func(donechanstruct{})bool {select {case<-time.Tick(time.Second):// Do some periodic thingcase<-done:returnfalse }returntrue})// Close the done channel and wait for the routine to exitwg.Stop()
FanOut spawns a new go-routine each time.Run()
is called untilsize
is reached,subsequent calls to.Run()
will block until previously.Run()
routines have completed.Allowing the user to control how many routines will run simultaneously..Wait()
thencollects any errors from the routines once they have all completed. FanOut allows youto control how many goroutines spawn at a time while WaitGroup will not.
import"github.com/mailgun/holster/v4/syncutil"// Insert records into the database 10 at a timefanOut:=syncutil.NewFanOut(10)for_,item:=rangeitems {fanOut.Run(func(castinterface{})error {item:=cast.(Item)returndb.ExecuteQuery("insert into tbl (id, field) values (?, ?)",item.Id,item.Field) },item)}// Collect errorserrs:=fanOut.Wait()iferrs!=nil {// do something with errs}
Implements a Least Recently Used Cache with optional TTL and stats collection
This is a LRU cache based offgithub.com/golang/groupcache/lru expandedwith the following
Peek()
- Get the value without updating the expiration or last used or statsKeys()
- Get a list of keys at this point in timeStats()
- Returns stats about the current state of the cacheAddWithTTL()
- Adds a value to the cache with a expiration timeEach()
- Concurrent non blocking access to each item in the cacheMap()
- Efficient blocking modification to each item in the cache
TTL is evaluated during calls to.Get()
if the entry is past the requested TTL.Get()
removes the entry from the cache counts a miss and returns notok
import"github.com/mailgun/holster/v4/collections"cache:=collections.NewLRUCache(5000)gofunc() {for {select {// Send cache stats every 5 secondscase<-time.Tick(time.Second*5):stats:=cache.GetStats()metrics.Gauge(metrics.Metric("demo","cache","size"),int64(stats.Size),1)metrics.Gauge(metrics.Metric("demo","cache","hit"),stats.Hit,1)metrics.Gauge(metrics.Metric("demo","cache","miss"),stats.Miss,1) } }}()cache.Add("key","value")value,ok:=cache.Get("key")for_,key:=rangecache.Keys() {value,ok:=cache.Get(key)ifok {fmt.Printf("Key: %+v Value %+v\n",key,value) }}
ExpireCache is a cache which expires entries only after 2 conditions are met
- The Specified TTL has expired
- The item has been processed with ExpireCache.Each()
This is an unbounded cache which guaranties each item in the cachehas been processed before removal. This cache is useful if you need anunbounded queue, that can also act like an LRU cache.
Every time an item is touched by.Get()
or.Set()
the duration isupdated which ensures items in frequent use stay in the cache. Processingthe cache with.Each()
can modify the item in the cache withoutupdating the expiration time by using the.Update()
method.
The cache can also return statistics which can be used to graph cache usageand size.
NOTE: Because this is an unbounded cache, the user MUST process the cachewith.Each()
regularly! Else the cache items will never expire and the cachewill eventually eat all the memory on the system
import"github.com/mailgun/holster/v4/collections"// How often the cache is processedsyncInterval:=time.Second*10// In this example the cache TTL is slightly less than the sync interval// such that before the first sync; items that where only accessed once// between sync intervals should expire. This technique is useful if you// have a long syncInterval and are only interested in keeping items// that where accessed during the sync cyclecache:=collections.NewExpireCache((syncInterval/5)*4)gofunc() {for {select {// Sync the cache with the database every 10 seconds// Items in the cache will not be expired until this completes without errorcase<-time.Tick(syncInterval):// Each() uses FanOut() to run several of these concurrently, in this// example we are capped at running 10 concurrently, Use 0 or 1 if you// don't need concurrent FanOutcache.Each(10,func(keyinterface{},valueinterface{})error {item:=value.(Item)returndb.ExecuteQuery("insert into tbl (id, field) values (?, ?)",item.Id,item.Field) })// Periodically send stats about the cachecase<-time.Tick(time.Second*5):stats:=cache.GetStats()metrics.Gauge(metrics.Metric("demo","cache","size"),int64(stats.Size),1)metrics.Gauge(metrics.Metric("demo","cache","hit"),stats.Hit,1)metrics.Gauge(metrics.Metric("demo","cache","miss"),stats.Miss,1) } }}()cache.Add("domain-id",Item{Id:1,Field:"value"},item,ok:=cache.Get("domain-id")ifok {fmt.Printf("%+v\n",item.(Item))}
Provides a threadsafe time to live map useful for holding a bounded set of key'd valuesthat can expire before being accessed. The expiration of values is calculatedwhen the value is accessed or the map capacity has been reached.
import"github.com/mailgun/holster/v4/collections"ttlMap:=collections.NewTTLMap(10)clock.Freeze(time.Now())// Set a value that expires in 5 secondsttlMap.Set("one","one",5)// Set a value that expires in 10 secondsttlMap.Set("two","twp",10)// Simulate sleeping for 6 secondsclock.Sleep(time.Second*6)// Retrieve the expired value and un-expired value_,ok1:=ttlMap.Get("one")_,ok2:=ttlMap.Get("two")fmt.Printf("value one exists: %t\n",ok1)fmt.Printf("value two exists: %t\n",ok2)// Output: value one exists: false// value two exists: true
These functions assist in determining if values are the golang defaultand if so, set a value
import"github.com/mailgun/holster/v4/setter"varvaluestring// Returns true if 'value' is zero (the default golang value)setter.IsZero(value)// Returns true if 'value' is zero (the default golang value)setter.IsZeroValue(reflect.ValueOf(value))// If 'dest' is empty or of zero value, assign the default value.// This panics if the value is not a pointer or if value and// default value are not of the same type.varconfigstruct {FoostringBarint}setter.SetDefault(&config.Foo,"default")setter.SetDefault(&config.Bar,200)// Supply additional default values and SetDefault will// choose the first default that is not of zero valuesetter.SetDefault(&config.Foo,os.Getenv("FOO"),"default")// Use 'SetOverride() to assign the first value that is not empty or of zero// value. The following will override the config file if 'foo' is provided via// the cli or defined in the environment.loadFromFile(&config)argFoo=flag.String("foo","","foo via cli arg")setter.SetOverride(&config.Foo,*argFoo,os.Env("FOO"))
funcNewImplementation()MyInterface {// Type and Value are not nilvarp*MyImplementation=nilreturnp}thing:=NewImplementation()assert.False(t,thing==nil)assert.True(t,setter.IsNil(thing))assert.False(t,setter.IsNil(&MyImplementation{}))
Get a value from an environment variable or return the provided default
import"github.com/mailgun/holster/v4/config"varconf= sandra.CassandraConfig{Nodes: []string{config.GetEnv("CASSANDRA_ENDPOINT","127.0.0.1:9042")},Keyspace:"test",}
A set of functions to generate random domain names and strings useful for testing
// Return a random string 10 characters long made up of runes passedutil.RandomRunes("prefix-",10,util.AlphaRunes,holster.NumericRunes)// Return a random string 10 characters long made up of Alpha Characters A-Z, a-zutil.RandomAlpha("prefix-",10)// Return a random string 10 characters long made up of Alpha and Numeric Characters A-Z, a-z, 0-9util.RandomString("prefix-",10)// Return a random item from the list givenutil.RandomItem("foo","bar","fee","bee")// Return a random domain name in the form "random-numbers.[gov, net, com, ..]"util.RandomDomainName()
Get the go routine id (useful for logging)
import"github.com/mailgun/holster/v4/callstack"logrus.Infof("[%d] Info about this go routine",stack.GoRoutineID())
Checks if a given slice of strings contains the provided string.If a modifier func is provided, it is called with the slice item before the comparation.
import"github.com/mailgun/holster/v4/slice"haystack:= []string{"one","Two","Three"}slice.ContainsString("two",haystack,strings.ToLower)// trueslice.ContainsString("two",haystack,nil)// false
Provides a Priority Queue implementation as describedhere
import"github.com/mailgun/holster/v4/collections"queue:=collections.NewPriorityQueue()queue.Push(&collections.PQItem{Value:"thing3",Priority:3,})queue.Push(&collections.PQItem{Value:"thing1",Priority:1,})queue.Push(&collections.PQItem{Value:"thing2",Priority:2,})// Pops item off the queue according to the priority instead of the Push() orderitem:=queue.Pop()fmt.Printf("Item: %s",item.Value.(string))// Output: Item: thing1
Allow the user to notify multiple goroutines of an event. This implementation guarantees every goroutine will wakefor every broadcast sent. In the event the goroutine falls behind and more broadcasts() are sent than the goroutinehas handled the broadcasts are buffered up to 10,000 broadcasts. Once the broadcast buffer limit is reached callsto broadcast() will block until goroutines consuming the broadcasts can catch up.
import"github.com/mailgun/holster/v4/syncutil"broadcaster:=syncutil.NewBroadcaster()done:=make(chanstruct{})varmutex sync.Mutexvarchat []string// Start some simple chat clients that are responsible for// sending the contents of the []chat slice to their clientsfori:=0;i<2;i++ {gofunc(idxint) {varclientIndexintfor {mutex.Lock()ifclientIndex!=len(chat) {// Pretend we are sending a message to our client via a socketfmt.Printf("Client [%d] Chat: %s\n",idx,chat[clientIndex])clientIndex++mutex.Unlock()continue }mutex.Unlock()// Wait for more chats to be added to chat[]select {case<-broadcaster.WaitChan(string(idx)):case<-done:return } } }(i) }// Add some chat lines to the []chat slicefori:=0;i<5;i++ {mutex.Lock()chat=append(chat,fmt.Sprintf("Message '%d'",i))mutex.Unlock()// Notify any clients there are new chats to readbroadcaster.Broadcast() }// Tell the clients to quitclose(done)
Functional test helper which will run a suite of tests until the entire suitepasses, or all attempts have been exhausted.
import ("github.com/mailgun/holster/v4/testutil""github.com/stretchr/testify/require""github.com/stretchr/testify/assert")funcTestUntilPass(t*testing.T) {rand.Seed(time.Now().UnixNano())varvaluestringts:=httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,r*http.Request) {ifr.Method==http.MethodPost {// Sleep some rand amount to time to simulate some// async process happening on the servertime.Sleep(time.Duration(rand.Intn(10))*time.Millisecond)// Set the valuevalue=r.FormValue("value") }else {fmt.Fprintln(w,value) } }))deferts.Close()// Start the async process that produces a value on the serverhttp.PostForm(ts.URL, url.Values{"value": []string{"batch job completed"}})// Keep running this until the batch job completes or attempts are exhaustedtestutil.UntilPass(t,10,time.Millisecond*100,func(t testutil.TestingT) {r,err:=http.Get(ts.URL)// use of `require` will abort the current test here and tell UntilPass() to// run again after 100 millisecondsrequire.NoError(t,err)// Or you can check if the assert returned true or notif!assert.Equal(t,200,r.StatusCode) {return }b,err:=ioutil.ReadAll(r.Body)require.NoError(t,err)assert.Equal(t,"batch job completed\n",string(b)) })}
Waits until the test can connect to the TCP/HTTP server before continuing the test
import ("github.com/mailgun/holster/v4/testutil""golang.org/x/net/nettest""github.com/stretchr/testify/require")funcTestUntilConnect(t*testing.T) {ln,err:=nettest.NewLocalListener("tcp")require.NoError(t,err)gofunc() {cn,err:=ln.Accept()require.NoError(t,err)cn.Close() }()// Wait until we can connect, then continue with the testtestutil.UntilConnect(t,10,time.Millisecond*100,ln.Addr().String())}
Retries a function until the function returns error = nil or until the context is deadline is exceeded
ctx,cancel:=context.WithTimeout(context.Background(),time.Second*20)defercancel()err:=retry.Until(ctx,retry.Interval(time.Millisecond*10),func(ctx context.Context,attint)error {res,err:=http.Get("http://example.com/get")iferr!=nil {returnerr }ifres.StatusCode!=http.StatusOK {returnerrors.New("expected status 200") }// Do something with the bodyreturnnil})iferr!=nil {panic(err)}
Backoff functions provided
retry.Attempts(10, time.Millisecond*10)
retries up to10
attemptsretry.Interval(time.Millisecond*10)
retries at an interval indefinitely or until context is cancelledretry.ExponentialBackoff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2}
retriesat an exponential backoff interval. Can accept an optionalAttempts
which will limit the number of attempts
Runs a function asynchronously and retries it until it succeeds, or the context iscancelled orStop()
is called. This is useful in distributed programming whereyou know a remote thing will eventually succeed, but you need to keep trying untilthe remote thing succeeds, or we are told to shutdown.
ctx:=context.Background()async:=retry.NewRetryAsync()backOff:=&retry.ExponentialBackoff{Min:time.Millisecond,Max:time.Millisecond*100,Factor:2,Attempts:10,}id:=createNewEC2("my-new-server")async.Async(id,ctx,backOff,func(ctx context.Context,iint)error {// Waits for a new EC2 instance to be created then updates the config and exitsiferr:=updateInstance(id,mySettings);err!=nil {returnerr }returnnil})// Wait for all the asyncs to completeasync.Wait()
Tracing tools using OpenTelemetry client SDK and Jaeger Tracing server.
Seetracingreadme fordetails.
See package directoryctxutil
.
Use functionsctxutil.WithDeadline()
/WithTimeout()
instead of thecontext
equivalents to log details of the deadline and source file:line where it wasset. Must enable debug logging.
Please read theContribution Guidelines before sending patches.
About
A place to keep useful golang functions and small libraries