以前、Python で非同期の STOMP クライアントを書くためにstomp.py とthreading を書いてみたことがあったのですが、どうしても安定して動作させることができなくて結局 node.js で実装したことがありましたが、今回はstompest.async という違うライブラリを使って再チャレンジしてみました。
nikipore/stompest: STOMP client library for Python including both synchronous and Twisted implementations.
stompest is a full-featured STOMP 1.0, 1.1, and 1.2 implementation for Python 2.7 and Python 3 (versions 3.3 and higher) including both synchronous and asynchronous clients:
stompest.async はTwisted という Python のイベントドリブンなフレームワークを使って書かれており、動作自体は非常に安定しています。欠点としては async という名前が Python 3.7 から予約語になった*1ので、現在のリリースされているパッケージはそのままでは Python 3.6 でしか動かすことができないということでしょうか。
今回書いたコードはサンプルほぼそのままですが、 Heart Beat を有効化したかったので、プロトコルを STOMP v1.2 にするために以下のような感じにしました。
Consumer.py
# -*- coding: utf-8 -*-import jsonimport loggingfrom twisted.internet import defer, reactorfrom stompest.config import StompConfigfrom stompest.protocol import StompSpecfrom stompest.async import Stompfrom stompest.async.listener import SubscriptionListenerclass Consumer(object): QUEUE = '/queue/testOut' ERROR_QUEUE = '/queue/testConsumerError' def __init__(self, config=None): if config is None: config = StompConfig('tcp://localhost:61613', version=StompSpec.VERSION_1_2) self.config = config self.amq_id = 1 @defer.inlineCallbacks def run(self): client = Stomp(self.config) yield client.connect(heartBeats = (600000, 600000)) headers = { # client-individual mode is necessary for concurrent processing # (requires ActiveMQ >= 5.2) StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, StompSpec.ID_HEADER: str(self.amq_id), # the maximal number of messages the broker will let you work on at the same time 'activemq.prefetchSize': '100', } self.amq_id += 1 client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE)) def consume(self, client, frame): """ NOTE: you can return a Deferred here """ data = json.loads(frame.body.decode()) print('Received frame with count %d' % data['count'])if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Consumer().run() reactor.run()