Sandglass is a distributed, horizontally scalable, persistent, time sorted message queue.
Sandglass is a distributed, horizontally scalable, persistent, time-ordered message queue. It was developed to support asynchronous tasks and message scheduling, which makes it suitable for use as a task queue.
- Horizontal scalability
- Highly available
- Persistent storage
- Time ordered
- Multiple consumers per group for a partition
- Produce messages to be consumed in the future
- Acknowledge/NotAcknowledge each message individually
- Automatic redelivery and commit offset tracking
- Language agnostic
EXPERIMENTAL: This is a prototype of a side project. This should not be used in production in its current form as things may change quickly without notice.
On macOS using Homebrew:
brew install celrenheit/taps/sandglass
For other platforms, you can grab binaries here.
NOTE: All data will be stored in /tmp/node1. If you wish to change this, copy demo/node1.yaml and modify it accordingly.
First, let's launch the sandglass server:
sandglass --config https://raw.githubusercontent.com/sandglass/sandglass/master/demo/node1.yaml --offset_replication_factor 1
In a second terminal window, create an emails topic:
sandctl topics create emails --num_partitions 3 --replication_factor 1
...produce 10,000 messages:
sandctl produce emails '{"dest" : "hi@example.com"}' -n 10000
...and consume from the emails topic:
sandctl consume emails
(or, if you wish to watch messages arrive live, use sandctl consume -f emails)
We are using a single-node cluster; this is not recommended for production.
Add a second node to the cluster:
sandglass --config https://raw.githubusercontent.com/sandglass/sandglass/master/demo/node2.yaml
and repeat the steps described above for another topic, increasing the replication factor to 2.
As previously asked (#4), the purpose of this project might not seem clear. In short, there are two goals.
The first is to be able to track each message individually (i.e. not using a single commit offset), which makes it suitable for asynchronous tasks.
The second is the ability to schedule messages to be consumed in the future. This makes it suitable for retries.
Documentation is a work in progress and still lacking.
- API docs
- Overview - TODO
- Clients development guides - TODO
- Document the different ways of starting a cluster
go get -u github.com/sandglass/sandglass-client/go/sg
The documentation is available on godoc.
    // Let's first create a client by providing addresses of nodes in the sandglass cluster
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Now we produce a new message
    // Notice the empty string "" in the 3rd argument, meaning let sandglass choose a random partition
    // (err is reused with "=" here; a second ":=" would not compile)
    err = client.Produce(context.Background(), "emails", "", &sgproto.Message{
        Value: []byte("Hello, Sandglass!"),
    })
    if err != nil {
        panic(err)
    }
To produce a message in the future, you need to specify a custom offset:
    inOneHour := time.Now().Add(1 * time.Hour)
    gen := sandflake.NewFixedTimeGenerator(inOneHour)

    msg := &sgproto.Message{
        Offset: gen.Next(),
        Value:  []byte("Hello"),
    }

    err := client.ProduceMessage(context.Background(), "emails", "", msg)
    if err != nil {
        return err
    }
This will produce a message that will be available for consumption in 1h.
- High-level
    // Let's first create a client by providing addresses of nodes in the sandglass cluster
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    mux := sg.NewMux()
    mux.SubscribeFunc("emails", func(msg *sgproto.Message) error {
        // handle message
        log.Printf("received: %s\n", string(msg.Value))
        return nil
    })

    m := &sg.MuxManager{
        Client:               client, // the client created above
        Mux:                  mux,
        ReFetchSleepDuration: dur, // dur is a time.Duration of your choosing
    }

    err = m.Start()
    if err != nil {
        log.Fatal(err)
    }
- Low level
    // Let's first create a client by providing addresses of nodes in the sandglass cluster
    client, err := sg.NewClient(
        sg.WithAddresses(":7170"),
    )
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Listing partitions in order to choose one to consume from
    // (topic is the name of the topic, e.g. "emails")
    partitions, err := client.ListPartitions(context.Background(), topic)
    if err != nil {
        panic(err)
    }

    // we are choosing only one partition for demo purposes
    partition := partitions[0]

    // Create a new consumer using consumer group emails-sender and consumer name consumer1
    consumer := client.NewConsumer(topic, partition, "emails-sender", "consumer1")

    // and consume messages
    msgCh, err := consumer.Consume(context.Background())
    if err != nil {
        panic(err)
    }

    for msg := range msgCh {
        // do an amazing amount of work
        log.Printf("received: %s\n", string(msg.Value))

        // when we are done, we Acknowledge the message
        // but we can also NotAcknowledge to trigger a redelivery of the message
        if err := consumer.Acknowledge(context.Background(), msg); err != nil {
            panic(err)
        }
    }
Interested in having a client for one of the following languages?
Support is planned, but there is no specific schedule. If you would like a client in your language sooner, help is welcome!
Check the raw generated code available on https://github.com/sandglass/sandglass-grpc and feel free to submit yours through a pull request to https://github.com/sandglass/sandglass-client.
There are two kinds of topics:
Timer:
- Fixed number of partitions (set up-front, could change)
- Time ordered using sandflake IDs
- Can produce messages in the future
KV:
- Fixed number of partitions (set up-front, cannot change)
- Behaves like a distributed key value store
A topic has a number of partitions. Data is written into a single partition. The producer can specify the destination partition; otherwise, we fall back to choosing it using a consistent hashing algorithm.
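The fallback described above can be sketched as follows. This is a minimal illustration, not Sandglass's implementation: the source does not say which consistent hashing algorithm is used, so jump consistent hash stands in for it, keys are hashed with FNV-1a, and `choosePartition` and the partition names are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets) using the jump
// consistent hash algorithm. It is a stand-in chosen for this sketch;
// Sandglass may use a different algorithm.
func jumpHash(key uint64, numBuckets int) int {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

// choosePartition is a hypothetical helper. It returns the partition named by
// the producer when one is given, and otherwise hashes the message key.
// It assumes partitions is non-empty.
func choosePartition(partitions []string, requested, key string) string {
	if requested != "" {
		return requested
	}
	h := fnv.New64a()
	h.Write([]byte(key))
	return partitions[jumpHash(h.Sum64(), len(partitions))]
}

func main() {
	partitions := []string{"p0", "p1", "p2"}
	fmt.Println(choosePartition(partitions, "p1", "user-42")) // explicit choice wins
	fmt.Println(choosePartition(partitions, "", "user-42"))   // chosen by hashing the key
}
```

The property that makes the hash "consistent" is that growing from n to n+1 buckets only ever moves a key into the new bucket, never between existing ones.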
Each message produced to a partition is written to a Write Ahead Log (WAL) and to the View Log (VL). The WAL is used for the replication logic; it is sorted in the order each message was produced. The View Log is used for message consumption; it is mainly sorted by time (please refer to sandflake IDs for the exact composition) for Timer topics and by keys for KV topics.
A message is composed of the following fields:
    index                 <- position in the WAL
    offset                <- position in the view log for timer topics
    key and clusteringKey <- position in the view log for kv topics (key is used for partitioning)
    value                 <- your payload
Sandglass is responsible for maintaining two offsets for each consumer group:
- Committed: the offset below which all messages have been ACKed
- Consumed: the last consumed message
When consuming, sandglass scans from the last committed offset to the last consumed offset to check whether messages need to be redelivered, and from the last consumed offset to the last produced message to deliver new messages. These two actions are done in parallel.
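The interplay of the two offsets can be sketched with a small in-memory model. This is an illustration under simplifying assumptions, not Sandglass's implementation: offsets are reduced to positions 1, 2, 3, ... in the view log, the `groupOffsets` type and its methods are hypothetical, and the parallel scanning is not modeled.

```go
package main

import "fmt"

// groupOffsets is a hypothetical model of the state kept per consumer group.
type groupOffsets struct {
	committed int          // every position <= committed has been ACKed
	consumed  int          // last position handed to a consumer
	acked     map[int]bool // ACKs received above the committed offset
}

func newGroupOffsets() *groupOffsets {
	return &groupOffsets{acked: make(map[int]bool)}
}

// deliver records that positions up to pos have been handed out.
func (g *groupOffsets) deliver(pos int) {
	if pos > g.consumed {
		g.consumed = pos
	}
}

// ack marks pos as acknowledged and advances committed across any
// contiguous run of acknowledged positions.
func (g *groupOffsets) ack(pos int) {
	if pos <= g.committed {
		return
	}
	g.acked[pos] = true
	for g.acked[g.committed+1] {
		g.committed++
		delete(g.acked, g.committed)
	}
}

// redeliveryCandidates lists the positions between committed and consumed
// that have not been acknowledged yet.
func (g *groupOffsets) redeliveryCandidates() []int {
	var out []int
	for pos := g.committed + 1; pos <= g.consumed; pos++ {
		if !g.acked[pos] {
			out = append(out, pos)
		}
	}
	return out
}

func main() {
	g := newGroupOffsets()
	g.deliver(5)
	g.ack(1)
	g.ack(2)
	g.ack(4)
	fmt.Println(g.committed, g.consumed, g.redeliveryCandidates()) // prints "2 5 [3 5]"
}
```

Position 4 is ACKed but cannot be committed until position 3 is, which is why individual acknowledgements need more than a single commit offset.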
- Leader Election: hashicorp/raft
- Persistence: RocksDB or Badger
- Communication: gRPC
- Gossip: Serf
- and many more
Want to contribute to Sandglass? Awesome! Feel free to submit an issue or a pull request.
Here are some ways you can help:
- Report bugs
- Your language is not supported? Feel free to build a client. Trust me, it should not take you long :)
- Improve code/documentation
- Propose new features
- and more...
This project is licensed under the Apache License 2.0 available here.