rudderlabs/keydb
KeyDB is a distributed key store (not a key-value store) for data deduplication, designed to be fast, scalable, and eventually consistent. It provides a simple API for checking the existence of keys with TTL (Time To Live) support.
KeyDB is a distributed system that allows you to:
- Store keys (no values) with configurable TTL
- Check for key existence
- Scale horizontally by adding or removing nodes
- Persist data through snapshots to cloud storage
- Snapshots are used to scale the cluster since new nodes will download the hash ranges they need to manage directly from cloud storage
- Distributed Architecture: Supports multiple nodes with automatic key distribution
- Scalability: Dynamically scale the cluster by adding or removing nodes
- TTL Support: Keys automatically expire after their time-to-live
- Persistence: Snapshots can be stored in cloud storage for scaling the system or backing up the data without needing nodes to communicate with each other
The KeyDB client provides a simple interface to interact with the KeyDB cluster.
import"github.com/rudderlabs/keydb/client"
import ("github.com/rudderlabs/keydb/client""github.com/rudderlabs/rudder-go-kit/logger""github.com/rudderlabs/rudder-go-kit/stats")config:= client.Config{Addresses: []string{"localhost:50051","localhost:50052"},// List of node addressesTotalHashRanges:271,// Optional, defaults to 271ConnectionPoolSize:10,// Optional, defaults to 10RetryPolicy: client.RetryPolicy{Disabled:false,InitialInterval:100*time.Millisecond,// Optional, defaults to 100msMultiplier:1.5,// Optional, defaults to 1.5MaxInterval:30*time.Second,// Optional, defaults to 30s },}// Logger is requiredlogFactory:=logger.NewFactory(conf)log:=logFactory.NewLogger()keydbClient,err:=client.NewClient(config,log,client.WithStats(stats.NOP))iferr!=nil {// Handle error}deferkeydbClient.Close()
To add keys with TTL:
keys:= []string{"key1","key2","key3"}// Put the keyserr:=keydbClient.Put(context.Background(),keys,24*time.Hour)iferr!=nil {// Handle error}
To check if keys exist:
```go
// Keys to check
keys := []string{"user:123", "session:456", "unknown:key"}

// Get existence status
exists, err := keydbClient.Get(context.Background(), keys)
if err != nil {
    // Handle error
}

// Process results
for i, key := range keys {
    if exists[i] {
        fmt.Printf("Key %s exists\n", key)
    } else {
        fmt.Printf("Key %s does not exist\n", key)
    }
}
```
- Connection Pooling: Each node connection uses a pool of gRPC connections (default 10) for improved throughput
  - Connections are distributed using round-robin load balancing
  - Pool size is configurable via `ConnectionPoolSize` in the client config
  - Metrics track operations per connection: `keydb_client_connection_ops_total`
- Automatic Retries: The client automatically retries operations on transient errors
- Cluster Awareness: Automatically adapts to cluster size changes (including degraded mode)
  - The cluster size is dynamically calculated based on active (non-degraded) nodes
  - When a node is marked as degraded, it's excluded from the effective cluster size
- Key Distribution: Automatically routes keys to the correct node based on hash
- Parallel Operations: Sends requests to multiple nodes in parallel for better performance
- Automatic retries: requests that fail are automatically retried a configured number of times
  - The client tries again if the cluster size was changed and it hits a node not managing a requested key, although before trying again it will first update its internal metadata with the new cluster node addresses
  - Cluster changes trigger retries via recursion, so there is no limit on the retries here
  - The client will retry as soon as possible, right after trying to establish connections to the new nodes (if any)
  - Normal errors trigger retries with an exponential backoff configured with a `RetryPolicy`
  - For the retry configuration, please refer to the client `Config` struct
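As a rough mental model for the Key Distribution bullet above, here is a minimal, self-contained sketch of how a key could be mapped to a hash range and then to a node. The FNV-1a hash and the modulo assignment are illustrative assumptions only, not the client's actual routing (which lives in the `client` package).

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// hashRangeForKey maps a key to one of totalHashRanges buckets.
// FNV-1a is used only to keep the sketch self-contained; the real
// client may use a different hash function.
func hashRangeForKey(key string, totalHashRanges uint32) uint32 {
    h := fnv.New32a()
    _, _ = h.Write([]byte(key))
    return h.Sum32() % totalHashRanges
}

// nodeForHashRange assigns a hash range to one of clusterSize
// non-degraded nodes (a simple modulo placement, assumed for illustration).
func nodeForHashRange(hashRange, clusterSize uint32) uint32 {
    return hashRange % clusterSize
}

func main() {
    const totalHashRanges = 271
    const clusterSize = 3

    for _, key := range []string{"user:123", "session:456"} {
        hr := hashRangeForKey(key, totalHashRanges)
        fmt.Printf("key %q -> hash range %d -> node %d\n", key, hr, nodeForHashRange(hr, clusterSize))
    }
}
```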
To start a KeyDB node, configure it with environment variables (all prefixed with `KEYDB_`):
```bash
# Configure node with environment variables
export KEYDB_NODE_ID=0
export KEYDB_NODE_ADDRESSES="localhost:50051,localhost:50052,localhost:50053"
export KEYDB_PORT=50051
export KEYDB_SNAPSHOT_INTERVAL=60s
export KEYDB_TOTAL_HASH_RANGES=271

# Optional: Configure degraded nodes (comma-separated boolean values)
# Nodes marked as degraded will not serve read/write traffic
export KEYDB_DEGRADED_NODES="false,false,false"

# Storage configuration
export KEYDB_STORAGE_BUCKET="my-keydb-bucket"
export KEYDB_STORAGE_REGION="us-east-1"
export KEYDB_STORAGE_ACCESSKEYID="your-access-key-id"
export KEYDB_STORAGE_ACCESSKEY="your-secret-access-key"

# Start the node
go run cmd/node/main.go
```
Node addresses can be updated dynamically without restarting the node. The `KEYDB_NODE_ADDRESSES` environment variable is reloadable, meaning changes to the configuration will be picked up by running nodes.
In order to do that, update the `nodeAddresses` via the configmap (i.e. `/etc/rudderstack/config.yaml`).
This is useful during scaling operations along with `degradedNodes`.
To scale the KeyDB cluster, you can leverage the Scaler HTTP API.
Internally the Scaler API uses the `internal/scaler` gRPC client to manage the cluster.
Here is a SCALE UP example of how to scale the cluster via the HTTP API.
The example below depicts the scenario of scaling from 1 node to 3 nodes.
Step 1: Enable Scaler and Add New Nodes in Degraded Mode
Merge a rudder-devops PR that will:
- Enable the scaler component if not already enabled
- Increase the CPU and memory of the nodes to give more power for the creation of the snapshots
  - Consider increasing IOPS and disk throughput as well if necessary
- Increase the number of replicas (e.g. from 1 to 3), i.e. `replicaCount: 3`
- Set the `degradedNodes` configuration
  - e.g. `false,true,true`, which means that the 1st node will continue to receive traffic as usual whilst the 2 new nodes will not receive traffic but will be available to receive new snapshots
Step 2: Move Hash Ranges
Call `/hashRangeMovements` to move the hash ranges. You can use one of two methods:
Step 2.1: Using Node-to-Node Streaming (Recommended)
This method transfers data directly between nodes without using cloud storage as an intermediary, which is faster and more efficient.
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 1,
    "new_cluster_size": 3,
    "total_hash_ranges": 271,
    "streaming": true
  }'
```
Step 2.2: Using Cloud Storage (Upload and Download)
This method uses cloud storage as an intermediary. It's useful when nodes cannot communicate directly or when you want to create backups during the scaling process.
First, preview the operation (optional but recommended):
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 1,
    "new_cluster_size": 3,
    "total_hash_ranges": 271
  }'
```
Then, upload the snapshots to cloud storage:
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 1,
    "new_cluster_size": 3,
    "total_hash_ranges": 271,
    "upload": true,
    "full_sync": true
  }'
```
Finally, download and load the snapshots:
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 1,
    "new_cluster_size": 3,
    "total_hash_ranges": 271,
    "download": true
  }'
```
Notes:
- Pre-uploads and pre-downloads can be useful for several reasons:
  - To measure how long snapshot creation and loading might take without actually having to scale the cluster
  - To do a full sync before scaling (full syncs delete old data that might be expired from S3, so that nodes won't have to download expired data, making the scaling process faster later)
- Full sync snapshots contain all the data, while incremental snapshots only contain data since the last snapshot (see `since` in `node/node.go`)
Step 3: Remove Degraded Mode
Merge another rudder-devops PR to set `degradedNodes` to either empty or `false,false,false`
- If necessary, reduce the CPU and memory if you ended up increasing them in Step 1
Here is a SCALE DOWN example of how to scale the cluster via the HTTP API.
The example below depicts the scenario of scaling from 3 nodes to 1 node.
Step 1: Enable Scaler
Merge a rudder-devops PR to enable the scaler component if not already enabled.
Step 2: Move Hash Ranges
Call `/hashRangeMovements` to move the data from the nodes being removed to the remaining node(s).
Using Streaming Mode (Recommended):
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 3,
    "new_cluster_size": 1,
    "total_hash_ranges": 271,
    "streaming": true
  }'
```
Alternatively, using Cloud Storage:
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 3,
    "new_cluster_size": 1,
    "total_hash_ranges": 271,
    "upload": true,
    "download": true,
    "full_sync": true
  }'
```
Step 3: Mark Nodes as Degraded
Merge a rudder-devops PR to mark the nodes that will be removed as degraded by setting the `degradedNodes` configuration:
- e.g. `false,true,true`, which means that the 1st node will continue to receive traffic whilst nodes 2 and 3 are marked as degraded and will not receive traffic
Why this is important: Marking nodes as degraded ensures the client is properly informed about the cluster size change. When a client hits a degraded node (e.g., node 1 or node 2), those nodes will tell the client to use the new cluster size and talk to node 0 instead. If you skip this step and just remove the nodes, the client might try to hit node 1, but since node 1 won't be there anymore, it can't redirect the client to node 0, potentially causing the client to get stuck.
Step 4: Reduce Replica Count
Merge a separate rudder-devops PR to decrease the `replicaCount` and remove the `degradedNodes` configuration
- This will remove the degraded nodes from the cluster
Why preview a scaling operation?
- It helps you understand the impact of the scaling operation on the cluster
- It will help you plan the scaling operation by showing you how many hash ranges need to be moved
- It will help you consider the impact of snapshot creation before actually scaling the cluster
To preview a scaling operation you can call `/hashRangeMovements` like in the example below:
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 4,
    "new_cluster_size": 5,
    "total_hash_ranges": 271
  }'
```
In the above example we are previewing a scaling operation from 4 to 5 nodes (i.e. a scale-up). We're not telling the nodes to do a pre-upload of the snapshots (i.e. `upload != true`).
In this specific case the scaler might tell us that 100 hash ranges will have to be moved:
{"total":100,"movements": [ {"hash_range":5,"from":1,"to":0 }, {"hash_range":9,"from":1,"to":4 },... ]}You can try different combinations:
| op | old_cluster_size | new_cluster_size | total_hash_ranges | no_of_moved_hash_ranges |
|---|---|---|---|---|
| scale_up | 1 | 2 | 271 | 102 |
| scale_up | 1 | 3 | 271 | 167 |
| scale_up | 1 | 4 | 271 | 200 |
| scale_up | 2 | 3 | 271 | 87 |
| scale_up | 2 | 4 | 271 | 137 |
| scale_up | 3 | 4 | 271 | 84 |
| scale_up | 4 | 5 | 271 | 68 |
Supported options:
```go
type HashRangeMovementsRequest struct {
    OldClusterSize                     int64 `json:"old_cluster_size"`
    NewClusterSize                     int64 `json:"new_cluster_size"`
    TotalHashRanges                    int64 `json:"total_hash_ranges"`
    Upload                             bool  `json:"upload,omitempty"`
    Download                           bool  `json:"download,omitempty"`
    FullSync                           bool  `json:"full_sync,omitempty"`
    CreateSnapshotsMaxConcurrency      int   `json:"create_snapshots_max_concurrency,omitempty"`
    LoadSnapshotsMaxConcurrency        int   `json:"load_snapshots_max_concurrency,omitempty"`
    DisableCreateSnapshotsSequentially bool  `json:"disable_create_snapshots_sequentially,omitempty"`
    Streaming                          bool  `json:"streaming,omitempty"`
}
```
Consider using `CreateSnapshotsMaxConcurrency` and `LoadSnapshotsMaxConcurrency` to control resource usage:
- `CreateSnapshotsMaxConcurrency`: Limits how many snapshots can be created concurrently (default: 10)
- `LoadSnapshotsMaxConcurrency`: Limits how many snapshots can be loaded concurrently from S3 (default: 10)

This is useful to avoid OOM kills when a node has to handle a large number of big snapshots.
When `streaming=true` is set, data is transferred directly between nodes without using cloud storage as an intermediary. This can significantly reduce scaling operation time by eliminating the upload/download steps to S3.
How streaming works:
- The scaler instructs source nodes to send hash ranges directly to destination nodes
- Source nodes connect to destination nodes via gRPC
- Destination nodes inform source nodes of their last known timestamp (for incremental sync)
- Source nodes create snapshots and stream them in chunks to destinations
- Destination nodes load the received data and store the timestamp for future incremental syncs
Streaming constraints:
- One source node can only send one hash range at a time (prevents resource exhaustion)
- Multiple source nodes can send in parallel (maximizes throughput)
- Incremental syncs are supported via timestamp negotiation
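To picture these constraints, here is a small illustrative sketch (not the scaler's actual code) that schedules movements so each source node streams one hash range at a time while different source nodes proceed in parallel; `sendHashRange` is a hypothetical stand-in for the real node-to-node gRPC transfer:

```go
package main

import (
    "fmt"
    "sync"
)

type movement struct {
    hashRange int
    from, to  int
}

// sendHashRange is a placeholder for the real node-to-node transfer.
func sendHashRange(m movement) {
    fmt.Printf("streaming hash range %d from node %d to node %d\n", m.hashRange, m.from, m.to)
}

// streamMovements groups movements by source node: each source sends its
// hash ranges sequentially, while different sources run in parallel.
func streamMovements(movements []movement) {
    perSource := make(map[int][]movement)
    for _, m := range movements {
        perSource[m.from] = append(perSource[m.from], m)
    }

    var wg sync.WaitGroup
    for _, ms := range perSource {
        wg.Add(1)
        go func(ms []movement) {
            defer wg.Done()
            for _, m := range ms { // one hash range at a time per source node
                sendHashRange(m)
            }
        }(ms)
    }
    wg.Wait()
}

func main() {
    streamMovements([]movement{
        {hashRange: 5, from: 1, to: 0},
        {hashRange: 9, from: 1, to: 2},
        {hashRange: 12, from: 0, to: 2},
    })
}
```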
Example streaming request:
```bash
curl --location 'localhost:8080/hashRangeMovements' \
  --header 'Content-Type: application/json' \
  --data '{
    "old_cluster_size": 2,
    "new_cluster_size": 3,
    "total_hash_ranges": 271,
    "streaming": true
  }'
```
Alternatively you can give the nodes more memory, although a balance of the two is usually a good idea. Snapshots are usually compressed and uploaded to S3, so the download and decompression of big snapshots could take a significant portion of memory.
The `/backup` endpoint allows you to create and/or load snapshots for all nodes (or specific nodes) in the cluster without performing any scaling operations. This is useful for:
- Creating periodic backups to cloud storage
- Restoring data from cloud storage after a disaster
- Pre-populating nodes with data before they go live
- Resizing PVCs
Request schema:
```go
type BackupRequest struct {
    Upload                             bool    `json:"upload,omitempty"`
    Download                           bool    `json:"download,omitempty"`
    FullSync                           bool    `json:"full_sync,omitempty"`
    LoadSnapshotsMaxConcurrency        int     `json:"load_snapshots_max_concurrency,omitempty"`
    DisableCreateSnapshotsSequentially bool    `json:"disable_create_snapshots_sequentially,omitempty"`
    Nodes                              []int64 `json:"nodes,omitempty"`
}
```
Parameters:
- `upload`: Create snapshots and upload them to cloud storage
- `download`: Download and load snapshots from cloud storage
- `full_sync`: When true, performs a full sync that deletes old snapshot files on S3 before uploading
- `load_snapshots_max_concurrency`: Limits how many snapshots can be loaded concurrently (default: 10)
- `disable_create_snapshots_sequentially`: When true, creates snapshots in parallel instead of sequentially
- `nodes`: Optional list of specific node IDs to backup/restore. If omitted, all nodes are processed
Note: At least one of `upload` or `download` must be true. You cannot upload and download in the same request.
Example: Backup all nodes to cloud storage
```bash
curl --location 'localhost:8080/backup' \
  --header 'Content-Type: application/json' \
  --data '{
    "upload": true,
    "full_sync": true
  }'
```
Example: Restore all nodes from cloud storage
```bash
curl --location 'localhost:8080/backup' \
  --header 'Content-Type: application/json' \
  --data '{
    "download": true,
    "load_snapshots_max_concurrency": 5
  }'
```
Example: Backup specific nodes
```bash
curl --location 'localhost:8080/backup' \
  --header 'Content-Type: application/json' \
  --data '{
    "upload": true,
    "nodes": [0, 2]
  }'
```
Response:
{"success":true,"total":271}Wheretotal is the number of hash ranges that were processed (either uploaded or downloaded).
KeyDB supports a "degraded mode" feature that allows nodes to be temporarily excluded from serving traffic during scaling operations. This helps prevent data inconsistencies when nodes restart during a scale operation.
How it works:
- Nodes can be marked as degraded via the `KEYDB_DEGRADED_NODES` environment variable (comma-separated boolean values)
- Degraded nodes reject all read/write requests with an `ERROR_SCALING` response code
- The effective cluster size excludes degraded nodes, ensuring correct hash range calculations
- Degraded nodes can still perform administrative operations (e.g., `LoadSnapshots`)
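The effective cluster size calculation can be pictured with a tiny sketch (an illustration of the concept described above, not keydb's actual code):

```go
package main

import "fmt"

// effectiveClusterSize counts the non-degraded nodes, mirroring the idea
// that degraded nodes are excluded from hash range calculations.
func effectiveClusterSize(degraded []bool) int {
    size := 0
    for _, d := range degraded {
        if !d {
            size++
        }
    }
    return size
}

func main() {
    // e.g. KEYDB_DEGRADED_NODES="false,false,true,true"
    degraded := []bool{false, false, true, true}
    fmt.Println("effective cluster size:", effectiveClusterSize(degraded)) // prints 2
}
```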
Usage during scaling:
- First devops PR: Add new nodes in degraded mode without changing the config of old nodes

  ```bash
  # Old nodes (node 0, 1)
  export KEYDB_DEGRADED_NODES="false,false"

  # New nodes (node 2, 3) - start in degraded mode
  export KEYDB_DEGRADED_NODES="false,false,true,true"
  ```

- Move data: Use `/hashRangeMovements` to move the data between the old and new nodes.
- Second devops PR: Update persistent config to remove degraded mode permanently
This approach ensures that if any old node restarts during scaling, it won't use incorrect cluster size configurations.
WARNING:
- Degrading an active node will redistribute the hash range that it was previously responsible for.
- Marking a new node as non-degraded before it has downloaded the relevant snapshots will make the node serve requests without the relevant data. While serving traffic from a new node before it has downloaded all the relevant snapshots could be a deliberate strategy, it can lead to data inconsistencies.
- You can simply merge a devops PR with the desired cluster size and restart the nodes
  - In this case data won't be moved between nodes, so it will lead to data loss
- You can do everything manually by calling the single endpoints yourself, although it might be burdensome with a lot of hash ranges to move, since this would mean calling `/createSnapshots`, `/loadSnapshots`, and `/updateClusterData` manually
Here you can access the Postman collection for the Scaler HTTP API.
- `POST /get` - Check key existence
- `POST /put` - Add keys with TTL
- `POST /info` - Get node information (e.g. node ID, cluster size, addresses, hash ranges managed by that node, etc.)
- `POST /createSnapshots` - Create snapshots and upload them to cloud storage
- `POST /loadSnapshots` - Download snapshots from cloud storage and load them into BadgerDB
- `POST /updateClusterData` - Update the scaler's internal information with the addresses of all the nodes that comprise the cluster
- `POST /hashRangeMovements` - Get hash range movement information (supports snapshot creation and loading)
- `POST /backup` - Create and/or load snapshots for all (or specific) nodes without scaling
It is possible to experience increased latencies when getting and setting keys. One possible culprit could be insufficient CPU. Another possible culprit could be compactions triggering too often, which also leads to CPU exhaustion.
Usually keydb can operate with decent latencies while using the default settings. However, it is important to understand them in case some tuning might be needed to optimize compactions.
For a comprehensive list of available settings you can refer to the constructor in `badger.go`.
- `MemTableSize` (default: 64MB)
  - When too small, you might not be able to buffer enough writes before flushing to L0 (e.g. a bigger size can reduce the frequency of flushes)
- `NumLevelZeroTables` (default: 10)
  - L0 flushing would start after ~640MB (64MB `MemTableSize` times 10)
  - Reducing this value might lead to flushing to L0 (disk) more frequently
- `BaseLevelSize` (default: 1GB; the base level is L1, i.e. the primary destination of compaction from L0)
  - Badger uses an LSM tree: immutable sorted tables (SSTables) grouped into levels
    - Level 0 (L0): contains newly flushed SSTables from MemTables, possibly overlapping in key ranges
    - Level 1 and beyond (L1, L2, ...): contain non-overlapping SSTables
    - Each successive level can hold more data than the previous one, usually growing by a size multiplier (default 5)
  - The base level size determines how big Level 1 (the base level) should be (consider that the base level is the first level of compaction, where the overlapping key ranges from L0 are compacted down into L1 first and then down into higher levels as data keeps on growing)
  - TL;DR: More SSTables can accumulate in L1 before being compacted to L2
    - This means fewer compactions overall (good for write throughput)
    - Higher read amplification (since L1 can grow larger and contains overlapping SSTables, lookups may need to check more files)
    - Larger total on-disk data before compactions
- `NumCompactors` (default: 4)
  - Increasing this raises the parallelism of compaction jobs, helping improve overall compaction throughput
- `BlockCacheSize` (default: 1GB) - Cache for SSTable data blocks (actual value/key payloads read from disk)
  - During compaction Badger must read data blocks from SSTables in input levels, merge them, and write new SSTables to the next level
  - If the block cache is too small, compaction threads can cause a lot of block cache evictions
    - Reads during compaction will go to disk instead of RAM → higher disk I/O, slower compactions
    - User queries may suffer, since compaction and queries compete for cache
- `IndexCacheSize` (default: 512MB) - Cache for SSTable index blocks and Bloom filters (metadata needed to locate keys inside SSTables)
  - During compaction, the compactor repeatedly queries SSTable indexes to decide which keys to merge or drop
  - If the index cache is too small, index blocks and Bloom filters are constantly evicted
    - Each lookup during compaction requires re-reading index blocks from disk
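For reference, here is a hedged sketch of how these knobs map onto Badger's options API (assuming `github.com/dgraph-io/badger/v4`); the values simply restate the defaults listed above, while keydb's actual tuning lives in its `badger.go` constructor:

```go
package main

import (
    "log"

    badger "github.com/dgraph-io/badger/v4"
)

func main() {
    // Defaults as listed above; tune per workload.
    opts := badger.DefaultOptions("/tmp/keydb-example").
        WithMemTableSize(64 << 20).   // MemTableSize: 64MB
        WithNumLevelZeroTables(10).   // NumLevelZeroTables: 10
        WithBaseLevelSize(1 << 30).   // BaseLevelSize: 1GB (L1)
        WithNumCompactors(4).         // NumCompactors: 4
        WithBlockCacheSize(1 << 30).  // BlockCacheSize: 1GB
        WithIndexCacheSize(512 << 20) // IndexCacheSize: 512MB

    db, err := badger.Open(opts)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
}
```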
- Creating and loading snapshots can take a lot of time for customers like `loveholidays`
  - For this reason we can do pre-uploads with `/hashRangeMovements`, but it is a manual step that requires some understanding of the internal workings of `keydb` and the `scaler`
  - We could take automatic daily snapshots, but they would quite probably create spikes in CPU with a consequent increase in latencies
- Pre-downloads are not supported yet
  - We might need to create a `checkpoints` table to store which snapshot files a node has already loaded, so that we can skip them during a scaling operation
- Scaling operations require additional resources
  - For deployments with VPA enabled, resources can be automatically adjusted in-place (though restarts may still occur depending on configuration)
  - For deployments without VPA, resources must be manually increased before scaling, which will cause a restart