Lightweight Portable Message Queue Using SQLite




Temporary and permanent message queues for R. Built on top of SQLitedatabases. 'SQLite' provides locking, and makes it possible to detectcrashed consumers. Crashed jobs can be automatically marked as "failed",or put back in the queue again, potentially a limited number of times.
Stable version:
install.packages("liteq")Development versiot:
liteq implements a serverless message queue system in R.It can handle multiple databases, and each database can containmultiple queues.
liteq uses SQLite to store a database of queues, and uses other,temporary SQLites databases for locking, and finding crashed workers(see below).
In the following we create a queue in a temporary queue database.The database will be removed if the R session quits.
db<- tempfile()q<- ensure_queue("jobs",db=db)q#> [[1]]#> liteq queue 'jobs'
Note thatensure_queue() is idempotent, if you call it again on the samedatabase, it will return the queue that was created previously. So it issafe to call it multiple times, even from multiple processes. In case ofmultiple processes, the locking mechanism eliminates race conditions.
To publish a message in the queue, callpublish() on the queue object:
publish(q,title="First message",message="Hello world!")publish(q,title="Second message",message="Hello again!")list_messages(q)
#> id title status#> 1 1 First message READY#> 2 2 Second message READY
Aliteq message has a title, which is a string scalar, and the messagebody itself is a string scalar as well. To use more complex data types inmessages, you need to serialize them using theserialize() function (setascii toTRUE!), or convert them to JSON with thejsonlite package.
Two functions are available to consume a message from a queue.try_consume() returns immediately, either with a message (liteq_messageobject), orNULL if the queue is empty. Theconsume() function blocksif the queue is empty, and waits until a message appears in it.
#> liteq message from queue 'jobs':#> First message (12 B)
The title and the message body are available as fields of the messageobject:
When a consumer is done processing a message it must callack() on themessage object, to notify the queue that it is safe to remove the message.If the consumer fails to process a message, it can callnack() (negativeackowledgement) on the message object. Then the status of the message willbe set to"FAILED". Failed messages can be removed from the queue, orput back in the queue again, depending on the application.
#> id title status#> 1 2 Second message READY
msg2<- try_consume(q)nack(msg2)list_messages(q)
#> id title status#> 1 2 Second message FAILED
The queue is empty now, sotry_consume() returnsNULL:
If a worker crashes without calling eitherack() ornack() on a message,then this messages will be put back in the queue the next time a message isrequested from the queue.
To make this possible, each delivered message keeps an open connection toa lock file, and crashed workers are found by the absense of this openconnection. In R basically means that the worker is considered as crashedif the R process has no reference to the message object.
Note, that this also means that having many workers at the same time meansthat it is possible to reach the maximum number of open connections byR or the operating system.
MIT © Gábor Csárdi