Lightweight Component Model and Messaging Framework based on ØMQ

p-ranav/iris

iris is a C++17 header-only library that provides a component model and messaging framework based on ZeroMQ.

Prerequisites

  • Requires a compiler with C++17 standard compliance
  • Requires an installation of libzmq

Component Model

An iris::Component is a building block - a reusable piece of software that can be instantiated and connected with other components. Think LEGO. Large and complex software systems can be assembled by composing small, tested component building blocks.

iris components support:

  • A variety of communication ports and patterns: Publisher, Subscriber, Client, Server, AsyncServer, and Brokers
  • Periodic and oneshot timers that can trigger the component into action
  • A speedy multi-threaded task system with task stealing
  • Cereal-based serialization and deserialization of complex structures
  • ZeroMQ-based messaging

Getting Started

Simply add #include <iris/iris.hpp> and you're good to go. Start by creating an iris::Component:

iris::Component my_component;

You can optionally specify the number of threads the component can use in its task system. For example, this component will spawn 2 executor threads that process records from its message queues.

iris::Component my_component(iris::threads = 2);

NOTE: Here iris::threads is a NamedType parameter. It is not necessary to use named parameters, but in certain cases they improve code readability.
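For readers unfamiliar with the idiom, here is a minimal sketch of how a call site such as `threads = 2` can be made to work in plain C++17. It does not reproduce how iris implements this; `NamedArg`, `ArgName`, `thread_count`, and `executor_threads` are hypothetical names used only for illustration.

```cpp
#include <cstddef>

// A value tagged with a compile-time Tag, so that two parameters of the
// same underlying type cannot be passed in the wrong position.
template <typename T, typename Tag>
struct NamedArg {
  T value;
};

// A "name" object whose operator= produces the tagged value. This is what
// makes the call-site syntax `thread_count = 2` possible.
template <typename T, typename Tag>
struct ArgName {
  constexpr NamedArg<T, Tag> operator=(T v) const {
    return NamedArg<T, Tag>{v};
  }
};

struct ThreadCountTag {};
using ThreadCount = NamedArg<std::size_t, ThreadCountTag>;
inline constexpr ArgName<std::size_t, ThreadCountTag> thread_count{};

// Hypothetical consumer: it only accepts the tagged type, so a bare
// integer at the call site would not compile.
inline std::size_t executor_threads(ThreadCount n) { return n.value; }
```

With these definitions, `executor_threads(thread_count = 4u)` returns 4, while `executor_threads(4u)` is rejected by the compiler.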

Time-Triggered Operations

iris components can be triggered periodically by timers. To create a timer, call component.set_interval. The following component is triggered every 500ms. Timers are an excellent way to kickstart a communication pattern, e.g., publish messages periodically to multiple sinks.

Call Component.start() to start the component - this starts the component executor threads, listener threads, timers, etc.

#include <iostream>
#include <iris/iris.hpp>
using namespace iris;

int main() {
  Component my_component;
  my_component.set_interval(period = 500,
                            on_triggered = [] { std::cout << "Timer fired!\n"; });
  my_component.start();
}

One-shot Timers

Use component.set_timeout to create a one-shot timer that triggers the component after a set delay.

// oneshot_timers.cpp
#include <iris/iris.hpp>
using namespace iris;
#include <iostream>

int main() {
  Component c;
  c.set_timeout(delay = 1000,
                on_triggered = [] {
                  std::cout << "1.0 second Timeout!" << std::endl;
                });
  c.set_timeout(delay = 2500,
                on_triggered = [] {
                  std::cout << "2.5 second Timeout!" << std::endl;
                });
  c.set_timeout(delay = 5000,
                on_triggered = [&] {
                  std::cout << "Stopping component" << std::endl;
                  c.stop();
                });
  c.start();
}

Notice that the component is stopped after 5 seconds - component.stop() stops the task scheduler from processing any further tasks.

Publish-Subscribe Interactions

Publish/Subscribe is a classic pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers. Messages are published without knowledge of which subscribers, if any, exist.

In this example (samples/nginx_log_publisher), we will be parsing an Nginx log file and publishing each log entry. Here's the log file format:

[{"time":"17/May/2015:08:05:32 +0000","remote_ip":"93.180.71.3","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"},
{"time":"17/May/2015:08:05:23 +0000","remote_ip":"93.180.71.3","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"},
{"time":"17/May/2015:08:05:24 +0000","remote_ip":"80.91.33.133","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.17)"},
{"time":"17/May/2015:08:05:34 +0000","remote_ip":"217.168.17.5","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 200,"bytes": 490,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.10.3)"},
{"time":"17/May/2015:08:05:09 +0000","remote_ip":"217.168.17.5","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 200,"bytes": 490,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.10.3)"},
{"time":"17/May/2015:08:05:57 +0000","remote_ip":"93.180.71.3","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"},
{"time":"17/May/2015:08:05:02 +0000","remote_ip":"217.168.17.5","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 404,"bytes": 337,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.10.3)"},
{"time":"17/May/2015:08:05:42 +0000","remote_ip":"217.168.17.5","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 404,"bytes": 332,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.10.3)"},
{"time":"17/May/2015:08:05:01 +0000","remote_ip":"80.91.33.133","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.17)"},
{"time":"17/May/2015:08:05:27 +0000","remote_ip":"93.180.71.3","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"},
{"time":"17/May/2015:08:05:12 +0000","remote_ip":"217.168.17.5","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 200,"bytes": 3316,"referrer":"-","agent":"-"},
{"time":"17/May/2015:08:05:49 +0000","remote_ip":"188.138.60.101","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.9.7.9)"},
{"time":"17/May/2015:08:05:14 +0000","remote_ip":"80.91.33.133","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.16)"},
{"time":"17/May/2015:08:05:45 +0000","remote_ip":"46.4.66.76","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 404,"bytes": 318,"referrer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)"},
{"time":"17/May/2015:08:05:26 +0000","remote_ip":"93.180.71.3","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 404,"bytes": 324,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"},
{"time":"17/May/2015:08:05:22 +0000","remote_ip":"91.234.194.89","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.9.7.9)"},
{"time":"17/May/2015:08:05:07 +0000","remote_ip":"80.91.33.133","remote_user":"-","request":"GET /downloads/product_1 HTTP/1.1","response": 304,"bytes": 0,"referrer":"-","agent":"Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.17)"},
{"time":"17/May/2015:08:05:38 +0000","remote_ip":"37.26.93.214","remote_user":"-","request":"GET /downloads/product_2 HTTP/1.1","response": 404,"bytes": 319,"referrer":"-","agent":"Go 1.1 package http"},
.........

First, we can write a message struct NginxLogEntry. iris uses Cereal for serialization and deserialization of messages for transport. Our NginxLogEntry struct has a serialize method for this purpose.

// nginx_log_entry.hpp
#pragma once
#include <string>

struct NginxLogEntry {
  std::string time;
  std::string remote_ip;
  std::string remote_user;
  std::string request;
  unsigned response;
  unsigned bytes;
  std::string agent;

  template <class Archive>
  void serialize(Archive &ar) {
    ar(time, remote_ip, remote_user, request, response, bytes, agent);
  }
};
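To see why a single templated serialize method is enough, consider the following sketch. It does not use Cereal: `TextOutputArchive`, `Point`, and `to_text` are hypothetical stand-ins, written only to show that an "archive" is any object that can be called with the struct's fields.

```cpp
#include <sstream>
#include <string>

// Hypothetical stand-in for an archive (not part of Cereal): it writes
// every field it is handed to a text stream, each followed by '|'.
class TextOutputArchive {
  std::ostringstream out_;

public:
  template <class... Fields>
  void operator()(const Fields &... fields) {
    ((out_ << fields << '|'), ...);  // C++17 fold expression over the fields
  }

  std::string str() const { return out_.str(); }
};

// Illustrative message type with the same serialize shape as NginxLogEntry
struct Point {
  int x;
  int y;
  std::string label;

  template <class Archive>
  void serialize(Archive &ar) {
    ar(x, y, label);
  }
};

// Runs the struct's serialize method against the toy archive
inline std::string to_text(Point p) {
  TextOutputArchive ar;
  p.serialize(ar);
  return ar.str();
}
```

Because serialize is a template on the archive type, the same method can be driven by an output archive (as here) or by an input archive that assigns to the fields instead of reading them.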

We can start by writing our subscriber.

  • Create a subscriber using component.create_subscriber
  • The subscriber port timeout is how long the subscriber's recv() call will wait before timing out and checking again. Timeouts are essential to keeping the component reactive to commands like component.stop(). See ZMQ_RCVTIMEO for more details.
  • The signature of a subscriber callback is std::function<void(Message)>
  • You can deserialize the received message using Message.get<T>()
  • Here, we are receiving log entries and printing select fields

// subscriber.cpp
#include <iostream>
#include <iris/iris.hpp>
using namespace iris;
#include "nginx_log_entry.hpp"

int main() {
  Component receiver(threads = 2);
  receiver.create_subscriber(
      endpoints = {"tcp://localhost:5555"},
      timeout = 5000,
      on_receive = [](Message msg) {
          auto entry = msg.get<NginxLogEntry>();
          std::cout << "[" << entry.time << "]"
                    << "{" << entry.remote_ip << "}"
                    << "->" << entry.request
                    << "->" << entry.response << std::endl;
      });
  receiver.start();
}

Now, for the publisher. When managing state, it is cleaner to inherit from iris::Component and write a class.

  • Create a class named NginxLogPublisher that inherits from iris::Component
  • Create a publisher by calling create_publisher - we have inherited this method
  • Parse the JSON log file
  • Create a periodic timer using set_interval and publish log messages
  • Call join() in the class destructor to join on the task system executor threads

// publisher.cpp
#include <iostream>
#include <iris/iris.hpp>
using namespace iris;
#include "nginx_log_entry.hpp"
#include "json.hpp"
#include <fstream>

class NginxLogPublisher : public Component {
  Publisher pub;
  nlohmann::json j;
  nlohmann::json::iterator it;

public:
  NginxLogPublisher(const std::string &filename) {
    // Read the JSON log file passed in by the caller
    std::ifstream stream(filename);
    stream >> j;
    it = j.begin();

    // Create publisher
    pub = create_publisher(endpoints = {"tcp://*:5555"});

    // Publish periodically
    set_interval(period = 200,
                 on_triggered = [this] {
                     // Nothing left to publish once every entry has been sent
                     if (it == j.end()) return;
                     auto element = *it;
                     std::cout << "Published:" << element << std::endl;
                     pub.send(NginxLogEntry{
                         .time = element["time"].get<std::string>(),
                         .remote_ip = element["remote_ip"].get<std::string>(),
                         .remote_user = element["remote_user"].get<std::string>(),
                         .request = element["request"].get<std::string>(),
                         .response = element["response"].get<unsigned>(),
                         .bytes = element["bytes"].get<unsigned>(),
                         .agent = element["agent"].get<std::string>()
                     });
                     ++it;
                 });
  }

  ~NginxLogPublisher() {
    join();
  }
};

int main() {
  NginxLogPublisher publisher("nginx_logs.json");
  publisher.start();
}

Synchronous Request-Reply Interactions

The client-server model is another basic interaction pattern. A client sends a request to a remote server and waits for a reply. The server receives the request and calls a server-side callback to respond to the client.

In this example (samples/music_tag_server), we will create a music database server that can be queried for album metadata. Clients can request album metadata using a key-value pair, e.g., {"name", "Dark Side of the Moon"}. The server will respond with an std::optional<Album> - if the album is found in its database, the server returns it; otherwise it returns std::nullopt.

Our JSON database looks like this:

[
    {
        "catalog": "R2 552927",
        "name": "Paranoid",
        "artist": "Black Sabbath",
        "year": 1970,
        "genre": "Heavy Metal",
        "tracks": [
            "War Pigs",
            "Paranoid",
            "Planet Caravan",
            "Iron Man",
            "Electric Funeral",
            "Hand of Doom",
            "Jack the Stripper / Fairies Wear Boots"
        ]
    },
    {
        "catalog": "7243 8 35870 2 5",
        "name": "The Number of the Beast",
.........

Let's start with the expected server response - the Album struct.

// album.hpp
#pragma once
#include <iris/cereal/types/vector.hpp>
#include <string>
#include <vector>

struct Album {
  std::string name;
  std::string artist;
  int year;
  std::string genre;
  std::vector<std::string> tracks;

  template <class Archive>
  void serialize(Archive &ar) {
    ar(name, artist, year, genre, tracks);
  }
};

To create a server port, call component.create_server.

  • Server callbacks have the signature std::function<void(Request, Response&)>
  • The server port timeout is how long the server's recv() call will wait before timing out and checking again. Timeouts are essential to keeping the component reactive to commands like component.stop(). See ZMQ_RCVTIMEO for more details.
  • OK so what's happening here?
    • The server receives a request that is actually an std::tuple<std::string, std::string>
    • We deserialize this request using request.get<T>
    • Then we search our JSON database for this key-value pair.
    • If it exists, we construct an Album object and set the server callback response
    • If not, we set the server callback response to std::nullopt

// server.cpp
#include "album.hpp"
#include <iris/iris.hpp>
using namespace iris;
#include <algorithm>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include "json.hpp"
#include <fstream>
#include <iris/cereal/types/optional.hpp>
#include <iris/cereal/types/tuple.hpp>

int main() {
  // Load JSON database
  nlohmann::json j;
  std::ifstream stream("database.json");
  stream >> j;

  Component server;
  server.create_server(
      endpoints = {"tcp://*:5510"},
      timeout = 500,
      on_request = [&](Request request, Response &response) {
          // Request from client
          auto kvpair = request.get<std::tuple<std::string, std::string>>();
          auto key = std::get<0>(kvpair);
          auto value = std::get<1>(kvpair);
          std::cout << "Received request {key:"
                    << key << ", value:" << value << "}\n";

          // Response to be filled and sent back
          // Either a valid album struct or empty
          std::optional<Album> album{};

          // Find the album in the JSON database
          auto it = std::find_if(j.begin(), j.end(),
            [&key, &value](const auto& element) {
              // "year" is stored as a number, so compare numerically
              if (key == "year")
                return element[key] == std::stoi(value);
              else
                return element[key] == value;
            });

          // Populate the response fields
          if (it != j.end()) {
            album = Album {
              .name = (*it)["name"].get<std::string>(),
              .artist = (*it)["artist"].get<std::string>(),
              // Album::year is an int; reading it as unsigned would be a
              // narrowing conversion inside the braced initializer
              .year = (*it)["year"].get<int>(),
              .genre = (*it)["genre"].get<std::string>(),
              .tracks = (*it)["tracks"].get<std::vector<std::string>>()
            };
          }

          // Set response
          response.set(album);
      });
  server.start();
}

Now, we can write a client that calls this server. Create a client port using component.create_client.

NOTE iris clients implement the lazy pirate pattern - rather than doing a blocking receive, iris clients:

  • Send a request to the server
  • Poll the REQ socket and receive from it only when it's sure a reply has arrived.
  • Resend a request, if no reply has arrived within a timeout period.
  • Abandon the transaction if there is still no reply after several requests.

So, iris::Clients require a timeout (waiting on server response) and a total number of retries when this timeout occurs.
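The retry loop described above can be sketched independently of ZeroMQ. This is not iris's client code: `Transport` and `send_with_retries` are hypothetical names, and the sketch assumes that `retries` counts resends after one initial attempt, which the text above does not spell out.

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <string>

// Hypothetical transport: sends `request` and waits up to `timeout` for a
// reply. Returns std::nullopt if no reply arrived in time.
using Transport = std::function<std::optional<std::string>(
    const std::string &request, std::chrono::milliseconds timeout)>;

// Assumption: one initial attempt, then up to `retries` resends.
inline std::optional<std::string> send_with_retries(
    const Transport &transport, const std::string &request,
    std::chrono::milliseconds timeout, int retries) {
  for (int attempt = 0; attempt <= retries; ++attempt) {
    if (auto reply = transport(request, timeout)) {
      return reply;  // a reply arrived within the timeout
    }
    // No reply within the timeout: loop around and resend
  }
  return std::nullopt;  // abandon the transaction
}
```

With a real REQ socket, the socket also has to be closed and reopened before a resend, because REQ enforces a strict send/receive alternation; the sketch hides that detail inside the transport.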

// client.cpp
#include "album.hpp"
#include <iostream>
#include <optional>
#include <iris/iris.hpp>
using namespace iris;
#include <iris/cereal/types/optional.hpp>
#include <iris/cereal/types/tuple.hpp>

int main(int argc, char *argv[]) {
  std::tuple<std::string, std::string> request;
  if (argc != 3) {
    std::cout << "Usage: ./<executable> key value\n";
    return 0;
  } else {
    request = {argv[1], argv[2]};
  }

  Component c(threads = 1);
  c.start();
  auto client = c.create_client(endpoints = {"tcp://127.0.0.1:5510"},
                                timeout = 2500,
                                retries = 3);

  // Send request to server
  auto response = client.send(request);

  // Check that the server has called `Response.set()`
  if (response.has_value()) {
    // Parse response and print result if available
    auto album = response.get<std::optional<Album>>();
    if (album.has_value()) {
      auto metadata = album.value();
      std::cout << "- Received album:\n";
      std::cout << "    Name:" << metadata.name << "\n";
      std::cout << "    Artist:" << metadata.artist << "\n";
      std::cout << "    Year:" << metadata.year << "\n";
      std::cout << "    Genre:" << metadata.genre << "\n";
      std::cout << "    Tracks:\n";
      for (size_t i = 0; i < metadata.tracks.size(); ++i) {
        std::cout << "" << i << "." << metadata.tracks[i] << "\n";
      }
    } else {
      std::cout << "Album not found!\n";
    }
  } else {
    std::cout << "Response not set by server\n";
  }
  c.stop();
}

Asynchronous Request-Reply Interactions

Rather than having one client request work from one worker, can we get any number of clients to request work from any number of workers? We could preload each client with a list of workers and have each client talk directly to a worker. This works, but whenever we add or remove workers, we need to update every client. A better solution is a broker that both clients and workers connect to, and that is responsible for passing messages back and forth. Brokers can deal with many simultaneous requests and responses using routers and dealers. Routers are like asynchronous response sockets, and dealers are like asynchronous request sockets.
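The benefit of putting a broker in the middle can be illustrated with an in-process toy. This is only a sketch of the idea, not how iris or ZeroMQ implement brokers: `ToyBroker`, `Worker`, `add_worker`, and `request` are hypothetical names, and round-robin dispatch is assumed here purely for illustration.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical worker: takes a request payload and returns a reply.
using Worker = std::function<std::string(const std::string &)>;

// Toy broker: clients only know the broker, and workers can be added
// without any client being updated.
class ToyBroker {
  std::vector<Worker> workers_;
  std::size_t next_ = 0;

public:
  void add_worker(Worker w) { workers_.push_back(std::move(w)); }

  // Forwards the request to the next worker in turn (assumed round-robin)
  // and hands the worker's reply back to the caller.
  std::string request(const std::string &payload) {
    if (workers_.empty()) throw std::runtime_error("no workers connected");
    Worker &w = workers_[next_];
    next_ = (next_ + 1) % workers_.size();
    return w(payload);
  }
};
```

A client that calls `broker.request(...)` never names a worker, so adding a third worker is a single `add_worker` call with no change on the client side.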

For this example (samples/number_stats_server), let's assume we have some servers (workers) that can calculate the mean and standard deviation of an array of numbers. The client will send an array of numbers to the broker. The broker will forward this request to one of many async servers. These async servers/workers are connected to the broker and waiting for work.

Here is the struct we will be passing to the server(s). On the client side, we will prepare an array of 3 doubles and pass it to the server.

// numbers.hpp
#pragma once
#include <iris/cereal/types/vector.hpp>
#include <vector>

struct Numbers {
  std::vector<double> values;

  auto size() const { return values.size(); }
  auto begin() const { return values.begin(); }
  auto end() const { return values.end(); }

  template <class Archive>
  void serialize(Archive &ar) {
    ar(values);
  }
};

And here is the response we expect from the server, a Statistics object with mean and standard deviation:

// statistics.hpp
#pragma once

struct Statistics {
  double mean;
  double stdev;

  template <class Archive>
  void serialize(Archive &ar) {
    ar(mean, stdev);
  }
};

Our broker component is very simple. It forwards requests on port 5510 to port 5515, where one of our workers will be waiting to receive work.

NOTE Below, we are creating a component with 0 threads - The task system is not needed for pure broker components as there are no tasks to execute. Brokers operate at the ZeroMQ level, simply forwarding requests and responses.

NOTE Broker components, like any other component, can have other communication ports and timers. If you have these, then you need executor threads.

#include <iostream>
#include <iris/iris.hpp>
using namespace iris;

int main() {
  Component b(threads = 0);
  b.create_broker(
    router_endpoints = {"tcp://*:5510"},
    dealer_endpoints = {"tcp://*:5515"}
  );
  b.start();
}

Here is our async server. Create workers like this using Component.create_async_server.

This server:

  • receives a Request object that is deserialized into a Numbers struct.
  • calculates the mean and standard deviation of the array.
  • sets the response using response.set

NOTE iris::AsyncServer is not very different from iris::Server. Instead of binding to a ZeroMQ socket and waiting to receive requests, an AsyncServer connects with a broker and waits for requests.

#include <iostream>
#include "numbers.hpp"
#include "statistics.hpp"
#include <iris/iris.hpp>
using namespace iris;
#include <algorithm>
#include <numeric>
#include <cmath>

int main() {
  Component worker(threads = 3);
  worker.create_async_server(
      endpoints = {"tcp://localhost:5515"},
      timeout = 500,
      on_request = [&](Request request, Response &res) {
          auto numbers = request.get<Numbers>();
          std::cout << "Received numbers: {" << numbers.values[0]
                    << "," << numbers.values[1]
                    << "," << numbers.values[2] << "}\n";

          // Calculate mean
          double sum = std::accumulate(numbers.begin(), numbers.end(), 0.0);
          double mean = sum / numbers.size();

          // Calculate standard deviation
          double accum = 0.0;
          std::for_each(numbers.begin(), numbers.end(), [&](const double d) {
            accum += (d - mean) * (d - mean);
          });
          double stdev = std::sqrt(accum / numbers.size());

          // Set the response
          res.set(Statistics{.mean = mean, .stdev = stdev});
          std::cout << "Calculated stats successfully\n";
      });
  worker.start();
}
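The arithmetic inside the callback can be checked in isolation. The sketch below extracts it into a standalone function; `Stats` and `population_stats` are hypothetical names, and the empty-input guard is an addition that the worker above does not have.

```cpp
#include <cmath>
#include <numeric>
#include <vector>

// Illustrative result type (not the Statistics message struct)
struct Stats {
  double mean;
  double stdev;
};

// Same formula as the worker: the sum of squared deviations is divided
// by N, which gives the population standard deviation (not the N-1
// sample estimate).
inline Stats population_stats(const std::vector<double> &v) {
  if (v.empty()) return {0.0, 0.0};  // guard added for this sketch only
  const double mean = std::accumulate(v.begin(), v.end(), 0.0) / v.size();
  double accum = 0.0;
  for (double d : v) accum += (d - mean) * (d - mean);
  return {mean, std::sqrt(accum / v.size())};
}
```

For the input {0, 1, 2}, the mean is 1 and the squared deviations sum to 2, so the standard deviation is sqrt(2/3), roughly 0.8165.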

Finally, here's our client. This client:

  • Creates an array of numbers and sends it to the broker
  • The broker will forward to one of the async servers at its disposal.
  • One of the servers will respond and the response is forwarded back to this client

#include <iostream>
#include "numbers.hpp"
#include "statistics.hpp"
#include <iris/iris.hpp>
using namespace iris;

int main() {
  Component c(threads = 2);
  auto client = c.create_client(endpoints = {"tcp://localhost:5510"},
                                timeout = 2500,
                                retries = 3);

  double i = 0.0, j = 1.0, k = 2.0;

  c.set_interval(
      period = 2000,
      on_triggered = [&] {
          std::cout << "[Sent] numbers = {" << i << "," << j << "," << k << "}\n";
          auto response = client.send(Numbers{.values = {i, j, k}});
          auto stats = response.get<Statistics>();
          std::cout << "[Received] mean =" << stats.mean
                    << "; stdev =" << stats.stdev
                    << std::endl;
          i += 0.3;
          j += 0.5;
          k += 0.9;
      });
  c.start();
}

Building Samples

There are a number of samples in the samples/ directory. You can build these samples by running the following commands.

mkdir build && cd build
cmake -DIRIS_SAMPLES=ON ..
make

Contributing

Contributions are welcome. Have a look at the CONTRIBUTING.md document for more information.

License

The project is available under the MIT license.
