Movatterモバイル変換


[0]ホーム

URL:


BLOGTIMES

cles::blog

平常心是道
« :: »
2009/04/25

Rubyでお手軽MQ

  ruby  sh  mom 
このエントリーをはてなブックマークに追加

仕事でバッチの多重起動を防止するスクリプトを書いたのですが、これだと後続のバッチがキャンセルされてしまって実行されないので、ジョブをQueueに溜めて逐次実行してくれる仕組みが欲しくなったので、Rubyを使ってMQサーバを書いてみました。

RubyにはQueueクラスがあるので、これを使ってdrbからメッセージを投げ込むようにすれば簡単なMQサーバはすぐにできるのですが、これだとメッセージの永続化機能がないので、不慮の事故でマシンの電源が落ちたりするとそのままメッセージが消滅してしまいます。それだと色々と問題があるので、今回はap4rのバックエンドにも使われている永続化機能を持ったメッセージキューであるreliable-msgを使います。

僕がかつて使っていたIBM MQ Series ( Webshere MQ Series )と比べればおもちゃのようなものですが、ちょっと使う分にはこれで十分です。

reliable-msgのインストールとパッチ当て

まず、gemでreliable-msgをインストールします。

# gem install reliable-msg

gemでインストールされるreliable-msgはUUID 2.0系で動かないなどのいくつかの不具合があるので、下記のパッチを当ててバグを修正します。

/usr/lib/ruby/gems/1.8/gems/reliable-msg-1.1.0/lib/reliable-msg

--- cli.rb.org 2009-04-23 00:31:40.000000000 +0900+++ cli.rb 2009-04-23 00:52:53.000000000 +0900@@ -209,8 +209,8 @@ exit end drb = Config::DEFAULT_DRB- drb.merge(config.drb) if config.drb- drb_uri = "druby://localhost:#{drb['port']}"+ drb = drb.merge(config.drb) if config.drb+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" else drb_uri = Queue::DEFAULT_DRB_URI end
--- message-store.rb.org 2009-04-23 00:31:40.000000000 +0900+++ mescdsage-store.rb 2009-04-23 00:41:20.000000000 +0900@@ -305,7 +305,7 @@ free = @mutex.synchronize do @file_free.shift end- name = free ? free[0] : "#{@path}/#{UUID.new}.msg"+ name = free ? free[0] : "#{@path}/#{UUID.generate}.msg" file = if free && free[1] free[1] else
--- queue-manager.rb.org 2009-04-23 00:31:40.000000000 +0900+++ queue-manager.rb 2009-04-23 00:52:14.000000000 +0900@@ -233,8 +233,8 @@ # Get the DRb URI from the configuration, or use the default. Create a DRb server. drb = Config::DEFAULT_DRB- drb.merge(@config.drb) if @config.drb- drb_uri = "druby://localhost:#{drb['port']}"+ drb = drb.merge(@config.drb) if @config.drb+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY) @logger.info format(INFO_ACCEPTING_DRB, drb_uri)@@ -305,7 +305,7 @@ raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty? time = Time.new.to_i # TODO: change this to support the RM delivery protocol.- id = args[:id] || UUID.new+ id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory@@ -478,7 +478,7 @@ message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid] raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty? time = Time.new.to_i- id = args[:id] || UUID.new+ id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory@@ -559,7 +559,7 @@ # Called by client to begin a transaction. def begin timeout- tid = UUID.new+ tid = UUID.generate @transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout} tid end

サーバとクライアントの作成

スクリプトを配置するための適当なディレクトリと、メッセージが永続化されるディレクトリを作ります。
ココでは前者をqmtest、後者をqmtest/queuesと仮定します。

$ makedir -p qmtest/queues

次に、サーバとクライアントのスクリプト、設定ファイルを用意します。

qmtest/queues.cfg(QueueManagerの設定ファイル)

store: type: diskdrb: host: 0.0.0.0 port: 6438 acl: allow 127.0.0.1 allow ::1 allow localhost

qmtest/server.rb(サーバ本体)

#!/usr/bin/ruby -Kurequire 'rubygems'require 'reliable-msg'include ReliableMsgrequire 'logger'logger = Logger.new(STDOUT)qm = QueueManager.new(:config => "queues.cfg", :logger => logger)qm.startlogger.info "QueueManager started."q = ReliableMsg::Queue.new "queue.test"queueEnabled = trueSignal.trap(:INT){ logger.info "SIGINT trapped." queueEnabled = false}while queueEnabled executed = q.get { |msg| if msg msg.headers.each do |k, v| logger.debug("MessageHeader: #{k} => #{v}") end logger.debug("MessageObject: #{msg.object}") end } sleep 10 unless executedendqm.stoplogger.info "QueueManager terminated."

qmtest/client.rb(QueueManagerにメッセージを送信するクライアント)

#!/usr/bin/ruby -Kurequire 'rubygems'require 'reliable-msg'include ReliableMsgrequire 'logger'logger = Logger.new(STDOUT)q = ReliableMsg::Queue.new "queue.test"logger.info q.put("hoge")

実際に動作させてみる

実際にサーバを起動して、メッセージを放りこんでみます。このサーバはテスト用なので、10秒ごとにQueueを監視してメッセージがあれば取り出して、メッセージをログに出力するだけで特に何も処理をしません。

コンソールを2つ開き、1つ目のコンソールでqmtestディレクトリに入ってサーバを起動します。
下記のようなログが出力されるはずです。

$ cd qmtest$ ruby server.rbI, [2009-04-26T02:23:14.215998 #9154] INFO -- : Loaded queues configuration from: /tmp/qmtest/queues.cfgI, [2009-04-26T02:23:14.216099 #9154] INFO -- : Using message store: diskI, [2009-04-26T02:23:14.216701 #9154] INFO -- : Accepting requests at: druby://0.0.0.0:6438I, [2009-04-26T02:23:14.216792 #9154] INFO -- : QueueManager started.

もう一つのコンソールも同様にして、クライアントを起動させると、投入されたメッセージのIDが出力されるはずです。

$ cd qmtest$ ruby client.rbI, [2009-04-26T02:23:23.736733 #9157] INFO -- : b43c53d0-13eb-012c-d180-003048d48a0e

そうすると、1つめのコンソールがメッセージを受け取って下記のようなログを出力するはずです。

D, [2009-04-26T02:23:24.219530 #9154] DEBUG -- : MessageHeader: priority => 0D, [2009-04-26T02:23:24.219609 #9154] DEBUG -- : MessageHeader: delivery => best_effortD, [2009-04-26T02:23:24.219667 #9154] DEBUG -- : MessageHeader: created => 1240680203D, [2009-04-26T02:23:24.219722 #9154] DEBUG -- : MessageHeader: max_deliveries => 5D, [2009-04-26T02:23:24.219775 #9154] DEBUG -- : MessageHeader: id => b43c53d0-13eb-012c-d180-003048d48a0eD, [2009-04-26T02:23:24.219830 #9154] DEBUG -- : MessageHeader: expires =>D, [2009-04-26T02:23:24.219882 #9154] DEBUG -- : MessageObject: hoge

サーバを終了させるにはCtrl+Cを押します。
現在のメッセージの処理が完了すると、QueueManagerがシャットダウンされます。

I, [2009-04-26T02:23:30.856519 #9154] INFO -- : SIGINT trapped.I, [2009-04-26T02:23:34.228837 #9154] INFO -- : Stopped queue manager at: druby://0.0.0.0:6438I, [2009-04-26T02:23:34.228932 #9154] INFO -- : QueueManager terminated.

あとは、具体的な処理を書いたり、ThreadやProcessを使って多重化するなり自由自在です。

シェルスクリプトでさらに起動と停止を便利にする

運用の時にサーバの上げ下げが面倒なのは嫌なので、start.shとstop.shで一発で起動と停止が出来るようにしてみました。

qmtest/start.sh

#!/bin/bashBASENAME=`basename $0 .sh`DIRNAME=`dirname $0`PID_FILE="server.pid"cd $DIRNAMEif [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then echo "Already started. (PID: $STORED_PID)" exit fifiruby server.rb >> server.log 2>&1 &QM_PID=$!echo $QM_PID > $PID_FILEecho "QueueManager started. (PID: $QM_PID)"

qmtest/stop.sh

#!/bin/bashBASENAME=`basename $0 .sh`DIRNAME=`dirname $0`PID_FILE="server.pid"cd $DIRNAMEif [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then kill -2 $STORED_PID echo -n "Signal sent, waiting for exit..." fi while (ps -p ${STORED_PID} -o pid= >/dev/null); do sleep 3 echo -n "." done echo ""firm -f $PID_FILEecho "done!"

参考
 ・AP4R で Parallels の壁を越える
 ・Rails Wiki - ReliableMsg


    byhsur at 23:11[5年前][4年前][3年前][2年前][1年前][1年後][2年後][3年後][4年後][5年後] |
    こんな記事もあります 「メッセージ指向 Queue バッチ
    2022 年の人気エントリ Top 100
    デバイス製造元からの HEVC ビデオ拡張機能がダウンロードできない?
    2021 年の人気エントリ Top 100
    .heic を .jpg にドラッグアンドドロップで変換する BAT ファイル
    AbuseIPDB を使い始めてみた
    ドラッグアンドドロップでフォルダ毎に zip アーカイブを作る bat
    2020 年の人気エントリ Top 100
    Python で非同期の STOMP クライアントを書く
    Python で Elasticsearch をいじる
    PowerShell を CentOS にインストール
    トラックバックについて
    Trackback URL:
    お気軽にどうぞ。トラックバック前にポリシーをお読みください。[policy]
    このエントリへのTrackbackにはこのURLが必要です→https://blog.cles.jp/item/3010
    Trackbacks
    このエントリにトラックバックはありません
    Comments
    愛のあるツッコミをお気軽にどうぞ。[policy]
    古いエントリについてはコメント制御しているため、即時に反映されないことがあります。
    コメントはありません
    Comments Form

    コメントは承認後の表示となります。
    OpenIDでログインすると、即時に公開されます。

    OpenID を使ってログインすることができます。

    Identity URL:Yahoo! JAPAN IDでログイン

    « :: »
    Copyright © 2004-2023 by CLES All Rights Reserved.
    サイト内検索
    検索ワードランキング
    へぇが多いエントリ
    閲覧数が多いエントリ
    1 .アーロンチェアのポスチャーフィットを修理(99679)
    2 .年次の人間ドックへ(99091)
    3 .福岡銀がデマの投稿者への刑事告訴を検討中(99081)
    4 .三菱鉛筆がラミーを買収(98691)
    5 .2023 年分の確定申告完了!(1つめ)(98660)
    最新のエントリ
    cles::blogについて
    誰が書いてる?
    最近行った場所
    サイトポリシー
    タグ一覧
    検索ワードランキング

    Referrers

      Powered by CLES
      Nucleus CMS v3.31SP3/w memcached
      21375631(W:6256 Y:1545 T:1450)
      cles::blogのはてなブックマーク数
      benchmark


      [8]ページ先頭

      ©2009-2025 Movatter.jp