Movatterモバイル変換


[0]ホーム

URL:


Tech Rachoエンジニアの「?」を「!」に。
  1. TOP
  2. Ruby / Rails関連
  3. Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)
  • Ruby / Rails関連

Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)

概要

MITライセンスに基づいて翻訳・公開いたします。

本記事では、原則としてツール(gem、フレームワーク)の名前をmaintenance_tasksと表記します。

Shopify/maintenance_tasks - GitHub

Shopifyが開発したmaintenance_tasksは、Railsガイドでも推奨されているDBのデータマイグレーション用gemです↓。

参考:10.2 データのマイグレーション -- Active Record マイグレーション - Railsガイド

  • 2025/10/27: 更新

Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)

maintenance_tasksは、メンテナンスタスクをジョブキューに入れて管理するRailsエンジンです。

このプロジェクトにおける「メンテナンスタスク」は、いわゆるデータのマイグレーション(つまりデータベース内のデータを変更するコード、多くは最終的にスキーママイグレーションをサポートするのが目的)を意味します。
たとえば、NOT NULLカラムを新たに導入する場合、以下を行わなければなりません。

  1. そのカラムをnullable(NULL許容)として追加する
  2. そのカラムに値をバックフィルする
  3. 最後にそのカラムをNOT NULLに変更する

maintenance_tasksのエンジンは、上のステップ2である「バックフィル(backfill: 値の埋め戻し)」を支援します。

メンテナンスタスクはコレクションベースのタスクであり、データベース内のデータをActive Recordで更新するのが普通です。このメンテナンスタスクは一時停止・中断が可能です。また、メンテナンスタスクをバッチで実行することも、データベースの負荷制御のためにスロットリングすることも可能です。

メンテナンスタスクは定期的に実行するものではなく、必要が生じたときに1回だけ実行されることが前提です。メンテナンスタスクは一時的にしか必要とされないので、利用後は削除されるのが普通です。

maintenance_tasksのRailsエンジンには、メンテナンスタスクの「一覧表示」「ステータス確認」「開始」「一時停止」「再起動」を行えるWebベースのUIが搭載されています。

🔗 メンテナンスタスクはどんなときに必要か

maintenance_tasksに搭載されているジョブUIは、あくまで用途に特化した機能に限定されています。maintenance_tasksのRailsエンジンを用いて、目的以外のデータ変更タスク(サポートリクエストのデータ変更など)を提供することも一応可能ですが、そうなるとこのエンジンで提供できる以上の柔軟性が求められることになってしまうので、そうしたユースケースにはmaintenance_tasksではなく通常のアプリケーションコードを使うことをおすすめします。

Active Jobとして実行すべきでないタスクは、おそらくこのgemの用途に合わないでしょう。

  • タスクをバックグラウンドで実行する必然性がなければ、代わりにランナースクリプトの利用を検討してください。
  • タスクを中断可能にする必然性がなければ、通常のActive Jobの利用を検討してください。

メンテナンスタスクは、反復中に中断可能です。
タスクがコレクションベースではない場合や、バッチが極めて大きい場合は、スロットリング(イテレーションごとに休止をはさむ)や中断機能で得られるメリットは限られます。これで問題ない場合もありますが、通常のActive Jobよりも複雑なメンテナンスタスクをわざわざ追加する手間に見合わない可能性もあります。

タスクで更新する対象がデータではなくデータベーススキーマの場合は、メンテナンスタスクではなく、通常のマイグレーションタスクをお使いください。

定期的に発生するタスクを扱うのであれば、maintenance_tasks gemよりも、Active Jobにスケジューラやcron、job-iterationrails_adminのカスタムUIを組み合わせることを検討しましょう。

新規アプリケーション上でseedデータを作成したい場合は、maintenance_tasks gemではなく、db/seeds.rbをお使いください。

途中で終わったマイグレーションを適切に処理できないアプリケーションの場合は、おそらくmaintenance_tasks gemは適切なツールではありません。maintenance_tasks gemは、メンテナンスタスクが「一時停止可能」「キャンセル可能」であることを意図しています。

🔗 インストール

gemをインストールしてジェネレータを実行するには、以下を実行します

bundle add maintenance_tasksbin/rails generate maintenance_tasks:install

このジェネレータは、必要なテーブルをデータベースに追加するためのマイグレーションを作成して実行します。また、maintenance_tasks用のルーティングをconfig/routes.rbにマウントします。デフォルトでは、/maintenance_tasksという新しいパスでメンテナンスタスクにアクセスできます。

このgemでは、Railsのエラーレポーターを使ってエラーを通知します。バグトラッキングサービスを利用している場合はレポーターにサブスクライブするとよいでしょう。詳しくはエラーを通知するを参照してください。

🔗 Active Jobへの依存について

maintenance_tasksフレームワークは、タスクをバックグラウンドで実行するためにActive Jobに依存します。Active Jobのキューイングバックエンドはデフォルトでは非同期です。コードやインフラストラクチャの変更中にタスクの進行状況が失われるのを防ぐために、キューイングを永続化バックエンドに変更することを強く推奨します。キューイングバックエンドの設定方法について詳しくは、Active Jobガイドを参照してください。

🔗 Action ControllerとAction Viewへの依存について

maintenance_tasksフレームワークは、UIのレンダリングにAction ControllerとAction Viewを使います。RailsをAPI専用モードで利用している場合は、API専用アプリケーションで利用するを参照してください。

🔗 オートロードについての注意

maintenance_tasksフレームワークは、:classicモードでのオートロードをサポートしていません。
利用するには、アプリケーションのコードでZeitwerkが使われていることを必ず確認してください。詳しくはRailsガイドの定数のオートロードとリロードを参照してください。

🔗 利用法

maintenance_tasksの典型的なワークフローは以下のような感じになります。

  1. タスクを記述するためのクラスを生成して実行したい作業を記述する
  2. 以下のいずれかの方法でタスクを実行する
  3. 以下のいずれかの方法でタスクを監視する
    • 組み込みのWeb UI
    • タスクの実行ステータスをデータベースで手動確認する
      4.不要になったタスクを削除する(オプション)

🔗 タスクを作成する

タスク作成用のジェネレータが提供されているので、以下を実行して新しいタスクを生成します。

bin/rails generate maintenance_tasks:task update_posts

これで、app/tasks/maintenance/update_posts_task.rbというタスクファイルが作成されます。

生成されたタスクは、MaintenanceTasks::Taskのサブクラスです。このタスクには以下を実装します。

  • collection: 反復処理の対象となるActive Recordリレーション、または配列を返します
  • process: メンテナンスタスクの作業を1個のレコードに対して実行します

オプションとして、タスクに#countメソッドをカスタム実装することで、イテレーションする要素の個数を定義することも可能です。

タスクのtick_total(経過時間)はコレクションのサイズに応じて自動的に算出されますが、必要であればこの値を#countメソッドでオーバーライドできます(例: コレクションのサイズを決定するためのクエリ生成を避けたい場合)。

タスクの例:

# app/tasks/maintenance/update_posts_task.rbmodule Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    def collection      Post.all    end    def process(post)      post.update!(content: "New content!")    end  endend
🔗 バッチサイズをカスタマイズする

Active Recordリレーションを用いてレコードを処理する場合、レコードは内部でバッチで取得されたうえでレコードごとに#processメソッドに渡されます。
メンテナンスタスクがデフォルトで取得するデフォルトのバッチ数は100個ですが、バッチサイズはcollection_batch_sizeマクロで変更可能です。

# app/tasks/maintenance/update_posts_task.rbmodule Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    # Fetch records in batches of 1000    collection_batch_size(1000)    def collection      Post.all    end    def process(post)      post.update!(content: "New content!")    end  endend

🔗 CSV処理用のタスクを作成する

CSVファイルをイテレーションするタスクも作成できます。
CSVタスクを作成するには、Active Storageも設定しておく必要があることにご注意ください。Active Storageで必要な依存関係がアプリケーションのGemfileで指定済みであることを確認してから、セットアップ手順に沿って設定してください。後述のActive Storageサービスのカスタマイズ方法も参照してください。

以下のコマンドを実行してCSVタスクを生成します。

bin/rails generate maintenance_tasks:task import_posts --csv

生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。

  • process:CSV::Rowで行いたいメンテナンスタスクを記述する
# app/tasks/maintenance/import_posts_task.rbmodule Maintenance  class ImportPostsTask < MaintenanceTasks::Task    csv_collection    def process(row)      Post.create!(title: row["title"], content: row["content"])    end  endend
  • posts.csv:
title,contentMy Title,Hello World!

Active Storageのサービスプロバイダにアップロードされたファイルは、ISO 8601形式のタイムスタンプを含み、タスク名をスネークケース形式にする形でリネームされます。

暗黙の#countメソッドは、ファイルの正確な行数を決定するために、ファイル全体を読み込んで解析します。ファイルが数百万行になると処理に時間がかかるので、nilを返すcountを定義する、行数を近似する(例: 新規の行数をカウントする)などの方法で、カウントをスキップすることを検討してください。

def count(task)  task.csv_content.count("\n") - 1end
🔗 CSVオプション

タスクにcsv_collectionキーワード引数を追加することで、RubyのCSVパーサー用のオプションをタスクに渡せるようになります。

# app/tasks/maintenance/import_posts_task.rbmodule Maintenance  class ImportPosts    csv_collection(skip_lines: /^#/, converters: ->(field) { field.strip })    def process(row)      Post.create!(title: row["title"], content: row["content"])    end  endend

これらのオプションは、#で始まる行をスキップするようRuby CSVパーサーに指示し、すべてのフィールドから冒頭と末尾のスペース文字を削除します。これによって、以下のファイルは前述のサンプルとまったく同様に処理されます。

posts.csv:

# コメント行title,content My Title ,Hello World!
🔗 CSVタスクのバッチ処理

タスクでは、CSVをバッチで処理できます。タスク内のcsv_collectionマクロに、以下のようにin_batchesオプションを追加します。

# app/tasks/maintenance/batch_import_posts_task.rbmodule Maintenance  class BatchImportPostsTask < MaintenanceTasks::Task    csv_collection(in_batches: 50)    def process(batch_of_rows)      Post.insert_all(post_rows.map(&:to_h))    end  endend

通常のCSVタスクと同様に、以下のメソッドも実装しておくこと。

  • process: タスクで(CSV::Rowオブジェクトの配列に)行いたい処理をバッチで記述する

このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(CSVの全行数ではなく)バッチ単位で表示されます。

CSVタスクをバッチ化しない場合、実質的なバッチサイズが1となり、データベース操作の効率が落ちる可能性があります。

🔗 バッチのコレクションを処理する

maintenance_tasks gemはActive Recordのバッチ処理をサポートしています。バッチ処理を使うことで、タスクがデータベースを呼び出す回数を削減できます。
処理をレコード単位ではなくバッチ単位で処理するには、コレクションが返すリレーションのActiveRecord::Batches#in_batchesを使います。
Active Recordのバッチサイズはデフォルトで1000レコードですが、カスタムのサイズも指定可能です。

# app/tasks/maintenance/update_posts_in_batches_task.rbmodule Maintenance  class UpdatePostsInBatchesTask < MaintenanceTasks::Task    def collection      Post.in_batches    end    def process(batch_of_posts)      batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")    end  endend

このタスクには以下のメソッドを実装しておく必要があります。

  • collection:ActiveRecord::Batches::BatchEnumeratorを返します
  • process: タスクの作業をActiveRecord::Relationに対してバッチで行います

このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(リレーションのレコード数ではなく)バッチ単位で表示されます。

重要!

バッチ処理は、#process#update_all#delete_allを実行する場合に限ってお使いください。
個別のレコードをイテレーションしなければならない場合は、ActiveRecord::Relationを返すコレクションを定義する必要があります。これは内部でバッチ処理を行いますが、レコードを1件のクエリで読み込む点が異なります。
逆にバッチのコレクションは、最初にバッチのレコードから主キーを読み込んでから、#processメソッド内でeach(またはEnumerableの任意のメソッド)を呼び出すたびに追加クエリを実行してレコードを読み込みます。

🔗 コレクションの不要なタスク

場合によっては、シンプルな操作を1個だけ実行したいことがあります(例: バックグラウンドジョブを追加する、外部APIにクエリをかける)。maintenance_tasks gemは、コレクションなしのタスクもサポートしています。

以下を実行して、コレクションなしのタスクを生成します。

bin/rails generate maintenance_tasks:task no_collection_task --no-collection

生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。

  • process: メンテナンスタスクで行いたい処理を実行する
# app/tasks/maintenance/no_collection_task.rbmodule Maintenance  class NoCollectionTask < MaintenanceTasks::Task    no_collection    def process      SomeAsyncJob.perform_later    end  endend

🔗 タスクでカスタムのEnumeratorを使う

サポートされていないコレクション型(APIで取得した外部リソースなど)をイテレーションしなければならない特殊なユースケースでは、タスク内にenumerator_builder(cursor:)を実装することでイテレーションできます。

このメソッドは、[item, cursor]というペアを生成するEnumeratorを返す必要があります。
maintenance_tasksは、イテレーションの現在のカーソル位置を永続化して、タスクが中断または再開したときにcursorと引数として提供します。このcursorStringとして保存されるので、カスタムのEnumeratorは必要に応じてこの値のシリアライズ/デシリアライズを処理する必要があります。

# app/tasks/maintenance/custom_enumerator_task.rbmodule Maintenance  class CustomEnumeratorTask < MaintenanceTasks::Task    def enumerator_builder(cursor:)      after_id = cursor&.to_i      PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum    end    def process(post)      Post.create!(post)    end  endend

🔗 スロットリング

maintenance_tasksでは大量のデータを変更することが多いため、データベースに負荷をかけることがあります。そのため、maintenance_tasksはスロットリングメカニズムを提供しており、特定の条件が満たされたときにタスクをスロットリングできます。
タスクがスロットリングされると(スロットルブロックがtrueを返すと)、タスクを中断し、バックオフ期間をすぎるとタスクをリトライします。デフォルトのバックオフ期間は30秒です。

スロットリングの条件はブロックを渡す形で指定します。

# app/tasks/maintenance/update_posts_throttled_task.rbmodule Maintenance  class UpdatePostsThrottledTask < MaintenanceTasks::Task    throttle_on(backoff: 1.minute) do      DatabaseStatus.unhealthy?    end    def collection      Post.all    end    def process(post)      post.update!(content: "New content added on #{Time.now.utc}")    end  endend

アプリのスロットリング条件を適切に定義するのは、開発者にかかっています。ShopifyではDatabaseStatus.healthy?を実装する形で、MySQLのさまざまなメトリクス(レプリケーションラグ、DBスレッド、DB書き込み可能かどうかなど)をチェックしています。

タスクには複数のスロットリング条件を定義できます。スロットリング条件は子孫クラスに継承され、新しい条件を追加しても既存の条件に影響しません。

バックオフ期間は、引数なしのProcとして指定することも可能です。

# app/tasks/maintenance/update_posts_throttled_task.rbmodule Maintenance  class UpdatePostsThrottledTask < MaintenanceTasks::Task    throttle_on(backoff: -> { RandomBackoffGenerator.generate_duration } ) do      DatabaseStatus.unhealthy?    end    # ...  endend

🔗 カスタムのタスクパラメータ

タスクを実行するためにパラメータで追加情報を指定する必要が生じることがあります。タスク内でパラメータをActive Model属性として定義することで、#collection#count#processなどのタスクメソッドでパラメータを利用可能になります。

# app/tasks/maintenance/update_posts_via_params_task.rbmodule Maintenance  class UpdatePostsViaParamsTask < MaintenanceTasks::Task    attribute :updated_content, :string    validates :updated_content, presence: true    def collection      Post.all    end    def process(post)      post.update!(content: updated_content)    end  endend

タスクにパラメータを定義するときに、Active Modelのバリデーションも活用できます。パラメータを受け取れるタスクに引数を渡すと、タスクの実行開始前にバリデーションされます。引数はユーザーインターフェイス内のテキスト領域入力経由で指定されるので、引数がタスクで想定している形式に準拠するようにし、必要に応じてサニタイズすることが重要です。

🔗 タスクのパラメータをバリデーションする

タスクの属性は、Active Modelのバリデーション機能でバリデーションできます。属性のバリデーションは、タスクがエンキューされる前に行われます。

たとえば、属性でin:オプションによる範囲バリデーションを使っている場合、その値のセットは画面のドロップダウン項目で表示されます。以下の型がサポートされています。

  • 配列
  • Taskインスタンスをオプションで受け取って配列を返す、Procまたはlambda
  • 引数を1個(Taskインスタンス)受け取って配列を返す、呼び出し可能オブジェクト
  • 配列を返すメソッド(このメソッドはTaskインスタンスで呼び出される)

サポートされる型に合わないenumerableの場合は、代わりにテキストフィールドがレンダリングされます。

🔗 タスクパラメータをマスクする

タスククラスにmask_attributeクラスメソッドを追加することで、タスク属性をUIでマスクできます。
これにより、UI内の引数リストの値が[FILTERED]に置き換えられます。

# app/tasks/maintenance/sensitive_params_task.rbmodule Maintenance  class SensitiveParamsTask < MaintenanceTasks::Task    attribute :sensitive_content, :string    mask_attribute :sensitive_content  endend

パラメータをグローバルなRailsパラメータフィルタでフィルタしている場合は、パラメータをマスクするときにもそれらのパラメータが自動的に考慮されます。つまり、グローバルなRailsパラメータフィルタにパラメータを追加すれば、すべてのタスクでパラメータをマスクできます。

Rails.application.config.filter_parameters += %i[token]
🔗 カーソルカラムをカスタマイズしてパフォーマンスを向上させる

maintenance_tasks gemが依存しているjob-iteration gemは、collectionメソッドが返すリレーションにORDER BY句を追加してレコードをイテレーション可能にします(デフォルトではidカラム順でイテレーションされます)。

job-iteration gemは、カーソルの順序付けにどのカラムを使うかというbuild_active_record_enumerator_on_recordsコンフィグをサポートしています。

maintenance-tasks gemは、job-iteration gemが提供するカーソルカラムの制御機能を、MaintenanceTasks::Taskクラスのcursor_columnsメソッドで公開します。
cursor_columnsnilを返す場合、クエリの結果は主キー順になります。イテレーション中にカーソルカラムの値が変更されると、レコードがスキップされたり、同じレコードが何度も生成されてしまう可能性があります。

module Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    def cursor_columns      [:created_at, :id]    end    def collection      Post.where(created_at: 2.days.ago...1.hour.ago)    end    def process(post)      post.update!(content: "updated content")    end  endend

🔗 instrumentationイベントへのサブスクライブ

特定のタスクイベントに対して操作を行いたい場合は、後述するタスクコールバックセクションを参照してください。

ただし、すべてのイベントをサブスクライブしたい場合は、以下のActive Support通知をどのタスクでも利用できます。

enqueued.maintenance_tasks    # タスクがユーザーによってエンキューされるとこのイベントがpublishされるsucceeded.maintenance_tasks   # タスクがエラーなしで完了するとこのイベントがpublishされるcancelled.maintenance_tasks   # ユーザーが何らかのタスクを明示的に停止するとこのイベントがpublishされるpaused.maintenance_tasks      # ユーザーがタスクを実行中に一時停止するとこのイベントがpublishされるerrored.maintenance_tasks     # タスクのコードがunhandled exceptionを発生するとこのイベントがpublishされる

これらの通知は、メンテナンスタスクのライフサイクルをアプリケーション内で監視する方法を提供します。

利用例:

ActiveSupport::Notifications.subscribe("succeeded.maintenance_tasks") do |*, payload|  task_name = payload[:task_name]  arguments = payload[:arguments]  metadata = payload[:metadata]  job_id = payload[:job_id]  run_id = payload[:run_id]  time_running = payload[:time_running]  started_at = payload[:started_at]  ended_at = payload[:ended_at]rescue => e  Rails.logger.error(e)end
ActiveSupport::Notifications.subscribe("errored.maintenance_tasks") do |*, payload|  task_name = payload[:task_name]  error = payload[:error]  error_message = error[:message]  error_class = error[:class]  error_backtrace = error[:backtrace]rescue => e  Rails.logger.error(e)end
class MaintenanceTasksInstrumenter < ActiveSupport::Subscriber  attach_to :maintenance_tasks  def enqueued(event)    task_name = event.payload[:task_name]    arguments = event.payload[:arguments]    metadata = event.payload[:metadata]    SlackNotifier.broadcast(SLACK_CHANNEL,      "Job #{task_name} was started by #{metadata[:user_email]}} with arguments #{arguments.to_s.truncate(255)}")  rescue => e    Rails.logger.error(e)  endend

🔗 タスクのコールバックを使う

タスクのライフサイクルにフックをかける以下のコールバックが提供されています。

  • after_start
  • after_pause
  • after_interrupt
  • after_cancel
  • after_complete
  • after_error
module Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    after_start :notify    def notify      NotifyJob.perform_later(self.class.name)    end    # ...  endend

:after_errorは完了することが保証されているので、コールバックのコード内で発生する例外はすべて無視されます。after_errorコールバックのコードが例外をraiseする場合は、そのコールバック内で適切にrescueして処理する必要があります。

module Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    after_error :dangerous_notify    def dangerous_notify      # This error is rescued and ignored in favour of the original error causing the error flow.      raise NotDeliveredError    end    # ...  endend

その他のコールバックが例外を発生した場合は、エラーハンドラによって処理され、タスクが実行を停止します。

🔗 タスクを記述するときに考慮すべき点

maintenance_tasksは、タスクを処理するジョブを実行するために、アプリケーション用に設定されたキューアダプタに依存します。タスクの記述方法に関するガイドラインはキューアダプタによって異なる部分もありますが、一般には以下のルールに沿って書く必要があります。

  • Task#processの実行時間に注意すること
    コレクションの要素1個の処理が25秒を超えないようにする。
    あるいは、Sidekiqやアプリケーションに設定済みのキューアダプタに設定されている実行時間を超えないようにする。
    バッチを小さめにしておくことで、タスクが中断や再開を安全に行えるようになります。

  • Task#processを冪等にすること
    コレクションの同じ要素に対してprocessが複数回実行されても安全になるようにすること。詳しくはSidekiqのベストプラクティスを参照。
    タスクでエラーが発生した場合、エラーの原因になった要素が再度処理される可能性があるため、要素の処理を再実行可能にすることが重要です。
    これは、上の状況でイテレーション期間がタイムアウトした場合に特に重要です(ジョブが再度エンキューされると1個以上の要素が再処理される可能性があるため)。

🔗 タスクオブジェクトのライフサイクルとメモ化

タスクが実行または再開されると、ランナーはタスク処理用のジョブをエンキューします。このジョブはタスクオブジェクトをインスタンス化し、タスクオブジェクトはそのジョブが続く間存在し続けます。ジョブが初めて実行されると、タスクはcountを呼び出します。ジョブが実行されるたびに、タスクはタスクオブジェクトのcollectionを呼び出し、続いてコレクションの項目ごとにprocessを呼び出します。コレクションの処理が完了するか、ジョブの最大実行時間を超えると、ジョブは停止します。

メモ化された値は、同じジョブ内における以後のprocess呼び出しでも参照可能になるため、process内でメモ化を行うとおかしなことになる可能性があります。
それでもメモ化はスロットリングやレポート生成で引き続き利用可能です。たとえば、レポートを永続化したりログ出力するのにタスクのコールバックを利用できます。

🔗 タスクのテストを書く

タスクジェネレータを実行すると、test/tasks/maintenance/ディレクトリにタスクのテストファイルも作成されます。少なくともタスクの#processメソッドについてはテストを書いておくことをおすすめします。タスクの#collectionメソッドや#countメソッドが複雑になってきたら、これらについてもテストを書くのがよいでしょう。

テストコードの例:

# test/tasks/maintenance/update_posts_task_test.rbrequire "test_helper"module Maintenance  class UpdatePostsTaskTest < ActiveSupport::TestCase    test "#process performs a task iteration" do      post = Post.new      Maintenance::UpdatePostsTask.process(post)      assert_equal "New content!", post.content    end  endend

🔗 CSVタスクのテストを書く

CSVタスクでも#processメソッドのテストを書きましょう。テストではCSV::Rowを引数として受け取ります。CSVのある行、または文字列キーのハッシュをテストに渡すことで、#processをテストできます。

# test/tasks/maintenance/import_posts_task_test.rbrequire "test_helper"module Maintenance  class ImportPostsTaskTest < ActiveSupport::TestCase    test "#process performs a task iteration" do      assert_difference -> { Post.count } do        Maintenance::UpdatePostsTask.process({          "title" => "My Title",          "content" => "Hello World!",        })      end      post = Post.last      assert_equal "My Title", post.title      assert_equal "Hello World!", post.content    end  endend

🔗 パラメータ付きタスクのテストを書く

パラメータを受け取れるタスクのテストでは、属性を代入するためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、通常どおり#process をテストできるようになります。

# test/tasks/maintenance/update_posts_via_params_task_test.rbrequire "test_helper"module Maintenance  class UpdatePostsViaParamsTaskTest < ActiveSupport::TestCase    setup do      @task = UpdatePostsViaParamsTask.new      @task.updated_content = "Testing"    end    test "#process performs a task iteration" do      assert_difference -> { Post.first.content } do        @task.process(Post.first)      end    end  endend

🔗 カスタムEnumeratorを使うタスクをテストする

カスタムEnumeratorを使うタスクのテストでは、#enumerator_builderを呼び出すためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、#enumerator_builderが返すEnumeratorが[item, cursor]ペアを期待通りに生成することを検証できるようになります。

# test/tasks/maintenance/custom_enumerating_task.rbrequire "test_helper"module Maintenance  class CustomEnumeratingTaskTest < ActiveSupport::TestCase    setup do      @task = CustomEnumeratingTask.new    end    test "#enumerator_builder returns enumerator yielding pairs of [item, cursor]" do      enum = @task.enumerator_builder(cursor: 0)      expected_items = [:b, :c]      assert_equal 2, enum.size      enum.each_with_index do |item, cursor|        assert_equal expected_items[cursor], item      end    end    test "#process performs a task iteration" do      # ...    end  endend

🔗 タスクを実行する

🔗 Web UIから実行する

Web UIを開いてタスクの"Run"をクリックすれば、新しいタスクを実行できます。

🔗 コマンドラインから実行する

以下のようにコマンドラインでタスクを実行することも可能です。

bundle exec maintenance_tasks perform Maintenance::UpdatePostsTask

CSVを処理するタスクをコマンドラインで実行するには、--csvオプションを指定します。

bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv "path/to/my_csv.csv"

この--csvオプションは、標準入力からCSVコンテンツを受け取るときにも有効です。

curl "some/remote/csv" |  bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv

引数を受け取るタスクをコマンドラインで実行するには、--argumentsオプションを指定し、続けて<キー>:<値>ペアのセットを渡します。

bundle exec maintenance_tasks perform Maintenance::ParamsTask \  --arguments post_ids:1,2,3 content:"Hello, World!"
🔗 タスクをRubyから実行する

runにタスク名を指定してランナーに送信すれば、タスクをRubyで実行することも可能です。

MaintenanceTasks::Runner.run(name: "Maintenance::UpdatePostsTask")

CSVをランナーで処理するタスクを実行するには、オープンするioオブジェクトとファイル名を含むハッシュをrunに渡します。

MaintenanceTasks::Runner.run(  name: "Maintenance::ImportPostsTask",  csv_file: { io: File.open("path/to/my_csv.csv"), filename: "my_csv.csv" })

ランナーを使い、かつ引数を受け取るタスクを実行するには、run{ パラメータ名: 引数の値 }という引数セットを含むハッシュを渡します。

MaintenanceTasks::Runner.run(  name: "Maintenance::ParamsTask",  arguments: { post_ids: "1,2,3" })

🔗 タスクのステータスを監視する

Web UIでは、タスクの最新ステータス情報を得られます。タスクのステータスは以下のとおりです。

new(新規)
まだ実行されていないタスク。
enqueued(エンキュー済み)
ユーザーが実行を指示した後で実行待機中のタスク。
running(実行中)
現在ジョブワーカーによって実行中の状態にあるタスク。
pausing(一時停止処理中)
ユーザーが一時停止を指示した後、停止前に作業を完了する必要のあるタスク。
paused(一時停止済み)
ユーザーによって一時停止され、現在は実行していないタスク。再開(resume)可能。
interrupted(中断済み)
ジョブインフラストラクチャによって一時的に中断されたタスク。
cancelling(キャンセル処理中)
ユーザーによってキャンセルされ、停止前に作業を完了する必要のあるタスク。
cancelled(キャンセル済み)
ユーザーによってキャンセルされ、現在は実行していないタスク。再開不可。
succeeded(成功)
正常に完了したタスク。
errored(エラー)
実行中にunhandled exceptionが発生したタスク。

🔗 API専用アプリケーションで利用する

maintenance_tasksエンジンは、flashメッセージとCSRFトークンの保存にRailsセッションを利用します。このエンジンをAPIのみのRailsアプリケーションで動作させるには、SessionミドルウェアActionDispatch::Flashミドルウェアを追加する必要があります。また、エンジンは厳格なコンテンツセキュリティポリシー(CSP)を定義しています。CSPがユーザーのブラウザに確実に配信されるように、アプリのミドルウェアスタックにActionDispatch::ContentSecurityPolicy::Middlewareを含めるようにしてください。
Railsアプリケーションの設定については本ドキュメントの範囲外ですが、たとえばアプリケーションの設定ファイルに以下の行を追加することで可能になります。

# config/application.rbmodule YourApplication  class Application < Rails::Application    # ...    config.api_only = true    config.middleware.insert_before ::Rack::Head, ::ActionDispatch::Flash    config.middleware.insert_before ::Rack::Head, ::ActionDispatch::ContentSecurityPolicy::Middleware    config.session_store :cookie_store, key: "_#{railtie_name.chomp("_application")}_session", secure: true    config.middleware.insert_before ::ActionDispatch::Flash, config.session_store, config.session_options    config.middleware.insert_before config.session_store, ActionDispatch::Cookies  endend

詳しくはRailsガイドのRailsによるAPI専用アプリケーションを参照してください。

🔗 maintenance_tasksがタスクを実行するしくみ

メンテナンスタスクの実行は長時間に及ぶ可能性があります。maintenance_tasks gemの目的は、「デプロイ」「Kubernetes Podのスケジューリング」「Heroku dynoの再起動」「その他のインフラストラクチャやコードの変更」を介してタスクの実行継続を支援することです。

つまり、processメソッドが返されれば、以後はイテレーション処理の終盤に手動で介入する必要なしに、タスクの中断やエンキュー、再開を安全に行えるようになります。

デフォルトでは、実行中のタスクは5分以上経過すると自動的に中断されます。このタイムアウトはjob-iteration gemで設定され、必要に応じてイニシャライザで調整可能です。

実行中のタスクも、必要に応じて自動的に中断および再度エンキューされます(例:デプロイでSidekiqワーカーがシャットダウンした場合)。

  • Sidekiqは、TSTPシグナルかTERMシグナルを受信すると、自分自身を停止中とみなします。

  • Sidekiqが停止中は、job-iterationがEnumeratorのイテレーションを停止します。
    このときjob-iterationはイテレーション位置を保存したうえで、作業再開用の新しいジョブをエンキューし、タスクのステータスを"interrupted"にします。

Sidekiqが停止処理中の状態になると、ワーカーに終了まで25秒の猶予が与えられ、それをすぎると強制的にワーカーを終了します(これはデフォルト値:--timeoutオプションで変更可能)。
Sidekiqは、ワーカースレッドが終了する前にジョブの再エンキューを試みてタスクを再開しようとします。ただしこの場合、コレクションをどこまでイテレーションしたかという位置は保存されないため、1個以上のイテレーションが再実行されてしまう可能性があります。

Sidekiq以外のジョブキューでは、これを別の方法で処理しているものもあります。

🔗 タスクが詰まってしまった場合の対応方法

アプリケーションに設定されているキューアダプタにこのプロパティがない場合や、Sidekiqが「クラッシュ」「強制終了」または「実行中のキューを再エンキューできない状態」になると、タスクが詰まってしまったように見える(実行中のように見えるが実際は実行中ではない)ことがあります。

このような状況になると、タスクを一時停止またはキャンセルしようとしても、ステータスがpausingcancellingのまま変わらなくなり、一時停止やキャンセルが行えなくなります。

回避方法として、タスクのcancellingステータスが5分以上続く場合に、タスクを再度キャンセルし、ステータスが完全にcancelledに変わったら、再度実行できることがあります。

ステータスがpausingのまま変わらない場合で、かつ(キャンセルや再実行を行わずに)タスクがどこまで処理を終えたかという位置を保持したい場合は、"Force pause"をクリックします。

🔗 エラーを通知する

maintenance_tasks gemにはいくつかの設定オプションがあります。カスタムの設定項目は、maintenance_tasks.rbイニシャライザに配置する必要があります。

🔗 エラーハンドラをカスタマイズする

タスクの実行中に発生した例外はrescueされ、エラー情報は永続化されたうえでUI上に表示されます。

エラーはRails.error.reportにも通知可能で、これはアプリケーションでカスタマイズ可能です。詳しくはRailsガイドのエラー通知ガイドを参照してください。

エラーレポーターに通知されるデータには以下が含まれます。

  • error: raiseする例外を指定する
  • context: タスクとエラーの追加情報↓を含むハッシュ
    • task_name: エラーが発生したタスクの名前
    • started_at: タスクの開始時刻
    • ended_at: タスクのエラー発生時刻
    • run_id: エラーが発生したタスクの実行id
    • tick_count: えらー発生時刻からの経過時間(tick)
    • errored_element: タスクで例外が発生したときに処理中だった要素(存在する場合)
      このオブジェクトを外部の例外監視サービスに渡す場合は、必ず機密データ漏洩防止のため「オブジェクトのサニタイズ」を行ってから、バグトラッカーと互換性のある「フォーマットへの変換」を行うこと。
  • source: これはmaintenance-tasksに設定されます。
  • handled:MaintenanceTasks.report_errors_as_handledの値(デフォルトはtrue: 後述)

コンテキストが集められる前にタスクでエラーが発生すると、contextが空になることがあります(タスクをデシリアライズするジョブが失敗した場合など)。

以下は、例外監視サービス (Bugsnag) と統合するためのRailsエラーレポーターのカスタムサブスクライバの例です。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.report_errors_as_handled = falseclass MaintenanceTasksErrorSubscriber  def report(error, handled:, severity:, context:, source: nil)    return unless source == "maintenance-tasks"    unless handled      Bugsnag.notify(error) do |notification|        notification.add_metadata(:task, context)      end    else      Rails.logger.info(error)  endendRails.error.subscribe(MaintenanceTasksErrorSubscriber.new)

handledの値は、この例ではMaintenanceTasks.report_errors_as_handledによって決まります。
この値は、後方互換性のため、デフォルトではtrueに設定されます。
これをfalseに設定すると、エラーサブスクライバで期待されるエラー(例:report_on経由)と期待されていないエラーを区別可能になるので、より正確なエラーレポートが提供されます。v3.0ではfalseがデフォルトになります。

🔗 イテレーション中のエラーを通知する

デフォルトでは、タスクのイテレーション(反復処理)中に発生したエラーはアプリケーションに通知され、イテレーションは停止します。しかし、一部のエラーについては処理を行ってからイテレーションを継続したい場合もあります。MaintenanceTasks::Task.report_onを使うことで、特定の例外をrescueしてRailsエラーレポーターに通知できます。キーワードパラメータはすべてreportメソッドに渡されます。

class MyTask < MaintenanceTasks::Task  report_on(MyException, OtherException, severity: :info, context: {task_name: "my_task"})end

MaintenanceTasks::TaskにはActiveSupport::Rescuableも含まれているので、これを用いてカスタムエラー処理を実装できます。

class MyTask < MaintenanceTasks::Task  rescue_from(MyException) do |exception|    handle(exception)  endend
🔗MaintenanceTasksモジュールをカスタマイズする

以下のようにMaintenanceTasks.tasks_moduleを設定することで、タスクに配置するモジュールを定義できます。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.tasks_module = "TaskModule"

この値が指定されていない場合は、デフォルトでMaintenanceになります。

🔗 タスクを名前空間で整理する

タスクは、app/tasks/maintenanceディレクトリの下に任意の深さでネストできます。
たとえば、app/tasks/maintenance/team_name/service_name/update_posts_task.rbというファイルに以下のタスクを定義できます。

module Maintenance  module TeamName    module ServiceName      class UpdatePostsTask < MaintenanceTasks::Task        def process(rows)          # ...        end      end    end  endend
🔗 ベースとなるジョブクラスをカスタマイズする

MaintenanceTasks.jobコンフィグは、タスクで使うジョブクラスを定義するのに使えます。この設定はグローバルなので、ジョブクラスはアプリケーション内のあらゆるメンテナンスタスクで使われます。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.job = "CustomTaskJob"# app/jobs/custom_task_job.rbclass CustomTaskJob < MaintenanceTasks::TaskJob  queue_as :low_priorityend

このジョブクラスは、MaintenanceTasks::TaskJob必ず継承する必要があります。

ただし、retry_onはカスタムジョブクラスではサポートされないため、失敗したジョブのリトライはできない点にご注意ください。

🔗 タスク進捗状況の永続化頻度をカスタマイズする

MaintenanceTasks.ticker_delayコンフィグは、タスクの進捗状況を永続化する頻度をカスタマイズするのに使えます。Numeric値またはActiveSupport::Duration値を指定できます。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.ticker_delay = 2.seconds

値を指定しない場合のデフォルト値は1秒です。

🔗 Active Storageで使うストレージサービスをカスタマイズする

Rails 6.1以降のActive Storageフレームワークは、複数のストレージサービスをサポートしています。使うサービスを指定するには、MaintenanceTasks.active_storage_serviceコンフィグに、アプリケーションのconfig/storage.ymlで指定されているサービスのキーを設定できます。

# config/storage.ymluser_data:  service: GCS  credentials: <%= Rails.root.join("path/to/user/data/keyfile.json") %>  project: "my-project"  bucket: "user-data-bucket"internal:  service: GCS  credentials: <%= Rails.root.join("path/to/internal/keyfile.json") %>  project: "my-project"  bucket: "internal-bucket"
# config/initializers/maintenance_tasks.rbMaintenanceTasks.active_storage_service = :internal

アプリケーションで利用するストレージサービスが1種類のみの場合は、このコンフィグオプションは不要です。その場合、Rails.configuration.active_storage.serviceがデフォルトで使われます。

🔗 バックトレースクリーナーをカスタマイズする

MaintenanceTasks.backtrace_cleanerコンフィグは、タスクのエラー時にバックトレースをクリーンアップして永続化するのに使うバックトレースクリーナーを指定するのに使えます。ActiveSupport::BacktraceCleanerを使う必要があります。

# config/initializers/maintenance_tasks.rbcleaner = ActiveSupport::BacktraceCleaner.newcleaner.add_silencer { |line| line =~ /ignore_this_dir/ }MaintenanceTasks.backtrace_cleaner = cleaner

値を指定しない場合は、デフォルトでRails.backtrace_cleanerがバックトレースのクリーンアップに使われます。

🔗 Web UIで使われる親コントローラをカスタマイズする

MaintenanceTasks.parent_controllerコンフィグは、Web UIエンジンのすべてのコントローラに継承されるコントローラクラスを指定するのに使えます。

これにより、ApplicationController(または任意のコントローラー)に共通ロジックを持つアプリケーションは、イニシャライザでシンプルな割当を行うだけで、Web UIがそのロジックを継承する設定がオプションで可能になります。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.parent_controller = "Services::CustomController"# app/controllers/services/custom_controller.rbclass Services::CustomController < ActionController::Base  include CustomSecurityThings  include CustomLoggingThings  # ...end

親コントローラの値は、必ず既存のコントローラクラスに対応する文字列でなければならず、その既存のコントローラはActionController::Base必ず継承していなければなりません

値が指定されていない場合は、デフォルトで"ActionController::Base"が使われます。

🔗 タスクが詰まっていると判断するまでの時間を設定する

タスクが更新されていない場合に、タスクが詰まっているとみなすまでのタイムアウト時間を設定するには、MaintenanceTasks.stuck_task_durationコンフィグを使えます。この時間を設定するときは、ジョブインフラストラクチャのイベントがメンテナンスタスクのジョブ実行を妨げたりタスクをキャンセルしたりする分の時間も考慮して盛り込んでおく必要があります。

MaintenanceTasks.stuck_task_durationの値にはActiveSupport::Duration型を必ず使う必要があります
値が指定されていない場合は、デフォルトで5分が設定されます。

🔗 ステータスの再読み込み頻度を設定する

MaintenanceTasks.status_reload_frequencyを設定すると、イテレーション中に実行ステータスを再読み込みする頻度を指定できます。デフォルトのステータスは1秒ごとに再読み込みされますが、パフォーマンスを向上させるためにこの間隔を長くできます。再読み込み間隔を長くすると、タスクが一時停止または中断した場合の停止速度に影響することに注意してください。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.status_reload_frequency = 10.seconds  # 10秒おきに再読み込み

個別のタスクでreload_status_everyメソッドを設定することで、設定を上書きできます。

# app/tasks/maintenance/update_posts_task.rbmodule Maintenance  class UpdatePostsTask < MaintenanceTasks::Task    # 5秒おきにステータスを再読み込み(グローバルなデフォルト値を使わない)    reload_status_every(5.seconds)    def collection      Post.all    end    def process(post)      post.update!(content: "New content!")    end  endend

この最適化によって、特に短時間のイテレーションでデータベースクエリを大幅に削減可能になります。
これは、タスクのキャンセルや一時停止を頻繁にチェックする必要がない場合に特に有用です。

🔗 メタデータ

MaintenanceTasks.metadataコンフィグは、実行に関する追加情報を取得するprocを指定するのに使えます。このprocはMaintenanceTasks.parent_controllerのコンテキストで実行されるので、メンテナンスタスクを実行したユーザーのIDやメールアドレスを取得するのに利用できます。

# config/initializers/maintenance_tasks.rbMaintenanceTasks.metadata = ->() do  { user_email: current_user.email }end

🔗 アップグレード方法

maintenance_tasksの新バージョンをチェックして更新するには、bundlerを使います。新バージョンをインストールした後は、インストールコマンドを再度実行すること。

bin/rails generate maintenance_tasks:install

これにより、新しいマイグレーションもインストールされて実行可能になります。

🔗 メンテナンスタスクの古いマイグレーションを削除してしまった場合の対応方法

maintenance_tasksのインストールコマンドは、古いマイグレーションの再インストールを試みるため、古いマイグレーションが削除されていると、マイグレーション中にデータベースで問題が発生します。

対応方法:bin/rails maintenance_tasks:install:migrationsを実行して、maintenance_tasksのマイグレーションをdb/migrateディレクトリにコピーしてください。
次に、maintenance_tasksに新しいマイグレーションが追加されていないかどうかをリリースノートで確認し、新しいマイグレーションがあれば残し、既に実行済みの古いマイグレーションは削除します。

以上の確認が終わったら、bin/rails db:migrateコマンドでマイグレーションを実行します。

🔗 貢献方法

私たちは、問題報告用のissueや、プルリクエストによるコードへの貢献を受け付けています。貢献方法についてはCONTRIBUTING.mdのガイドラインを参照してください。

🔗 新バージョンのリリースについて

プルリクエストがマージされると、GitHub上の最新ドラフトリリースに追加されます。

新バージョンのリリース準備ができたら、以下の手順を実行します。

  • maintenance_tasks.gemspecファイルのspec.versionフィールドを更新します。
  • bundle installを実行して、Gemfile.lockのgemバージョンを上げます。
  • プルリクエストをオープンし、承認を経てからマージします。
  • Shipit経由でデプロイし、rubygems.orgに新バージョンが表示されていることを確認します。
  • リリースノートにすべてのChangelogが記載されていることを確認してから、新バージョンを公開します。
  • 次のリリースに備えて、“Upcoming Release”というタイトルのドラフトリリースをGitHub上に新規作成します。タグバージョンは空欄のままで構いません。これは、次期リリースに関連する変更をドキュメント化するときの開始ポイントになります。

関連記事

Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)

Rails: Mission Control Jobs gem README(翻訳)

Rails: Active Jobスタイルガイド(翻訳)


この記事を書いた人

hachi8833

X:@hachi8833GitHub:@hachi8833コボラー、ITコンサル、ローカライズ業界、Rails開発を経てTechRachoの編集・記事作成を担当。これまでにRuby on Rails チュートリアル第2版のコンテンツ監修、Railsガイドのコンテンツ作成を担当。かと思うと、正規表現の粋を尽くした日本語エラーチェックサービスenno.jpを運営。Claude Codeに夢中になりすぎないための方法を模索中。ブログ:note.com/hachi8833Amazonウィッシュリスト:https://bit.ly/32aAmiI

hachi8833の書いた記事一覧へ

本記事の内容へのお問い合せはTwitterで@techrachoへMentionまたはDMにてご連絡頂くか、運営会社であるBPS株式会社のお問い合せフォームよりお問い合せ下さい。

Our Services

各種サービスのご依頼やお問い合わせなど、お気軽にご相談ください。

Our Products

製品のご利用希望や疑問・質問など、お気軽にご相談ください。

Recruit & Contacts

お問い合わせ、採用へのお申し込みはこちらから。

積極採用中 開発エンジニアCONTACT

関連記事

CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。
CONTACT FORM

[8]ページ先頭

©2009-2025 Movatter.jp