PostgreSQL (TimescaleDB) で SKIP LOCKED という機能がある事を知ったため、自分用のメモ。
最大処理時間が数十時間かかる可能性があるワーカーの管理に、MQ 的な仕組みを使いたいと考え、できるだけ楽に使える仕組みを検討することにした。
MQ を除くと色々調べてみるとどうやらバックエンドに Redis を使うというのが多かったので、一旦そちらの方向で検討してみた。
Redis 自体は公式が提供しているクラウドサービスを利用する前提。
https://redis.com/redis-enterprise-cloud/overview/
日本リージョンがあり、容量が少なければかなり安い。
ただ永続性という点では Redis を使うのに抵抗があることや、新しくサービスを増やしたくなかったので、Redis の採用は見送った。
普段から利用しているTimescaleDB は PostgreSQL の拡張ということもあり、PostgreSQL で何か実現できないか検討してみた。
特にsqlc を利用している事もあり SQL にまとめられるのは理想だし、永続化の事を考えるとできる限り PostgreSQL を利用したい。
色々調べてみるとどうやら PostgreSQL 9.5 で SKIP LOCKED という仕組みが入ったらしい。
これは簡単に言うと行ロックされている行をスキップできる という仕組みの模様。
ワーカーがジョブを取り出す時、ロックを取ってそのジョブの状態を変更するという SQL を書くことになるだろうと思っていたので、これがあれば、どうやらやりたいことができそうと判断した。
性能が要件に一切ない事もあり、PostgreSQL でまかなえるならなんとかまかないたい。
利用している TimescaleDB は開発元が提供しているマネージドサービスを利用している事もあり、運用コストがかかっていない。
実際 1 年以上稼働しっぱなしかつ、アップデートも定期的に行っているが問題は一切起きていない。
テーブル的には job というのを用意して、このテーブルにジョブを追加する。
CREATETYPE job_statusASENUM(-- ジョブが追加された'queued',-- ジョブ実行中'in_progress',-- キャンセル処理待ちのジョブ'waiting',-- ジョブ完了'completed','success','failure',-- キャンセルされたジョブ'cancelled','time_out','skipped');CREATETABLE job( pk BIGSERIALPRIMARYKEY,-- ジョブの状態 job_status job_statusNOTNULL,-- ジョブ追加時のタイムスタンプ job_add_timestamp TIMESTAMPTZNOTNULL,-- ジョブ開始時のタイムスタンプ job_start_timestamp TIMESTAMPTZ,-- ジョブキャンセル時のタイムスタンプ job_cancel_timestamp TIMESTAMPTZ,-- ジョブ完了時のタイムスタンプ job_complete_timestamp TIMESTAMPTZ,-- ジョブ失敗時のタイムスタンプ job_failure_timestmap TIMESTAMPTZ,-- ジョブの中身 body JSONBNOTNULL,-- ジョブがキャンセルされたかどうか canceledBOOLEANDEFAULTFALSENOTNULL,);-- name: AddJob :execINSERTINTO job( job_status, job_add_timestamp, body)VALUES('queued',@job_add_timestamp,@body);-- name: GetJob :oneSELECT*FROM jobWHERE job_status='queued'ORDERBY job_add_timestampASCLIMIT1FORUPDATESKIP LOCKED;TX で別のワーカーがロックしていない queued 状態のジョブを取得して、job_status を in_progress に変更し、job_start_timestamp を追加しておく。
見つからなければそのまま何もしない。
ワーカー側では定期的にジョブの状態を確認しにいき、キャンセルになっていたら、ワーカー側の処理を停止してジョブをキャンセルする。
一定間隔で起動し、GetJob を行い、ジョブがあれば処理を行い、ジョブがなければ処理は行わない。
Systemd Timer ユニットは発火中に、再度タイマーが発火してもスキップしてくれる。
sqlc を利用して Go でテストコードを書いて、dockertest で TimescaleDB で E2E テストを行ったところ、期待通りの動作をすることを確認できた。
性能が要件になければ、 SKIP LOCKED を利用したジョブキューは自分の用途にはぴったりだった。
