Microservice Native Event Store and Message Store for Postgres
A fully-featured event store and message store implemented in PostgreSQL for Pub/Sub, Event Sourcing, Messaging, and Evented Microservices applications.
- Pub/Sub
- JSON message data
- Event streams
- Stream categories
- Metadata
- Message queues
- Message storage
- Consumer groups
- Service host
- Administration tools
- Reports
An event sourcing and Pub/Sub message store built on Postgres for simple cloud or local hosting. A minimalist implementation of the essential features of tools like Event Store or Kafka, with built-in support for messaging patterns like Pub/Sub, and consumer patterns like consumer groups.
Message DB was extracted from the Eventide Project to make it easier for users to write clients in the language of their choosing.
A complete user guide is available on the Eventide Project docs site:
http://docs.eventide-project.org/user-guide/message-db/
Message DB can be installed as a Ruby Gem or an NPM package, or it can simply be cloned from this repository.
git clone git@github.com:message-db/message-db.git
gem install message-db
npm install @eventide/message-db
Running the database installation script creates the database, schema, table, indexes, functions, views, types, and a user role, and limits the user's privileges to the message store's public interface.
Make sure that your default Postgres user has administrative privileges.
The installation script is in the database directory of the cloned repo. Change directory to the message-db directory where you cloned the repo, and run the script:
database/install.sh
If you installed Message DB via RubyGems, a database installation Ruby executable will be installed with the message-db gem.
The executable will be in the gem executable search path and may also be executed through bundler:
bundle exec mdb-create-db
For more information about Ruby executables installed with the message-db Ruby Gem, see the Eventide docs on the administration tools that are bundled with the gem:
http://docs.eventide-project.org/user-guide/message-db/tools.html
The message-db NPM module doesn't ship with any special tooling other than the bundled scripts.
To execute the installation script, navigate to the directory where the message-db module is installed and run the script:
install.sh
By default, the database creation tool will create a database named message_store.
If you prefer a different database name, you can override the name using the DATABASE_NAME environment variable.
DATABASE_NAME=some_other_database database/install.sh
If you need to drop the database (for example, on a local dev machine):
database/uninstall.sh
If you're upgrading a previous version of the database:
database/update.sh
The message store provides an interface of Postgres server functions that can be used with any programming language or through the psql command line tool.
Interacting with the underlying store through the Postgres server functions ensures correct writing and reading of messages, streams, and categories.
Write a JSON-formatted message to a named stream, optionally specifying JSON-formatted metadata and an expected version number.

    write_message(
      id varchar,
      stream_name varchar,
      type varchar,
      data jsonb,
      metadata jsonb DEFAULT NULL,
      expected_version bigint DEFAULT NULL
    )

Returns the position of the message written.
Name | Description | Type | Default | Example |
---|---|---|---|---|
id | UUID of the message being written | varchar | | a5eb2a97-84d9-4ccf-8a56-7160338b11e2 |
stream_name | Name of stream to which the message is written | varchar | | someStream-123 |
type | The type of the message | varchar | | Withdrawn |
data | JSON representation of the message body | jsonb | | {"someAttribute": "some value"} |
metadata (optional) | JSON representation of the message metadata | jsonb | NULL | {"metadataAttribute": "some meta data value"} |
expected_version (optional) | Version that the stream is expected to be when the message is written | bigint | NULL | 11 |

    SELECT write_message('a11e9022-e741-4450-bf9c-c4cc5ddb6ea3', 'someStream-123', 'SomeMessageType', '{"someAttribute": "some value"}', '{"metadataAttribute": "some meta data value"}');

    -[ RECORD 1 ]-+--
    write_message | 0

Example: https://github.com/message-db/message-db/blob/master/database/write-test-message.sh
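From application code, a call to write_message would usually be issued as a parameterized query rather than by concatenating strings. The following is a minimal Python sketch, not part of Message DB: build_write_message is a hypothetical helper that only assembles the SQL text and its parameters, assuming a driver that uses %s placeholders (psycopg2 is one such driver).

```python
import json
import uuid
from typing import Any, Optional, Tuple


def build_write_message(
    stream_name: str,
    message_type: str,
    data: dict,
    metadata: Optional[dict] = None,
    expected_version: Optional[int] = None,
    message_id: Optional[str] = None,
) -> Tuple[str, Tuple[Any, ...]]:
    """Return (sql, params) for a parameterized call to write_message.

    Hypothetical helper: it does not talk to the database itself.
    """
    if message_id is None:
        # The id argument is a UUID passed as text.
        message_id = str(uuid.uuid4())

    # Argument order follows the write_message signature.
    sql = "SELECT write_message(%s, %s, %s, %s::jsonb, %s::jsonb, %s)"
    params = (
        message_id,
        stream_name,
        message_type,
        json.dumps(data),
        json.dumps(metadata) if metadata is not None else None,
        expected_version,
    )
    return sql, params
```

The returned pair could then be handed to a database cursor's execute method; the single value in the result row would be the position of the written message.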
Retrieve messages from a single stream, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.

    get_stream_messages(
      stream_name varchar,
      position bigint DEFAULT 0,
      batch_size bigint DEFAULT 1000,
      condition varchar DEFAULT NULL
    )

Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of stream to retrieve messages from | varchar | | someStream-123 |
position (optional) | Starting position of the messages to retrieve | bigint | 0 | 11 |
batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
condition (optional) | SQL condition to filter the batch by | varchar | NULL | messages.time >= current_time |

    SELECT * FROM get_stream_messages('someStream-123', 0, 1000, condition => 'messages.time >= current_time');

    -[ RECORD 1 ]---+---------------------------------------------------------
    id              | 4b96f09e-104a-4b1f-b198-5b3b46cf1d06
    stream_name     | someStream-123
    type            | SomeType
    position        | 0
    global_position | 1
    data            | {"attribute": "some value"}
    metadata        | {"metaAttribute": "some meta value"}
    time            | 2019-11-24 17:56:09.71594
    -[ RECORD 2 ]---+---------------------------------------------------------
    id              | d94e79e3-cdda-49a3-9aad-ce5d70a5edd7
    stream_name     | someStream-123
    type            | SomeType
    position        | 1
    global_position | 2
    data            | {"attribute": "some value"}
    metadata        | {"metaAttribute": "some meta value"}
    time            | 2019-11-24 17:56:09.75969

Example: https://github.com/message-db/message-db/blob/master/test/get-stream-messages/get-stream-messages.sh
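Because position is the starting point of a read and batch_size caps the number of rows returned, a long stream can be read in successive batches. A minimal Python sketch of that loop follows. The fetch callable is a hypothetical stand-in for whatever code runs get_stream_messages and returns rows as dictionaries; it is an assumption of this example, not part of Message DB.

```python
from typing import Callable, Dict, Iterator, List

# Hypothetical signature: fetch(stream_name, position, batch_size) -> rows
Fetch = Callable[[str, int, int], List[Dict]]


def read_stream(
    fetch: Fetch,
    stream_name: str,
    position: int = 0,
    batch_size: int = 1000,
) -> Iterator[Dict]:
    """Yield every message in a stream, one batch at a time."""
    while True:
        batch = fetch(stream_name, position, batch_size)
        yield from batch

        # A short (or empty) batch means the end of the stream was reached.
        if len(batch) < batch_size:
            return

        # Stream positions are gapless, so the next batch starts
        # immediately after the last message received.
        position = batch[-1]["position"] + 1
```

In real use, fetch would execute the query against the database; keeping it as a parameter lets the loop be exercised with an in-memory list.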
Retrieve messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional condition that will be appended to the SQL command's WHERE clause.

    get_category_messages(
      category_name varchar,
      position bigint DEFAULT 1,
      batch_size bigint DEFAULT 1000,
      correlation varchar DEFAULT NULL,
      consumer_group_member bigint DEFAULT NULL,
      consumer_group_size bigint DEFAULT NULL,
      condition varchar DEFAULT NULL
    )

Name | Description | Type | Default | Example |
---|---|---|---|---|
category_name | Name of the category to retrieve messages from | varchar | | someCategory |
position (optional) | Global position to start retrieving messages from | bigint | 1 | 11 |
batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
correlation (optional) | Category or stream name recorded in message metadata's correlationStreamName attribute to filter the batch by | varchar | NULL | someCorrelationCategory |
consumer_group_member (optional) | The zero-based member number of an individual consumer that is participating in a consumer group | bigint | NULL | 1 |
consumer_group_size (optional) | The size of a group of consumers that are cooperatively processing a single category | bigint | NULL | 2 |
condition (optional) | SQL condition to filter the batch by | varchar | NULL | messages.time >= current_time |

    SELECT * FROM get_category_messages('someCategory', 1, 1000, correlation => 'someCorrelationCategory', consumer_group_member => 1, consumer_group_size => 2, condition => 'messages.time >= current_time');

    -[ RECORD 1 ]---+---------------------------------------------------------
    id              | 28d8347f-677e-4738-b6b9-954f1b15463b
    stream_name     | someCategory-123
    type            | SomeType
    position        | 0
    global_position | 111
    data            | {"attribute": "some value"}
    metadata        | {"correlationStreamName": "someCorrelationCategory-123"}
    time            | 2019-11-24 17:51:49.836341
    -[ RECORD 2 ]---+---------------------------------------------------------
    id              | 57894da7-680b-4483-825c-732dcf873e93
    stream_name     | someCategory-456
    type            | SomeType
    position        | 1
    global_position | 1111
    data            | {"attribute": "some value"}
    metadata        | {"correlationStreamName": "someCorrelationCategory-123"}
    time            | 2019-11-24 17:51:49.879011

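The consumer group parameters let several consumers cooperatively process one category. The Python sketch below illustrates the general idea of such partitioning, under the assumption that each stream is assigned to exactly one zero-based member by hashing its name modulo the group size. The member_for function and the MD5-based hash are illustrative stand-ins; they are not guaranteed to match the assignment computed by the server's own functions.

```python
import hashlib
from typing import Iterable, List


def member_for(stream_name: str, group_size: int) -> int:
    """Map a stream name to a zero-based consumer group member number.

    Illustrative only: the hash used here is an assumption of this sketch.
    """
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    digest = hashlib.md5(stream_name.encode("utf-8")).digest()
    # Use the first 8 bytes of the digest as an unsigned 64-bit integer.
    return int.from_bytes(digest[:8], "big") % group_size


def streams_for_member(
    stream_names: Iterable[str], member: int, group_size: int
) -> List[str]:
    """Return the streams that the given member would be responsible for."""
    return [n for n in stream_names if member_for(n, group_size) == member]
```

With a scheme like this, every message of a given stream goes to the same member, so per-stream ordering is preserved while the category as a whole is processed in parallel.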
Note: Where someStream-123 is a stream name, someStream is a category. Reading the someStream category retrieves messages from all streams whose names start with someStream and are followed by an ID, or where someStream is the whole stream name.
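The relationship described in the note can be sketched in a few lines of Python. This assumes that the category is the part of the stream name before the first hyphen and that everything after it is the ID. The function names below are hypothetical client-side helpers, and the server functions remain the authoritative implementation.

```python
from typing import Optional


def category(stream_name: str) -> str:
    """Return the category part of a stream name (text before the first hyphen)."""
    return stream_name.split("-", 1)[0]


def stream_id(stream_name: str) -> Optional[str]:
    """Return the ID part of a stream name, or None if the name has no ID."""
    _, separator, rest = stream_name.partition("-")
    return rest if separator else None


def is_category(stream_name: str) -> bool:
    """Return True when the name is a bare category with no ID part."""
    return "-" not in stream_name
```

Under these assumptions, someStream-123 and someStream both belong to the someStream category, which is why a category read returns messages from both.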
- write_message
- get_stream_messages
- get_category_messages
- get_last_stream_message
- stream_version
- id
- cardinal_id
- category
- is_category
- acquire_lock
- hash_64
- message_store_version
The message store is a single table named messages.
Column | Description | Type | Default | Nullable |
---|---|---|---|---|
id | Identifier of a message record | UUID | gen_random_uuid() | No |
stream_name | Name of stream to which the message belongs | varchar | | No |
type | The type of the message | varchar | | No |
position | The ordinal position of the message in its stream. Position is gapless. | bigint | | No |
global_position | Primary key. The ordinal position of the message in the entire message store. Global position may have gaps. | bigint | | No |
data | Message payload | jsonb | NULL | Yes |
metadata | Message metadata | jsonb | NULL | Yes |
time | Timestamp when the message was written. The timestamp does not include a time zone. | timestamp | now() AT TIME ZONE 'utc' | No |
Name | Columns | Unique | Note |
---|---|---|---|
messages_id | id | Yes | Enforce uniqueness as secondary key |
messages_stream | stream_name, position | Yes | Ensures uniqueness of position number in a stream |
messages_category | category(stream_name), global_position, category(metadata->>'correlationStreamName') | No | Used when retrieving by category name |
By default, the message store database is named message_store.
All message store database objects are contained within a schema named message_store.
A role named message_store is created. The message_store role is given the LOGIN attribute, but no password is assigned. A password can be assigned to the role, or the message_store role can be granted to another Postgres user.
View complete source code at:
https://github.com/message-db/message-db/tree/master/database
The Postgres Message Store is released under the MIT License.