Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Serverless R message queue using SQLite

License

Unknown, MIT licenses found

Licenses found

Unknown
LICENSE
MIT
LICENSE.md
NotificationsYou must be signed in to change notification settings

r-lib/liteq

Repository files navigation

Lightweight Portable Message Queue Using SQLite

R-CMD-checkCRAN RStudio mirror downloadsCodecov test coverage

Temporary and permanent message queues for R. Built on top of SQLitedatabases. 'SQLite' provides locking, and makes it possible to detectcrashed consumers. Crashed jobs can be automatically marked as "failed",or put back in the queue again, potentially a limited number of times.

Installation

Stable version:

install.packages("liteq")

Development versiot:

pak::pak("r-lib/liteq")

Introduction

liteq implements a serverless message queue system in R.It can handle multiple databases, and each database can containmultiple queues.

liteq uses SQLite to store a database of queues, and uses other,temporary SQLites databases for locking, and finding crashed workers(see below).

Usage

Basic usage

library(liteq)

In the following we create a queue in a temporary queue database.The database will be removed if the R session quits.

db<- tempfile()q<- ensure_queue("jobs",db=db)q
#> liteq queue 'jobs'
list_queues(db)
#> [[1]]#> liteq queue 'jobs'

Note thatensure_queue() is idempotent, if you call it again on the samedatabase, it will return the queue that was created previously. So it issafe to call it multiple times, even from multiple processes. In case ofmultiple processes, the locking mechanism eliminates race conditions.

To publish a message in the queue, callpublish() on the queue object:

publish(q,title="First message",message="Hello world!")publish(q,title="Second message",message="Hello again!")list_messages(q)
#>   id          title status#> 1  1  First message  READY#> 2  2 Second message  READY

Aliteq message has a title, which is a string scalar, and the messagebody itself is a string scalar as well. To use more complex data types inmessages, you need to serialize them using theserialize() function (setascii toTRUE!), or convert them to JSON with thejsonlite package.

Two functions are available to consume a message from a queue.try_consume() returns immediately, either with a message (liteq_messageobject), orNULL if the queue is empty. Theconsume() function blocksif the queue is empty, and waits until a message appears in it.

msg<- try_consume(q)msg
#> liteq message from queue 'jobs':#>   First message (12 B)

The title and the message body are available as fields of the messageobject:

msg$title
#> [1] "First message"
msg$message
#> [1] "Hello world!"

When a consumer is done processing a message it must callack() on themessage object, to notify the queue that it is safe to remove the message.If the consumer fails to process a message, it can callnack() (negativeackowledgement) on the message object. Then the status of the message willbe set to"FAILED". Failed messages can be removed from the queue, orput back in the queue again, depending on the application.

ack(msg)list_messages(q)
#>   id          title status#> 1  2 Second message  READY
msg2<- try_consume(q)nack(msg2)list_messages(q)
#>   id          title status#> 1  2 Second message FAILED

The queue is empty now, sotry_consume() returnsNULL:

try_consume(q)
#> NULL

Crashed workers

If a worker crashes without calling eitherack() ornack() on a message,then this messages will be put back in the queue the next time a message isrequested from the queue.

To make this possible, each delivered message keeps an open connection toa lock file, and crashed workers are found by the absense of this openconnection. In R basically means that the worker is considered as crashedif the R process has no reference to the message object.

Note, that this also means that having many workers at the same time meansthat it is possible to reach the maximum number of open connections byR or the operating system.

License

MIT © Gábor Csárdi

About

Serverless R message queue using SQLite

Topics

Resources

License

Unknown, MIT licenses found

Licenses found

Unknown
LICENSE
MIT
LICENSE.md

Code of conduct

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors7


[8]ページ先頭

©2009-2025 Movatter.jp