A place to keep useful golang functions and small libraries


mailgun/holster


A place to holster mailgun's golang libraries and tools

Installation

To use, run the following command:

go get github.com/mailgun/holster/v4

Clock

A drop-in (almost) replacement for the system time package that makes scheduled events deterministic in tests. See the clock readme for details.

HttpSign

HttpSign is a library for signing and authenticating HTTP requests between web services. See the httpsign readme for details.

Distributed Election

A distributed election implementation using etcd to coordinate elections. See the etcd v3 readme for details.

Errors

Errors is a fork of https://github.com/pkg/errors with additional functions for improving the relationship between structured logging and error handling in Go. See the errors readme for details.

WaitGroup

WaitGroup is a simplification of sync.WaitGroup with item and error collection included.

Running many short-term routines over a collection with .Run()

    import "github.com/mailgun/holster/v4/syncutil"

    var wg syncutil.WaitGroup
    for _, item := range items {
        wg.Run(func(item interface{}) error {
            // Do some long running thing with the item
            fmt.Printf("Item: %+v\n", item.(MyItem))
            return nil
        }, item)
    }

    errs := wg.Wait()
    if errs != nil {
        fmt.Printf("Errs: %+v\n", errs)
    }

Clean up long-running routines easily with .Loop()

    import "github.com/mailgun/holster/v4/syncutil"

    pipe := make(chan int32, 0)
    var wg syncutil.WaitGroup
    var count int32

    wg.Loop(func() bool {
        select {
        case inc, ok := <-pipe:
            // If the pipe was closed, return false
            if !ok {
                return false
            }
            atomic.AddInt32(&count, inc)
        }
        return true
    })

    // Feed the loop some numbers and close the pipe
    pipe <- 1
    pipe <- 5
    pipe <- 10
    close(pipe)

    // Wait for the loop to exit
    wg.Wait()

Loop with .Until() until .Stop() is called

    import "github.com/mailgun/holster/v4/syncutil"

    var wg syncutil.WaitGroup

    wg.Until(func(done chan struct{}) bool {
        select {
        case <-time.Tick(time.Second):
            // Do some periodic thing
        case <-done:
            return false
        }
        return true
    })

    // Close the done channel and wait for the routine to exit
    wg.Stop()

FanOut

FanOut spawns a new goroutine each time .Run() is called until size is reached; subsequent calls to .Run() block until previously started routines have completed. This lets the user control how many routines run simultaneously. .Wait() then collects any errors from the routines once they have all completed. Unlike FanOut, WaitGroup places no limit on how many goroutines spawn at a time.

    import "github.com/mailgun/holster/v4/syncutil"

    // Insert records into the database 10 at a time
    fanOut := syncutil.NewFanOut(10)

    for _, item := range items {
        fanOut.Run(func(cast interface{}) error {
            item := cast.(Item)
            return db.ExecuteQuery("insert into tbl (id, field) values (?, ?)",
                item.Id, item.Field)
        }, item)
    }

    // Collect errors
    errs := fanOut.Wait()
    if errs != nil {
        // do something with errs
    }

LRUCache

Implements a Least Recently Used Cache with optional TTL and stats collection

This is an LRU cache based on github.com/golang/groupcache/lru, expanded with the following:

  • Peek() - Get the value without updating the expiration or last used or stats
  • Keys() - Get a list of keys at this point in time
  • Stats() - Returns stats about the current state of the cache
  • AddWithTTL() - Adds a value to the cache with an expiration time
  • Each() - Concurrent non-blocking access to each item in the cache
  • Map() - Efficient blocking modification to each item in the cache

TTL is evaluated during calls to .Get(). If the entry is past the requested TTL, .Get() removes the entry from the cache, counts a miss, and returns not ok.

    import "github.com/mailgun/holster/v4/collections"

    cache := collections.NewLRUCache(5000)

    go func() {
        for {
            select {
            // Send cache stats every 5 seconds
            case <-time.Tick(time.Second * 5):
                stats := cache.GetStats()
                metrics.Gauge(metrics.Metric("demo", "cache", "size"), int64(stats.Size), 1)
                metrics.Gauge(metrics.Metric("demo", "cache", "hit"), stats.Hit, 1)
                metrics.Gauge(metrics.Metric("demo", "cache", "miss"), stats.Miss, 1)
            }
        }
    }()

    cache.Add("key", "value")
    value, ok := cache.Get("key")

    for _, key := range cache.Keys() {
        value, ok := cache.Get(key)
        if ok {
            fmt.Printf("Key: %+v Value %+v\n", key, value)
        }
    }

ExpireCache

ExpireCache is a cache which expires entries only after 2 conditions are met:

  1. The specified TTL has expired
  2. The item has been processed with ExpireCache.Each()

This is an unbounded cache which guarantees each item in the cache has been processed before removal. This cache is useful if you need an unbounded queue that can also act like an LRU cache.

Every time an item is touched by .Get() or .Set() the duration is updated, which ensures items in frequent use stay in the cache. Processing the cache with .Each() can modify the item in the cache without updating the expiration time by using the .Update() method.

The cache can also return statistics which can be used to graph cache usage and size.

NOTE: Because this is an unbounded cache, the user MUST process the cache with .Each() regularly! Otherwise the cache items will never expire and the cache will eventually eat all the memory on the system.

    import "github.com/mailgun/holster/v4/collections"

    // How often the cache is processed
    syncInterval := time.Second * 10

    // In this example the cache TTL is slightly less than the sync interval
    // such that before the first sync; items that were only accessed once
    // between sync intervals should expire. This technique is useful if you
    // have a long syncInterval and are only interested in keeping items
    // that were accessed during the sync cycle
    cache := collections.NewExpireCache((syncInterval / 5) * 4)

    go func() {
        for {
            select {
            // Sync the cache with the database every 10 seconds
            // Items in the cache will not be expired until this completes without error
            case <-time.Tick(syncInterval):
                // Each() uses FanOut() to run several of these concurrently, in this
                // example we are capped at running 10 concurrently, Use 0 or 1 if you
                // don't need concurrent FanOut
                cache.Each(10, func(key interface{}, value interface{}) error {
                    item := value.(Item)
                    return db.ExecuteQuery("insert into tbl (id, field) values (?, ?)",
                        item.Id, item.Field)
                })
            // Periodically send stats about the cache
            case <-time.Tick(time.Second * 5):
                stats := cache.GetStats()
                metrics.Gauge(metrics.Metric("demo", "cache", "size"), int64(stats.Size), 1)
                metrics.Gauge(metrics.Metric("demo", "cache", "hit"), stats.Hit, 1)
                metrics.Gauge(metrics.Metric("demo", "cache", "miss"), stats.Miss, 1)
            }
        }
    }()

    cache.Add("domain-id", Item{Id: 1, Field: "value"})
    item, ok := cache.Get("domain-id")
    if ok {
        fmt.Printf("%+v\n", item.(Item))
    }

TTLMap

Provides a threadsafe time-to-live map useful for holding a bounded set of keyed values that can expire before being accessed. The expiration of values is calculated when the value is accessed or the map capacity has been reached.

    import "github.com/mailgun/holster/v4/collections"

    ttlMap := collections.NewTTLMap(10)
    clock.Freeze(time.Now())

    // Set a value that expires in 5 seconds
    ttlMap.Set("one", "one", 5)

    // Set a value that expires in 10 seconds
    ttlMap.Set("two", "two", 10)

    // Simulate sleeping for 6 seconds
    clock.Sleep(time.Second * 6)

    // Retrieve the expired value and un-expired value
    _, ok1 := ttlMap.Get("one")
    _, ok2 := ttlMap.Get("two")

    fmt.Printf("value one exists: %t\n", ok1)
    fmt.Printf("value two exists: %t\n", ok2)

    // Output: value one exists: false
    // value two exists: true

Default values

These functions assist in determining if values are the golang default and, if so, setting a value.

    import "github.com/mailgun/holster/v4/setter"

    var value string

    // Returns true if 'value' is zero (the default golang value)
    setter.IsZero(value)

    // Returns true if 'value' is zero (the default golang value)
    setter.IsZeroValue(reflect.ValueOf(value))

    // If 'dest' is empty or of zero value, assign the default value.
    // This panics if the value is not a pointer or if value and
    // default value are not of the same type.
    var config struct {
        Foo string
        Bar int
    }
    setter.SetDefault(&config.Foo, "default")
    setter.SetDefault(&config.Bar, 200)

    // Supply additional default values and SetDefault will
    // choose the first default that is not of zero value
    setter.SetDefault(&config.Foo, os.Getenv("FOO"), "default")

    // Use SetOverride() to assign the first value that is not empty or of zero
    // value. The following will override the config file if 'foo' is provided via
    // the cli or defined in the environment.
    loadFromFile(&config)
    argFoo := flag.String("foo", "", "foo via cli arg")
    setter.SetOverride(&config.Foo, *argFoo, os.Getenv("FOO"))

Check for Nil interface

    func NewImplementation() MyInterface {
        // The returned interface holds a non-nil type and a nil pointer value
        var p *MyImplementation = nil
        return p
    }

    thing := NewImplementation()
    assert.False(t, thing == nil)
    assert.True(t, setter.IsNil(thing))
    assert.False(t, setter.IsNil(&MyImplementation{}))

GetEnv

Get a value from an environment variable, or return the provided default.

    import "github.com/mailgun/holster/v4/config"

    var conf = sandra.CassandraConfig{
        Nodes:    []string{config.GetEnv("CASSANDRA_ENDPOINT", "127.0.0.1:9042")},
        Keyspace: "test",
    }

Random Things

A set of functions to generate random domain names and strings, useful for testing.

    // Return a random string 10 characters long made up of runes passed
    util.RandomRunes("prefix-", 10, util.AlphaRunes, util.NumericRunes)

    // Return a random string 10 characters long made up of Alpha Characters A-Z, a-z
    util.RandomAlpha("prefix-", 10)

    // Return a random string 10 characters long made up of Alpha and Numeric Characters A-Z, a-z, 0-9
    util.RandomString("prefix-", 10)

    // Return a random item from the list given
    util.RandomItem("foo", "bar", "fee", "bee")

    // Return a random domain name in the form "random-numbers.[gov, net, com, ..]"
    util.RandomDomainName()

GoRoutine ID

Get the goroutine id (useful for logging)

    import "github.com/mailgun/holster/v4/callstack"

    logrus.Infof("[%d] Info about this go routine", callstack.GoRoutineID())

ContainsString

Checks if a given slice of strings contains the provided string. If a modifier func is provided, it is called with the slice item before the comparison.

    import "github.com/mailgun/holster/v4/slice"

    haystack := []string{"one", "Two", "Three"}
    slice.ContainsString("two", haystack, strings.ToLower) // true
    slice.ContainsString("two", haystack, nil)             // false

Priority Queue

Provides a Priority Queue implementation as described here

    import "github.com/mailgun/holster/v4/collections"

    queue := collections.NewPriorityQueue()

    queue.Push(&collections.PQItem{
        Value:    "thing3",
        Priority: 3,
    })

    queue.Push(&collections.PQItem{
        Value:    "thing1",
        Priority: 1,
    })

    queue.Push(&collections.PQItem{
        Value:    "thing2",
        Priority: 2,
    })

    // Pops item off the queue according to the priority instead of the Push() order
    item := queue.Pop()

    fmt.Printf("Item: %s", item.Value.(string))

    // Output: Item: thing1

Broadcaster

Allows the user to notify multiple goroutines of an event. This implementation guarantees every goroutine will wake for every broadcast sent. If a goroutine falls behind and more broadcasts are sent than the goroutine has handled, the broadcasts are buffered, up to 10,000 broadcasts. Once the broadcast buffer limit is reached, calls to Broadcast() will block until the goroutines consuming the broadcasts catch up.

    import "github.com/mailgun/holster/v4/syncutil"

    broadcaster := syncutil.NewBroadcaster()
    done := make(chan struct{})
    var mutex sync.Mutex
    var chat []string

    // Start some simple chat clients that are responsible for
    // sending the contents of the []chat slice to their clients
    for i := 0; i < 2; i++ {
        go func(idx int) {
            var clientIndex int
            for {
                mutex.Lock()
                if clientIndex != len(chat) {
                    // Pretend we are sending a message to our client via a socket
                    fmt.Printf("Client [%d] Chat: %s\n", idx, chat[clientIndex])
                    clientIndex++
                    mutex.Unlock()
                    continue
                }
                mutex.Unlock()

                // Wait for more chats to be added to chat[]
                select {
                case <-broadcaster.WaitChan(string(idx)):
                case <-done:
                    return
                }
            }
        }(i)
    }

    // Add some chat lines to the []chat slice
    for i := 0; i < 5; i++ {
        mutex.Lock()
        chat = append(chat, fmt.Sprintf("Message '%d'", i))
        mutex.Unlock()

        // Notify any clients there are new chats to read
        broadcaster.Broadcast()
    }

    // Tell the clients to quit
    close(done)

UntilPass

Functional test helper which will run a suite of tests until the entire suite passes, or all attempts have been exhausted.

    import (
        "github.com/mailgun/holster/v4/testutil"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/assert"
    )

    func TestUntilPass(t *testing.T) {
        rand.Seed(time.Now().UnixNano())
        var value string

        ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Method == http.MethodPost {
                // Sleep some rand amount to time to simulate some
                // async process happening on the server
                time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
                // Set the value
                value = r.FormValue("value")
            } else {
                fmt.Fprintln(w, value)
            }
        }))
        defer ts.Close()

        // Start the async process that produces a value on the server
        http.PostForm(ts.URL, url.Values{"value": []string{"batch job completed"}})

        // Keep running this until the batch job completes or attempts are exhausted
        testutil.UntilPass(t, 10, time.Millisecond*100, func(t testutil.TestingT) {
            r, err := http.Get(ts.URL)

            // use of `require` will abort the current test here and tell UntilPass() to
            // run again after 100 milliseconds
            require.NoError(t, err)

            // Or you can check if the assert returned true or not
            if !assert.Equal(t, 200, r.StatusCode) {
                return
            }

            b, err := ioutil.ReadAll(r.Body)
            require.NoError(t, err)

            assert.Equal(t, "batch job completed\n", string(b))
        })
    }

UntilConnect

Waits until the test can connect to the TCP/HTTP server before continuing the test

    import (
        "github.com/mailgun/holster/v4/testutil"
        "golang.org/x/net/nettest"
        "github.com/stretchr/testify/require"
    )

    func TestUntilConnect(t *testing.T) {
        ln, err := nettest.NewLocalListener("tcp")
        require.NoError(t, err)

        go func() {
            cn, err := ln.Accept()
            require.NoError(t, err)
            cn.Close()
        }()

        // Wait until we can connect, then continue with the test
        testutil.UntilConnect(t, 10, time.Millisecond*100, ln.Addr().String())
    }

Retry Until

Retries a function until the function returns error = nil or until the context deadline is exceeded

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
    defer cancel()
    err := retry.Until(ctx, retry.Interval(time.Millisecond*10), func(ctx context.Context, att int) error {
        res, err := http.Get("http://example.com/get")
        if err != nil {
            return err
        }
        if res.StatusCode != http.StatusOK {
            return errors.New("expected status 200")
        }
        // Do something with the body
        return nil
    })
    if err != nil {
        panic(err)
    }

Backoff functions provided

  • retry.Attempts(10, time.Millisecond*10) retries up to 10 attempts
  • retry.Interval(time.Millisecond*10) retries at an interval indefinitely or until context is cancelled
  • retry.ExponentialBackoff{ Min: time.Millisecond, Max: time.Millisecond * 100, Factor: 2} retries at an exponential backoff interval. Can accept an optional Attempts which will limit the number of attempts

Retry Async

Runs a function asynchronously and retries it until it succeeds, or the context is cancelled or Stop() is called. This is useful in distributed programming where you know a remote thing will eventually succeed, but you need to keep trying until the remote thing succeeds, or we are told to shut down.

    ctx := context.Background()
    async := retry.NewRetryAsync()

    backOff := &retry.ExponentialBackoff{
        Min:      time.Millisecond,
        Max:      time.Millisecond * 100,
        Factor:   2,
        Attempts: 10,
    }

    id := createNewEC2("my-new-server")

    async.Async(id, ctx, backOff, func(ctx context.Context, i int) error {
        // Waits for a new EC2 instance to be created then updates the config and exits
        if err := updateInstance(id, mySettings); err != nil {
            return err
        }
        return nil
    })

    // Wait for all the asyncs to complete
    async.Wait()

OpenTelemetry

Tracing tools using OpenTelemetry client SDK and Jaeger Tracing server.

See the tracing readme for details.

Context Utilities

See package directory ctxutil.

Use functions ctxutil.WithDeadline()/WithTimeout() instead of the context equivalents to log details of the deadline and the source file:line where it was set. Debug logging must be enabled.

Contributing code

Please read the Contribution Guidelines before sending patches.

