仕事でバッチの多重起動を防止するスクリプトを書いたのですが、これだと後続のバッチがキャンセルされてしまって実行されないので、ジョブをQueueに溜めて逐次実行してくれる仕組みが欲しくなったので、Rubyを使ってMQサーバを書いてみました。
RubyにはQueueクラスがあるので、これを使ってdrbからメッセージを投げ込むようにすれば簡単なMQサーバはすぐにできるのですが、これだとメッセージの永続化機能がないので、不慮の事故でマシンの電源が落ちたりするとそのままメッセージが消滅してしまいます。それだと色々と問題があるので、今回はap4rのバックエンドにも使われている永続化機能を持ったメッセージキューであるreliable-msgを使います。
僕がかつて使っていたIBM MQ Series ( Webshere MQ Series )と比べればおもちゃのようなものですが、ちょっと使う分にはこれで十分です。
† reliable-msgのインストールとパッチ当て
まず、gemでreliable-msgをインストールします。
# gem install reliable-msg
gemでインストールされるreliable-msgはUUID 2.0系で動かないなどのいくつかの不具合があるので、下記のパッチを当ててバグを修正します。
/usr/lib/ruby/gems/1.8/gems/reliable-msg-1.1.0/lib/reliable-msg
--- cli.rb.org 2009-04-23 00:31:40.000000000 +0900+++ cli.rb 2009-04-23 00:52:53.000000000 +0900@@ -209,8 +209,8 @@ exit end drb = Config::DEFAULT_DRB- drb.merge(config.drb) if config.drb- drb_uri = "druby://localhost:#{drb['port']}"+ drb = drb.merge(config.drb) if config.drb+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" else drb_uri = Queue::DEFAULT_DRB_URI end
--- message-store.rb.org 2009-04-23 00:31:40.000000000 +0900+++ mescdsage-store.rb 2009-04-23 00:41:20.000000000 +0900@@ -305,7 +305,7 @@ free = @mutex.synchronize do @file_free.shift end- name = free ? free[0] : "#{@path}/#{UUID.new}.msg"+ name = free ? free[0] : "#{@path}/#{UUID.generate}.msg" file = if free && free[1] free[1] else
--- queue-manager.rb.org 2009-04-23 00:31:40.000000000 +0900+++ queue-manager.rb 2009-04-23 00:52:14.000000000 +0900@@ -233,8 +233,8 @@ # Get the DRb URI from the configuration, or use the default. Create a DRb server. drb = Config::DEFAULT_DRB- drb.merge(@config.drb) if @config.drb- drb_uri = "druby://localhost:#{drb['port']}"+ drb = drb.merge(@config.drb) if @config.drb+ drb_uri = "druby://#{drb['host']||'localhost'}:#{drb['port']}" @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY) @logger.info format(INFO_ACCEPTING_DRB, drb_uri)@@ -305,7 +305,7 @@ raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty? time = Time.new.to_i # TODO: change this to support the RM delivery protocol.- id = args[:id] || UUID.new+ id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory@@ -478,7 +478,7 @@ message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid] raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty? time = Time.new.to_i- id = args[:id] || UUID.new+ id = args[:id] || UUID.generate created = args[:created] || time # Validate and freeze the headers. The cloning ensures that the headers we hold in memory@@ -559,7 +559,7 @@ # Called by client to begin a transaction. def begin timeout- tid = UUID.new+ tid = UUID.generate @transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout} tid end
† サーバとクライアントの作成
スクリプトを配置するための適当なディレクトリと、メッセージが永続化されるディレクトリを作ります。
ココでは前者をqmtest、後者をqmtest/queuesと仮定します。
$ makedir -p qmtest/queues
次に、サーバとクライアントのスクリプト、設定ファイルを用意します。
qmtest/queues.cfg(QueueManagerの設定ファイル)
store: type: diskdrb: host: 0.0.0.0 port: 6438 acl: allow 127.0.0.1 allow ::1 allow localhost
qmtest/server.rb(サーバ本体)
#!/usr/bin/ruby -Kurequire 'rubygems'require 'reliable-msg'include ReliableMsgrequire 'logger'logger = Logger.new(STDOUT)qm = QueueManager.new(:config => "queues.cfg", :logger => logger)qm.startlogger.info "QueueManager started."q = ReliableMsg::Queue.new "queue.test"queueEnabled = trueSignal.trap(:INT){ logger.info "SIGINT trapped." queueEnabled = false}while queueEnabled executed = q.get { |msg| if msg msg.headers.each do |k, v| logger.debug("MessageHeader: #{k} => #{v}") end logger.debug("MessageObject: #{msg.object}") end } sleep 10 unless executedendqm.stoplogger.info "QueueManager terminated."
qmtest/client.rb(QueueManagerにメッセージを送信するクライアント)
#!/usr/bin/ruby -Kurequire 'rubygems'require 'reliable-msg'include ReliableMsgrequire 'logger'logger = Logger.new(STDOUT)q = ReliableMsg::Queue.new "queue.test"logger.info q.put("hoge")
† 実際に動作させてみる
実際にサーバを起動して、メッセージを放りこんでみます。このサーバはテスト用なので、10秒ごとにQueueを監視してメッセージがあれば取り出して、メッセージをログに出力するだけで特に何も処理をしません。
コンソールを2つ開き、1つ目のコンソールでqmtestディレクトリに入ってサーバを起動します。
下記のようなログが出力されるはずです。
$ cd qmtest$ ruby server.rbI, [2009-04-26T02:23:14.215998 #9154] INFO -- : Loaded queues configuration from: /tmp/qmtest/queues.cfgI, [2009-04-26T02:23:14.216099 #9154] INFO -- : Using message store: diskI, [2009-04-26T02:23:14.216701 #9154] INFO -- : Accepting requests at: druby://0.0.0.0:6438I, [2009-04-26T02:23:14.216792 #9154] INFO -- : QueueManager started.
もう一つのコンソールも同様にして、クライアントを起動させると、投入されたメッセージのIDが出力されるはずです。
$ cd qmtest$ ruby client.rbI, [2009-04-26T02:23:23.736733 #9157] INFO -- : b43c53d0-13eb-012c-d180-003048d48a0e
そうすると、1つめのコンソールがメッセージを受け取って下記のようなログを出力するはずです。
D, [2009-04-26T02:23:24.219530 #9154] DEBUG -- : MessageHeader: priority => 0D, [2009-04-26T02:23:24.219609 #9154] DEBUG -- : MessageHeader: delivery => best_effortD, [2009-04-26T02:23:24.219667 #9154] DEBUG -- : MessageHeader: created => 1240680203D, [2009-04-26T02:23:24.219722 #9154] DEBUG -- : MessageHeader: max_deliveries => 5D, [2009-04-26T02:23:24.219775 #9154] DEBUG -- : MessageHeader: id => b43c53d0-13eb-012c-d180-003048d48a0eD, [2009-04-26T02:23:24.219830 #9154] DEBUG -- : MessageHeader: expires =>D, [2009-04-26T02:23:24.219882 #9154] DEBUG -- : MessageObject: hoge
サーバを終了させるにはCtrl+Cを押します。
現在のメッセージの処理が完了すると、QueueManagerがシャットダウンされます。
I, [2009-04-26T02:23:30.856519 #9154] INFO -- : SIGINT trapped.I, [2009-04-26T02:23:34.228837 #9154] INFO -- : Stopped queue manager at: druby://0.0.0.0:6438I, [2009-04-26T02:23:34.228932 #9154] INFO -- : QueueManager terminated.
あとは、具体的な処理を書いたり、ThreadやProcessを使って多重化するなり自由自在です。
† シェルスクリプトでさらに起動と停止を便利にする
運用の時にサーバの上げ下げが面倒なのは嫌なので、start.shとstop.shで一発で起動と停止が出来るようにしてみました。
qmtest/start.sh
#!/bin/bashBASENAME=`basename $0 .sh`DIRNAME=`dirname $0`PID_FILE="server.pid"cd $DIRNAMEif [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then echo "Already started. (PID: $STORED_PID)" exit fifiruby server.rb >> server.log 2>&1 &QM_PID=$!echo $QM_PID > $PID_FILEecho "QueueManager started. (PID: $QM_PID)"
qmtest/stop.sh
#!/bin/bashBASENAME=`basename $0 .sh`DIRNAME=`dirname $0`PID_FILE="server.pid"cd $DIRNAMEif [ -f $PID_FILE ]; then STORED_PID=`cat $PID_FILE` if (ps -p ${STORED_PID} -o pid= >/dev/null); then kill -2 $STORED_PID echo -n "Signal sent, waiting for exit..." fi while (ps -p ${STORED_PID} -o pid= >/dev/null); do sleep 3 echo -n "." done echo ""firm -f $PID_FILEecho "done!"
† 参考
・AP4R で Parallels の壁を越える
・Rails Wiki - ReliableMsg