
Message DB


Microservice Native Event Store and Message Store for Postgres

A fully-featured event store and message store implemented in PostgreSQL for Pub/Sub, Event Sourcing, Messaging, and Evented Microservices applications.

Features

  • Pub/Sub
  • JSON message data
  • Event streams
  • Stream categories
  • Metadata
  • Message queues
  • Message storage
  • Consumer groups
  • Service host
  • Administration tools
  • Reports

Rationale

An event sourcing and Pub/Sub message store built on Postgres for simple cloud or local hosting. A minimalist implementation of the essential features of tools like Event Store or Kafka, with built-in support for messaging patterns like Pub/Sub, and consumer patterns like consumer groups.

Message DB was extracted from the Eventide Project to make it easier for users to write clients in the language of their choosing.

User Guide

A complete user guide is available on the Eventide Project docs site:

http://docs.eventide-project.org/user-guide/message-db/

Installation

Message DB can be installed as a Ruby Gem or an NPM package, or simply cloned from this repository.

Git Clone

git clone git@github.com:message-db/message-db.git

As a Ruby Gem

gem install message-db

As an NPM Module

npm install @eventide/message-db

Create the Postgres Database

Running the database installation script creates the database, schema, table, indexes, functions, views, types, and a user role, and limits the role's privileges to the message store's public interface.

Requirements

Make sure that your default Postgres user has administrative privileges.

From the Git Clone

The installation script is in the database directory of the cloned repo. Change directory to the message-db directory where you cloned the repo, and run the script:

database/install.sh

From the Ruby Executable

If you installed Message DB via RubyGems, a database installation Ruby executable will be installed with the message-db gem.

The executable will be in the gem executable search path and may also be executed through bundler:

bundle exec mdb-create-db

For more information about Ruby executables installed with the message-db Ruby Gem, see the Eventide docs on the administration tools that are bundled with the gem:

http://docs.eventide-project.org/user-guide/message-db/tools.html

From the NPM Module

The message-db NPM module doesn't ship with any special tooling other than the bundled scripts.

To execute the installation script, navigate to the directory where the message-db module is installed and run the script:

install.sh

Database Name

By default, the database creation tool will create a database named message_store.

If you prefer a different database name, you can override the name using the DATABASE_NAME environment variable.

DATABASE_NAME=some_other_database database/install.sh

Uninstalling the Database

If you need to drop the database (for example, on a local dev machine):

database/uninstall.sh

If you're upgrading a previous version of the database:

database/update.sh

API Overview

The message store provides an interface of Postgres server functions that can be used with any programming language or through the psql command line tool.

Interacting with the underlying store through the Postgres server functions ensures correct writing and reading of messages, streams, and categories.

Write a Message

Write a JSON-formatted message to a named stream, optionally specifying JSON-formatted metadata and an expected version number.

write_message(
  id varchar,
  stream_name varchar,
  type varchar,
  data jsonb,
  metadata jsonb DEFAULT NULL,
  expected_version bigint DEFAULT NULL
)

Returns

Position of the message written.

Arguments

| Name | Description | Type | Default | Example |
| --- | --- | --- | --- | --- |
| id | UUID of the message being written | varchar | | a5eb2a97-84d9-4ccf-8a56-7160338b11e2 |
| stream_name | Name of stream to which the message is written | varchar | | someStream-123 |
| type | The type of the message | varchar | | Withdrawn |
| data | JSON representation of the message body | jsonb | | {"someAttribute": "some value"} |
| metadata (optional) | JSON representation of the message metadata | jsonb | NULL | {"metadataAttribute": "some meta data value"} |
| expected_version (optional) | Version that the stream is expected to be when the message is written | bigint | NULL | 11 |
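
The expected_version argument lends itself to optimistic concurrency control: a writer states the version it believes the stream is at, and the write is rejected if the stream has moved on. The following Python sketch models that check in memory. The InMemoryStore class, the WrongExpectedVersion exception, and the convention that an empty stream has version -1 are assumptions made for illustration, not Message DB's implementation.

```python
class WrongExpectedVersion(Exception):
    """Raised when the stream is not at the version the writer expected."""


class InMemoryStore:
    """Toy model of per-stream positions; not the Postgres implementation."""

    def __init__(self):
        self.streams = {}

    def stream_version(self, stream_name):
        # Assumed convention: position of the last message, or -1 if empty.
        return len(self.streams.get(stream_name, [])) - 1

    def write_message(self, stream_name, data, expected_version=None):
        current = self.stream_version(stream_name)
        if expected_version is not None and expected_version != current:
            raise WrongExpectedVersion(
                f"{stream_name}: expected {expected_version}, found {current}"
            )
        self.streams.setdefault(stream_name, []).append(data)
        return current + 1  # position of the message just written


demo = InMemoryStore()
first = demo.write_message("someStream-123", {"someAttribute": "some value"})
second = demo.write_message(
    "someStream-123", {"someAttribute": "other value"}, expected_version=first
)
print(first, second)
```

A second writer that still passes expected_version=0 after these two writes would get WrongExpectedVersion in this model, which is the signal to re-read the stream and retry.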

Usage

SELECT write_message('a11e9022-e741-4450-bf9c-c4cc5ddb6ea3', 'someStream-123', 'SomeMessageType', '{"someAttribute": "some value"}', '{"metadataAttribute": "some meta data value"}');

-[ RECORD 1 ]-+--
write_message | 0

Example: https://github.com/message-db/message-db/blob/master/database/write-test-message.sh
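
Because the interface is a plain server function, a client in any language only needs to send a parameterized SELECT. The Python sketch below builds the statement and its parameters without touching a database. The build_write_message helper is hypothetical, and the "%s" placeholder style is an assumption that matches common Python DB-API drivers; adjust it to whatever your driver expects.

```python
import json
import uuid

# Hypothetical helper (not part of Message DB). Placeholders use the "%s"
# style found in common Python DB-API drivers, which is an assumption here.
WRITE_MESSAGE_SQL = (
    "SELECT write_message(%s, %s, %s, %s::jsonb, %s::jsonb, %s::bigint)"
)


def build_write_message(stream_name, message_type, data,
                        metadata=None, expected_version=None,
                        message_id=None):
    """Return (sql, params) for a single write_message call."""
    params = (
        message_id or str(uuid.uuid4()),  # id: UUID sent as text
        stream_name,
        message_type,
        json.dumps(data),                 # data: JSON text, cast to jsonb
        None if metadata is None else json.dumps(metadata),
        expected_version,                 # None is sent as SQL NULL
    )
    return WRITE_MESSAGE_SQL, params


sql, params = build_write_message(
    "someStream-123",
    "SomeMessageType",
    {"someAttribute": "some value"},
    metadata={"metadataAttribute": "some meta data value"},
    message_id="a11e9022-e741-4450-bf9c-c4cc5ddb6ea3",
)
# With a DB-API cursor, the call would then be: cursor.execute(sql, params)
print(sql)
print(params)
```

Passing the JSON as text and casting it in SQL keeps the helper independent of any driver-specific JSON adapter.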

Get Messages from a Stream

Retrieve messages from a single stream, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.

get_stream_messages(
  stream_name varchar,
  position bigint DEFAULT 0,
  batch_size bigint DEFAULT 1000,
  condition varchar DEFAULT NULL
)

Arguments

| Name | Description | Type | Default | Example |
| --- | --- | --- | --- | --- |
| stream_name | Name of stream to retrieve messages from | varchar | | someStream-123 |
| position (optional) | Starting position of the messages to retrieve | bigint | 0 | 11 |
| batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
| condition (optional) | SQL condition to filter the batch by | varchar | NULL | messages.time >= current_time |

Usage

SELECT * FROM get_stream_messages('someStream-123', 0, 1000, condition => 'messages.time >= current_time');

-[ RECORD 1 ]---+---------------------------------------------------------
id              | 4b96f09e-104a-4b1f-b198-5b3b46cf1d06
stream_name     | someStream-123
type            | SomeType
position        | 0
global_position | 1
data            | {"attribute": "some value"}
metadata        | {"metaAttribute": "some meta value"}
time            | 2019-11-24 17:56:09.71594
-[ RECORD 2 ]---+---------------------------------------------------------
id              | d94e79e3-cdda-49a3-9aad-ce5d70a5edd7
stream_name     | someStream-123
type            | SomeType
position        | 1
global_position | 2
data            | {"attribute": "some value"}
metadata        | {"metaAttribute": "some meta value"}
time            | 2019-11-24 17:56:09.75969

Example: https://github.com/message-db/message-db/blob/master/test/get-stream-messages/get-stream-messages.sh
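
The position and batch_size arguments together allow a stream to be read in pages. The Python sketch below shows one way a client might loop over batches. The fetch_batch callable is hypothetical: it stands in for whatever code runs get_stream_messages and returns the rows as dictionaries. The in-memory fake_fetch exists only so the loop can be run without a database.

```python
def read_stream(fetch_batch, stream_name, batch_size=1000):
    """Yield every message in a stream, fetching one batch at a time.

    fetch_batch(stream_name, position, batch_size) is a hypothetical callable
    that runs get_stream_messages and returns a list of row dicts.
    """
    position = 0
    while True:
        batch = fetch_batch(stream_name, position, batch_size)
        yield from batch
        if len(batch) < batch_size:
            return  # a short batch means the end of the stream was reached
        # Stream positions are gapless, so resume right after the last row.
        position = batch[-1]["position"] + 1


# In-memory stand-in for the database, for demonstration only.
rows = [{"stream_name": "someStream-123", "position": p} for p in range(5)]


def fake_fetch(stream_name, position, batch_size):
    matching = [r for r in rows if r["position"] >= position]
    return matching[:batch_size]


positions = [
    m["position"]
    for m in read_stream(fake_fetch, "someStream-123", batch_size=2)
]
print(positions)  # [0, 1, 2, 3, 4]
```

The same loop shape would apply to a category, resuming from the last row's global_position plus one instead of its stream position.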

Get Messages from a Category

Retrieve messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional condition that will be appended to the SQL command's WHERE clause.

get_category_messages(
  category_name varchar,
  position bigint DEFAULT 0,
  batch_size bigint DEFAULT 1000,
  correlation varchar DEFAULT NULL,
  consumer_group_member bigint DEFAULT NULL,
  consumer_group_size bigint DEFAULT NULL,
  condition varchar DEFAULT NULL
)

Arguments

| Name | Description | Type | Default | Example |
| --- | --- | --- | --- | --- |
| category_name | Name of the category to retrieve messages from | varchar | | someCategory |
| position (optional) | Global position to start retrieving messages from | bigint | 1 | 11 |
| batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
| correlation (optional) | Category or stream name recorded in message metadata's correlationStreamName attribute to filter the batch by | varchar | NULL | someCorrelationCategory |
| consumer_group_member (optional) | The zero-based member number of an individual consumer that is participating in a consumer group | bigint | NULL | 1 |
| consumer_group_size (optional) | The size of a group of consumers that are cooperatively processing a single category | bigint | NULL | 2 |
| condition (optional) | SQL condition to filter the batch by | varchar | NULL | messages.time >= current_time |

Usage

SELECT * FROM get_category_messages('someCategory', 1, 1000, correlation => 'someCorrelationCategory', consumer_group_member => 1, consumer_group_size => 2, condition => 'messages.time >= current_time');

-[ RECORD 1 ]---+---------------------------------------------------------
id              | 28d8347f-677e-4738-b6b9-954f1b15463b
stream_name     | someCategory-123
type            | SomeType
position        | 0
global_position | 111
data            | {"attribute": "some value"}
metadata        | {"correlationStreamName": "someCorrelationCategory-123"}
time            | 2019-11-24 17:51:49.836341
-[ RECORD 2 ]---+---------------------------------------------------------
id              | 57894da7-680b-4483-825c-732dcf873e93
stream_name     | someCategory-456
type            | SomeType
position        | 1
global_position | 1111
data            | {"attribute": "some value"}
metadata        | {"correlationStreamName": "someCorrelationCategory-123"}
time            | 2019-11-24 17:51:49.879011
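
One way to picture the consumer group parameters: each stream in the category is assigned to exactly one member, so the members divide the category between them without overlap. The Python sketch below models that idea with a stable hash of the stream ID taken modulo the group size. The hash function (SHA-256 here) and the helper names are assumptions chosen for illustration and will not reproduce the assignments made by the database.

```python
import hashlib


def stream_id(stream_name):
    """Part of the stream name after the first '-' (empty if there is none)."""
    return stream_name.partition("-")[2]


def assigned_member(stream_name, consumer_group_size):
    """Map a stream to a zero-based member number using a stable hash.

    Illustrative only: the hash used by the database is different.
    """
    digest = hashlib.sha256(stream_id(stream_name).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % consumer_group_size


streams = ["someCategory-123", "someCategory-456", "someCategory-789"]
group_size = 2

# Each member keeps only the streams that hash to its own number.
partitions = {
    member: [s for s in streams if assigned_member(s, group_size) == member]
    for member in range(group_size)
}
for member, owned in partitions.items():
    print(member, owned)
```

Because the assignment depends only on the stream ID and the group size, every message of a given stream goes to the same member, which preserves per-stream ordering within the group.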

Note: Where someStream-123 is a stream name, someStream is a category. Reading the someStream category retrieves messages from all streams whose names start with someStream and are followed by an ID, or where someStream is the whole stream name.
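
The relationship between a stream name and its category can be sketched in a few lines of Python. The category and in_category helpers are hypothetical client-side functions, and splitting on the first hyphen is an assumption based on the naming shown in the note above.

```python
def category(stream_name):
    """Return the category part of a stream name.

    Assumes the category is everything before the first '-'; a name with
    no '-' is treated as being its own category.
    """
    return stream_name.split("-", 1)[0]


def in_category(stream_name, category_name):
    """True if reading category_name would include this stream."""
    return category(stream_name) == category_name


print(category("someStream-123"))                      # someStream
print(category("someStream"))                          # someStream
print(in_category("someStreamOther-1", "someStream"))  # False
```

Splitting on only the first hyphen matters when the ID itself contains hyphens, as a UUID does.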

Example: https://github.com/message-db/message-db/blob/master/test/get-category-messages/get-category-messages.sh

Full API Reference

Structure

The message store is a single table named messages.

Messages Table

| Column | Description | Type | Default | Nullable |
| --- | --- | --- | --- | --- |
| id | Identifier of a message record | UUID | gen_random_uuid() | No |
| stream_name | Name of stream to which the message belongs | varchar | | No |
| type | The type of the message | varchar | | No |
| position | The ordinal position of the message in its stream. Position is gapless. | bigint | | No |
| global_position | Primary key. The ordinal position of the message in the entire message store. Global position may have gaps. | bigint | | No |
| data | Message payload | jsonb | NULL | Yes |
| metadata | Message metadata | jsonb | NULL | Yes |
| time | Timestamp when the message was written. The timestamp does not include a time zone. | timestamp | now() AT TIME ZONE 'utc' | No |

Indexes

| Name | Columns | Unique | Note |
| --- | --- | --- | --- |
| messages_id | id | Yes | Enforce uniqueness as secondary key |
| messages_stream | stream_name, position | Yes | Ensures uniqueness of position number in a stream |
| messages_category | category(stream_name), global_position, category(metadata->>'correlationStreamName') | No | Used when retrieving by category name |

Database

By default, the message store database is named message_store.

Schema

All message store database objects are contained within a schema named message_store.

User/Role

A role named message_store is created. The message_store role is given the LOGIN attribute, but no password is assigned. A password can be assigned to the role, or the message_store role can be granted to another Postgres user.

Source Code

View complete source code at:

https://github.com/message-db/message-db/tree/master/database

License

The Postgres Message Store is released under the MIT License.

