
valkey-io/valkey-go


A fast Golang Valkey client that does auto pipelining and supports server-assisted client-side caching.

Features


Getting Started

    package main

    import (
        "context"

        "github.com/valkey-io/valkey-go"
    )

    func main() {
        client, err := valkey.NewClient(valkey.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
        if err != nil {
            panic(err)
        }
        defer client.Close()

        ctx := context.Background()
        // SET key val NX
        err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
        // HGETALL hm
        hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
    }

Check out more examples: Command Response Cheatsheet

Developer Friendly Command Builder

client.B() is the builder entry point to construct a valkey command:

[Figure: developer friendly command builder]
Recorded by @FZambia, from "Improving Centrifugo Redis Engine throughput and allocation efficiency with Rueidis Go library"

Once a command is built, use either client.Do() or client.DoMulti() to send it to valkey.

You ❗️SHOULD NOT❗️ reuse the command in another client.Do() or client.DoMulti() call, because by default it is recycled to the underlying sync.Pool once it has been sent.

To reuse a command, call Pin() after Build(); this prevents the command from being recycled.

Auto Pipelining

All concurrent non-blocking valkey commands (such as GET, SET) are automatically pipelined by default, which reduces the overall round trips and system calls and gets higher throughput. You can easily get the benefit of the pipelining technique by just calling client.Do() from multiple goroutines concurrently. For example:

    func BenchmarkPipelining(b *testing.B, client valkey.Client) {
        // the below client.Do() operations will be issued from
        // multiple goroutines and thus will be pipelined automatically.
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
            }
        })
    }

Benchmark Comparison with go-redis v9

Compared to go-redis, valkey-go has higher throughput across 1, 8, and 64 parallelism settings.

It is even able to achieve ~14x throughput over go-redis in a local benchmark on a Macbook Pro 16" M1 Pro 2021 (see parallelism(64)-key(16)-value(64)-10).

[Figure: client_test_set]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

A benchmark result performed on two GCP n2-highcpu-2 machines also shows that valkey-go can achieve higher throughput with lower latencies: redis/rueidis#93

Disable Auto Pipelining

While auto pipelining maximizes throughput, it relies on additional goroutines to process requests and responses and may add some latency due to goroutine scheduling and head-of-line blocking.

You can avoid this by setting DisableAutoPipelining to true. The client then switches to a connection pooling approach and serves each request with a dedicated connection on the same goroutine.

When DisableAutoPipelining is set to true, you can still send commands for auto pipelining with ToPipe():

    cmd := client.B().Get().Key("key").Build().ToPipe()
    client.Do(ctx, cmd)

This allows you to use the connection pooling approach by default but opt in to auto pipelining for a subset of requests.

Manual Pipelining

Besides auto pipelining, you can also pipeline commands manually with DoMulti():

    cmds := make(valkey.Commands, 0, 10)
    for i := 0; i < 10; i++ {
        cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
    }
    for _, resp := range client.DoMulti(ctx, cmds...) {
        if err := resp.Error(); err != nil {
            panic(err)
        }
    }

Client-Side Caching

The opt-in mode of server-assisted client-side caching is enabled by default and can be used by calling DoCache() or DoMultiCache() with client-side TTLs specified.

    client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
    client.DoMultiCache(ctx,
        valkey.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
        valkey.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

Cached responses, including Valkey Nils, will be invalidated either when being notified by valkey servers or when their client-side TTLs are reached. See redis/rueidis#534 for more details.

Benchmark

Server-assisted client-side caching can dramatically improve latency and throughput, just like having a valkey replica right inside your application. For example:

[Figure: client_test_get]

Benchmark source code: https://github.com/rueian/rueidis-benchmark

Client-Side Caching Helpers

Use CacheTTL() to check the remaining client-side TTL in seconds:

    client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60

Use IsCacheHit() to verify if the response came from the client-side memory:

    client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true

If OpenTelemetry is enabled via valkeyotel.NewClient(option), two additional metrics are instrumented:

  • valkey_do_cache_miss
  • valkey_do_cache_hits

MGET/JSON.MGET Client-Side Caching Helpers

valkey.MGetCache and valkey.JsonMGetCache are handy helpers for fetching multiple keys across different slots through client-side caching. They first group keys by slot to build MGET or JSON.MGET commands respectively and then send requests with only the cache-missed keys to valkey nodes.
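To make "group keys by slot" concrete, here is a minimal standard-library sketch of how cluster hash slots are commonly computed: CRC16 (XMODEM variant) of the key, or of its `{hash tag}` if one is present, modulo 16384. The names `crc16`, `keySlot`, and `groupBySlot` are illustrative only and are not part of valkey-go; the library's own grouping may be implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// slotCount is the number of hash slots in a cluster.
const slotCount = 16384

// crc16 computes CRC16/XMODEM (polynomial 0x1021, initial value 0) bit by bit.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// keySlot hashes only the text between the first '{' and the next '}'
// when that text is non-empty; otherwise it hashes the whole key.
func keySlot(key string) uint16 {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return crc16(key) % slotCount
}

// groupBySlot buckets keys so that each bucket could be served by one MGET.
func groupBySlot(keys []string) map[uint16][]string {
	groups := make(map[uint16][]string)
	for _, k := range keys {
		s := keySlot(k)
		groups[s] = append(groups[s], k)
	}
	return groups
}

func main() {
	for slot, keys := range groupBySlot([]string{"a{slot1}", "b{slot1}", "other"}) {
		fmt.Println(slot, keys)
	}
}
```

Keys that share a hash tag, such as `a{slot1}` and `b{slot1}`, always land in the same group.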

Broadcast Mode Client-Side Caching

Although the default is opt-in mode, you can use broadcast mode by specifying your prefixes in ClientOption.ClientTrackingOptions:

    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress:           []string{"127.0.0.1:6379"},
        ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
    })
    if err != nil {
        panic(err)
    }
    client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
    client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true

Please make sure that commands passed to DoCache() and DoMultiCache() are covered by your prefixes. Otherwise, their client-side cache will not be invalidated by valkey.

Client-Side Caching with Cache Aside Pattern

Cache-Aside is a widely used caching strategy. valkeyaside can help you cache data into your client-side cache backed by Valkey. For example:

    client, err := valkeyaside.NewClient(valkeyaside.ClientOption{
        ClientOption: valkey.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
    })
    if err != nil {
        panic(err)
    }
    val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) {
        if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows {
            val = "_nil_" // cache nil to avoid penetration.
            err = nil     // clear err in case of sql.ErrNoRows.
        }
        return
    })
    // ...

Please refer to the full example at valkeyaside.

Disable Client-Side Caching

Some Valkey providers don't support client-side caching, e.g. Google Cloud Memorystore. You can disable client-side caching by setting ClientOption.DisableCache to true. This also makes client.DoCache() and client.DoMultiCache() fall back to client.Do() and client.DoMulti().

Context Cancellation

client.Do(), client.DoMulti(), client.DoCache(), and client.DoMultiCache() can return early if the context deadline is reached.

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

Please note that though operations can return early, the command is likely sent already.

Canceling a Context Before Its Deadline

Manually canceling a context only works in pipeline mode, as it requires an additional goroutine to monitor the context. Pipeline mode is started automatically when there are concurrent requests on the same connection, but you can start it in advance with ClientOption.AlwaysPipelining to make sure manual cancellation is respected, especially for blocking requests, which are sent over a dedicated connection where pipeline mode isn't started.

Disable Auto Retry

All read-only commands are automatically retried on failure by default until their context deadlines are exceeded. You can disable this by setting DisableRetry, or adjust the number of retries and the duration between retries using the RetryDelay function.

Pub/Sub

To receive messages from channels, client.Receive() should be used. It supports SUBSCRIBE, PSUBSCRIBE, and Valkey 7.0's SSUBSCRIBE:

    err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg valkey.PubSubMessage) {
        // Handle the message. Note that if you want to call another `client.Do()` here,
        // you need to do it in another goroutine or the `client` will be blocked.
    })

The provided handler will be called with the received message.

It is important to note that client.Receive() keeps blocking until it returns in one of the following cases:

  1. return nil when receiving any unsubscribe/punsubscribe message related to the provided subscribe command, including sunsubscribe messages caused by slot migrations.
  2. return valkey.ErrClosing when the client is closed manually.
  3. return ctx.Err() when the ctx is done.
  4. return non-nil err when the provided subscribe command fails.

While the client.Receive() call is blocking, the Client is still able to accept other concurrent requests, and they share the same TCP connection. If your message handler may take some time to complete, it is recommended to call client.Receive() inside client.Dedicated() so that it does not block other concurrent requests.

Alternative PubSub Hooks

client.Receive() requires users to provide a subscription command in advance. The alternative DedicatedClient.SetPubSubHooks() allows users to subscribe to and unsubscribe from channels later.

    c, cancel := client.Dedicate()
    defer cancel()

    wait := c.SetPubSubHooks(valkey.PubSubHooks{
        OnMessage: func(m valkey.PubSubMessage) {
            // Handle the message. Note that if you want to call another `c.Do()` here,
            // you need to do it in another goroutine or the `c` will be blocked.
        },
    })
    c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
    err := <-wait // disconnected with err

If the hooks are not nil, the above wait channel is guaranteed to be closed when the hooks will not be called anymore, and it produces at most one error describing the reason. Users can use this channel to detect disconnection.

CAS Transaction

To do a CAS Transaction (WATCH + MULTI + EXEC), a dedicated connection should be used because there should be no unintentional write commands between WATCH and EXEC. Otherwise, the EXEC may not fail as expected.

    client.Dedicated(func(c valkey.DedicatedClient) error {
        // watch keys first
        c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
        // perform read here
        c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
        // perform write with MULTI EXEC
        c.DoMulti(
            ctx,
            c.B().Multi().Build(),
            c.B().Set().Key("k1").Value("1").Build(),
            c.B().Set().Key("k2").Value("2").Build(),
            c.B().Exec().Build(),
        )
        return nil
    })

Or use Dedicate() and invoke cancel() when finished to put the connection back into the pool.

    c, cancel := client.Dedicate()
    defer cancel()

    c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
    // do the rest of the CAS operations with the dedicated client `c`, which occupies a connection

However, occupying a connection is not good in terms of throughput. It is better to use a Lua script to perform optimistic locking instead.

Lua Script

NewLuaScript or NewLuaScriptReadOnly creates a script that is safe for concurrent usage.

When calling script.Exec, it will try sending EVALSHA first and fall back to EVAL if the server returns NOSCRIPT.

    script := valkey.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
    // the script.Exec is safe for concurrent call
    list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()

Streaming Read

client.DoStream() and client.DoMultiStream() can be used to send large valkey responses to an io.Writer directly without allocating them in memory. They work by first sending commands to a dedicated connection acquired from a pool, then directly copying the response values to the given io.Writer, and finally recycling the connection.

    s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
    for s.HasNext() {
        n, err := s.WriteTo(io.Discard)
        if valkey.IsValkeyNil(err) {
            // ...
        }
    }

Note that these two methods will occupy connections until all responses are written to the given io.Writer. This can take a long time and hurt performance. Use the normal Do() and DoMulti() instead unless you want to avoid allocating memory for a large valkey response.

Also note that these two methods only work with string, integer, and float valkey responses. DoMultiStream currently does not support pipelining keys across multiple slots when connecting to a valkey cluster.

Memory Consumption Consideration

Each underlying connection in valkey allocates a ring buffer for pipelining. Its size is controlled by ClientOption.RingScaleEachConn; the default value is 10, which results in each ring having a size of 2^10.

If you have many valkey connections, you may find that they occupy quite an amount of memory. In that case, you may consider reducing ClientOption.RingScaleEachConn to 8 or 9 at the cost of potential throughput degradation.

You may also consider setting the value of ClientOption.PipelineMultiplex to -1, which will let valkey use only 1 connection for pipelining to each valkey node.
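The effect of lowering the scale follows directly from the 2^scale relationship described above. This tiny sketch just evaluates it; `ringSlots` is an illustrative helper, not a valkey-go function, and it says nothing about how many bytes each slot occupies.

```go
package main

import "fmt"

// ringSlots returns the ring size for a given scale, i.e. 2^scale.
func ringSlots(scale int) int {
	return 1 << scale
}

func main() {
	for _, scale := range []int{8, 9, 10} {
		fmt.Printf("RingScaleEachConn=%d -> ring size %d\n", scale, ringSlots(scale))
	}
}
```

Each step down in scale halves the ring, so going from 10 to 8 leaves a quarter of the default ring size per connection.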

Instantiating a new Valkey Client

You can create a new valkey client using NewClient and provide several options.

    // Connect to a single valkey node:
    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress: []string{"127.0.0.1:6379"},
    })

    // Connect to a valkey cluster
    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
        ShuffleInit: true,
    })

    // Connect to a valkey cluster and use replicas for read operations
    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
        SendToReplicas: func(cmd valkey.Completed) bool {
            return cmd.IsReadOnly()
        },
    })

    // Connect to sentinels
    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
        Sentinel: valkey.SentinelOption{
            MasterSet: "my_master",
        },
    })

Valkey URL

You can use ParseURL or MustParseURL to construct a ClientOption.

The provided URL must start with either redis://, rediss:// or unix://.

Currently supported URL parameters are db, dial_timeout, write_timeout, addr, protocol, client_cache, client_name, max_retries, and master_set.

    // connect to a valkey cluster
    client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
    // connect to a valkey node
    client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:6379/0"))
    // connect to a valkey sentinel
    client, err = valkey.NewClient(valkey.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
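To show how the URLs above break down into their parts, the following sketch inspects them with the standard net/url package. It is not how ParseURL is implemented; the `target` struct and `inspect` function are hypothetical, cover only a few of the listed parameters, and skip unix:// entirely.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// target collects the pieces of a connection URL that this sketch looks at.
type target struct {
	Addresses []string // host from the URL followed by every addr parameter
	DB        string   // path without the leading slash, e.g. "0"
	MasterSet string   // value of the master_set parameter, if any
	TLS       bool     // true for the rediss:// scheme
}

// inspect splits a redis:// or rediss:// URL into a target.
func inspect(raw string) (target, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return target{}, err
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return target{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	q := u.Query()
	return target{
		// Repeated addr parameters are all kept, in order.
		Addresses: append([]string{u.Host}, q["addr"]...),
		DB:        strings.TrimPrefix(u.Path, "/"),
		MasterSet: q.Get("master_set"),
		TLS:       u.Scheme == "rediss",
	}, nil
}

func main() {
	for _, raw := range []string{
		"redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003",
		"redis://127.0.0.1:26379/0?master_set=my_master",
	} {
		parsed, err := inspect(raw)
		fmt.Printf("%+v %v\n", parsed, err)
	}
}
```

The repeated addr parameter is what lets a single URL carry several seed addresses for a cluster.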

Availability Zone Affinity Routing

Starting from Valkey 8.1, the Valkey server provides availability-zone information so that clients know where the server is located. To use this information to route requests to a replica located in the same availability zone, set the EnableReplicaAZInfo option and your ReplicaSelector function. For example:

    client, err := valkey.NewClient(valkey.ClientOption{
        InitAddress:         []string{"address.example.com:6379"},
        EnableReplicaAZInfo: true,
        ReplicaSelector: func(slot uint16, replicas []valkey.ReplicaInfo) int {
            for i, replica := range replicas {
                if replica.AZ == "us-east-1a" {
                    return i // return the index of the replica.
                }
            }
            return -1 // send to the primary.
        },
    })

Arbitrary Command

If you want to construct commands that are absent from the command builder, you can use client.B().Arbitrary():

    // This will result in [ANY CMD k1 k2 a1 a2]
    client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()

Working with JSON, Raw []byte, and Vector Similarity Search

The command builder treats all the parameters as Valkey strings, which are binary safe. This means that users can store []byte directly into Valkey without conversion. The valkey.BinaryString helper can convert []byte to string without copying. For example:

    client.B().Set().Key("b").Value(valkey.BinaryString([]byte{...})).Build()

Treating all the parameters as Valkey strings also means that the command builder doesn't do any quoting or conversion automatically for users.

When working with RedisJSON, users frequently need to prepare JSON strings in Valkey strings. valkey.JSON can help:

    client.B().JsonSet().Key("j").Path("$.myStrField").Value(valkey.JSON("str")).Build()
    // equivalent to
    client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
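The equivalence above comes down to JSON encoding: a Go string becomes a quoted JSON string. The standard-library sketch below reproduces that step; `jsonString` is an illustrative helper and is not claimed to match how valkey.JSON is implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonString encodes any Go value as JSON and returns it as a string,
// ready to be passed as a command argument.
func jsonString(v any) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}

func main() {
	quoted, _ := jsonString("str")
	fmt.Println(quoted) // the Go string gains surrounding double quotes

	object, _ := jsonString(map[string]int{"a": 1})
	fmt.Println(object)
}
```

Forgetting this encoding step is a common source of JSON.SET errors, since a bare `str` is not valid JSON.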

When working with vector similarity search, users can use valkey.VectorString32 and valkey.VectorString64 to build queries:

    cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
        Params().Nargs(2).NameValue().NameValue("V", valkey.VectorString64([]float64{...})).
        Dialect(2).Build()
    n, resp, err := client.Do(ctx, cmd).AsFtSearch()
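Vector search parameters are typically passed as a raw byte blob rather than as text. The sketch below shows one common layout, each float32 written as 4 little-endian bytes, using only the standard library. That VectorString32 uses this exact layout is an assumption here, and `encodeFloat32LE` is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeFloat32LE packs each float32 as 4 little-endian bytes and returns
// the result as a (binary, not human-readable) Go string.
func encodeFloat32LE(v []float32) string {
	buf := make([]byte, 4*len(v))
	for i, f := range v {
		binary.LittleEndian.PutUint32(buf[4*i:], math.Float32bits(f))
	}
	return string(buf)
}

func main() {
	blob := encodeFloat32LE([]float32{1, 0.5, -2})
	fmt.Printf("%d bytes: % x\n", len(blob), blob)
}
```

The blob length is always four times the vector dimension, which is a quick sanity check against the dimension declared in the index.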

Command Response Cheatsheet

While the command builder is developer-friendly, the response parser is a little unfriendly. Developers must know beforehand what type of Valkey response will be returned from the server and which parser they should use.

Error Handling: If an incorrect parser function is chosen, an errParse will be returned. Here's an example using ToArray which demonstrates this scenario:

    // Attempt to parse the response. If a parsing error occurs, check if the error is a parse error and handle it.
    // Normally, you should fix the code by choosing the correct parser function.
    // For instance, use ToString() if the expected response is a string, or ToArray() if the expected response is an array as follows:
    if _, err := client.Do(ctx, client.B().Get().Key("k").Build()).ToArray(); valkey.IsParseErr(err) {
        fmt.Println("Parsing error:", err)
    }

It is hard to remember what type of message will be returned and which parser to use. So, here are some common examples:

    // GET
    client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
    client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
    // MGET
    client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
    // SET
    client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
    // INCR
    client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
    // HGET
    client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
    // HMGET
    client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
    // HGETALL
    client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
    // EXPIRE
    client.Do(ctx, client.B().Expire().Key("k").Seconds(1).Build()).AsInt64()
    // HEXPIRE
    client.Do(ctx, client.B().Hexpire().Key("h").Seconds(1).Fields().Numfields(2).Field("f1", "f2").Build()).AsIntSlice()
    // ZRANGE
    client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
    // ZRANK
    client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
    // ZSCORE
    client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
    // ZRANGE
    client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
    client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
    // ZPOPMIN
    client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
    client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
    // SCARD
    client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
    // SMEMBERS
    client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
    // LINDEX
    client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
    // LPOP
    client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
    client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
    // SCAN
    client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry()
    // FT.SEARCH
    client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch()
    // GEOSEARCH
    client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()

Use DecodeSliceOfJSON to Scan Array Result

DecodeSliceOfJSON is useful when you would like to scan the results of an array into a slice of a specific struct.

    type User struct {
        Name string `json:"name"`
    }

    // Set some values
    if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil {
        return err
    }
    if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil {
        return err
    }

    // Scan MGET results into []*User
    var users []*User // or []User is also scannable
    if err := valkey.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
        return err
    }

    for _, user := range users {
        fmt.Printf("%+v\n", user)
    }
    /*
    &{Name:name1}
    &{Name:name2}
    */

!!!!!! DO NOT DO THIS !!!!!!

Please make sure that all values in the result have the same JSON structures.

    // Set a pure string value
    if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil {
        return err
    }

    // Bad
    users := make([]*User, 0)
    if err := valkey.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil {
        return err
    }
    // -> Error: invalid character 'u' looking for the beginning of the value
    // in this case, use client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()

Contributing

Contributions are welcome, including issues, pull requests, and discussions. Contributions mean a lot to us and help us improve this library and the community!

Thanks to all the people who already contributed!

Generate Command Builders

Command builders are generated based on the definitions in ./hack/cmds by running:

go generate

Testing

Please use the ./dockertest.sh script to run test cases locally, and please try your best to have 100% test coverage on code changes.
