Movatterモバイル変換


[0]ホーム

URL:


時雨堂ノート時雨堂ノート
時雨堂ノートPublicationへの投稿
🗿

PostgreSQL で SKIP LOCKED を利用する

に公開
2023/03/03

PostgreSQL (TimescaleDB) で SKIP LOCKED という機能がある事を知ったため、自分用のメモ。

目的

最大処理時間が数十時間かかる可能性があるワーカーの管理に、MQ 的な仕組みを使いたいと考え、できるだけ楽に使える仕組みを検討することにした。

  • クラウドのサービスは利用しない
    • AWS / GCP / Azure といったサービスを普段から利用していない
  • Go で利用できること
  • 普段の負荷は 1 時間 10 リクエストもない
    • 負荷対策での利用ではない
  • MQ をもし使う場合は RabbitMQ だが、今回はオーバーキル
    • Erlang VM で、ソースも一通り読んだことあるので、何かあってもなんとかなる
  • 一つの処理が数十時間かかることもあり、ジョブにはキャンセルをできるようにしたい
    • 処理が終わるまでは Ack を返さない仕組みが必要

Redis の検討

MQ を除くと色々調べてみるとどうやらバックエンドに Redis を使うというのが多かったので、一旦そちらの方向で検討してみた。

Redis 自体は公式が提供しているクラウドサービスを利用する前提。

https://redis.com/redis-enterprise-cloud/overview/

日本リージョンがあり、容量が少なければかなり安い。

ただ永続性という点では Redis を使うのに抵抗があることや、新しくサービスを増やしたくなかったので、Redis の採用は見送った。

PostgreSQL の検討

普段から利用しているTimescaleDB は PostgreSQL の拡張ということもあり、PostgreSQL で何か実現できないか検討してみた。

特にsqlc を利用している事もあり SQL にまとめられるのは理想だし、永続化の事を考えるとできる限り PostgreSQL を利用したい。

SKIP LOCKED

色々調べてみるとどうやら PostgreSQL 9.5 で SKIP LOCKED という仕組みが入ったらしい。
これは簡単に言うと行ロックされている行をスキップできる という仕組みの模様。

ワーカーがジョブを取り出す時、ロックを取ってそのジョブの状態を変更するという SQL を書くことになるだろうと思っていたので、これがあれば、どうやらやりたいことができそうと判断した。

性能が要件に一切ない事もあり、PostgreSQL でまかなえるならなんとかまかないたい。
利用している TimescaleDB は開発元が提供しているマネージドサービスを利用している事もあり、運用コストがかかっていない。

実際 1 年以上稼働しっぱなしかつ、アップデートも定期的に行っているが問題は一切起きていない。

イメージコード

  • SQL は sqlc 文法を利用している

ジョブテーブル

テーブル的には job というのを用意して、このテーブルにジョブを追加する。

CREATETYPE job_statusASENUM(-- ジョブが追加された'queued',-- ジョブ実行中'in_progress',-- キャンセル処理待ちのジョブ'waiting',-- ジョブ完了'completed','success','failure',-- キャンセルされたジョブ'cancelled','time_out','skipped');CREATETABLE job(  pk BIGSERIALPRIMARYKEY,-- ジョブの状態  job_status job_statusNOTNULL,-- ジョブ追加時のタイムスタンプ  job_add_timestamp TIMESTAMPTZNOTNULL,-- ジョブ開始時のタイムスタンプ  job_start_timestamp TIMESTAMPTZ,-- ジョブキャンセル時のタイムスタンプ  job_cancel_timestamp TIMESTAMPTZ,-- ジョブ完了時のタイムスタンプ  job_complete_timestamp TIMESTAMPTZ,-- ジョブ失敗時のタイムスタンプ  job_failure_timestmap TIMESTAMPTZ,-- ジョブの中身  body JSONBNOTNULL,-- ジョブがキャンセルされたかどうか  canceledBOOLEANDEFAULTFALSENOTNULL,);

ジョブ追加

-- name: AddJob :execINSERTINTO job(    job_status,    job_add_timestamp,    body)VALUES('queued',@job_add_timestamp,@body);

ジョブ取得

-- name: GetJob :oneSELECT*FROM jobWHERE job_status='queued'ORDERBY job_add_timestampASCLIMIT1FORUPDATESKIP LOCKED;

TX で別のワーカーがロックしていない queued 状態のジョブを取得して、job_status を in_progress に変更し、job_start_timestamp を追加しておく。

見つからなければそのまま何もしない。

ジョブのキャンセルチェック

ワーカー側では定期的にジョブの状態を確認しにいき、キャンセルになっていたら、ワーカー側の処理を停止してジョブをキャンセルする。

ワーカーは Systemd Timer ユニットを利用

一定間隔で起動し、GetJob を行い、ジョブがあれば処理を行い、ジョブがなければ処理は行わない。
Systemd Timer ユニットは発火中に、再度タイマーが発火してもスキップしてくれる。

まとめ

sqlc を利用して Go でテストコードを書いて、dockertest で TimescaleDB で E2E テストを行ったところ、期待通りの動作をすることを確認できた。

性能が要件になければ、 SKIP LOCKED を利用したジョブキューは自分の用途にはぴったりだった。

参考資料

時雨堂ノート により固定

時雨堂の商用製品

voluntas

時雨堂

Discussion


[8]ページ先頭

©2009-2025 Movatter.jp