Movatterモバイル変換


[0]ホーム

URL:


BLOGTIMES

cles::blog

平常心是道
« :: »
2020/12/25

Python で非同期の STOMP クライアントを書く

  python 
このエントリーをはてなブックマークに追加

以前、Python で非同期の STOMP クライアントを書くためにstomp.pythreading を書いてみたことがあったのですが、どうしても安定して動作させることができなくて結局 node.js で実装したことがありましたが、今回はstompest.async という違うライブラリを使って再チャレンジしてみました。

nikipore/stompest: STOMP client library for Python including both synchronous and Twisted implementations.

stompest is a full-featured STOMP 1.0, 1.1, and 1.2 implementation for Python 2.7 and Python 3 (versions 3.3 and higher) including both synchronous and asynchronous clients:

stompest.async はTwisted という Python のイベントドリブンなフレームワークを使って書かれており、動作自体は非常に安定しています。欠点としては async という名前が Python 3.7 から予約語になった*1ので、現在のリリースされているパッケージはそのままでは Python 3.6 でしか動かすことができないということでしょうか。

今回書いたコードはサンプルほぼそのままですが、 Heart Beat を有効化したかったので、プロトコルを STOMP v1.2 にするために以下のような感じにしました。

Consumer.py

# -*- coding: utf-8 -*-import jsonimport loggingfrom twisted.internet import defer, reactorfrom stompest.config import StompConfigfrom stompest.protocol import StompSpecfrom stompest.async import Stompfrom stompest.async.listener import SubscriptionListenerclass Consumer(object): QUEUE = '/queue/testOut' ERROR_QUEUE = '/queue/testConsumerError' def __init__(self, config=None): if config is None: config = StompConfig('tcp://localhost:61613', version=StompSpec.VERSION_1_2) self.config = config self.amq_id = 1 @defer.inlineCallbacks def run(self): client = Stomp(self.config) yield client.connect(heartBeats = (600000, 600000)) headers = { # client-individual mode is necessary for concurrent processing # (requires ActiveMQ >= 5.2) StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, StompSpec.ID_HEADER: str(self.amq_id), # the maximal number of messages the broker will let you work on at the same time 'activemq.prefetchSize': '100', } self.amq_id += 1 client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE)) def consume(self, client, frame): """ NOTE: you can return a Deferred here """ data = json.loads(frame.body.decode()) print('Received frame with count %d' % data['count'])if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) Consumer().run() reactor.run()

byhsur at 16:02[5年前][4年前][3年前][2年前][1年前][1年後][2年後][3年後][4年後] |
こんな記事もあります 「QUEUE Stomp headers
2022 年の人気エントリ Top 100
2021 年の人気エントリ Top 100
curl で CORS の設定を確認する
2020 年の人気エントリ Top 100
Python 開発に Pipenv を使うようにしてみた
Zapier を使ってメールの一部を Pushbullet でプッシュ通知する
POSIX Message Queue を Python から使う
POSIX Message Queue を使ってプロセス間通信をする
Python から SystemV IPC 共有メモリを使う
Cookie を JavaScript から取得させない
トラックバックについて
Trackback URL:
お気軽にどうぞ。トラックバック前にポリシーをお読みください。[policy]
このエントリへのTrackbackにはこのURLが必要です→https://blog.cles.jp/item/12190
Trackbacks
このエントリにトラックバックはありません
Comments
愛のあるツッコミをお気軽にどうぞ。[policy]
古いエントリについてはコメント制御しているため、即時に反映されないことがあります。
コメントはありません
Comments Form

コメントは承認後の表示となります。
OpenIDでログインすると、即時に公開されます。

OpenID を使ってログインすることができます。

Identity URL:Yahoo! JAPAN IDでログイン

« :: »
Copyright © 2004-2023 by CLES All Rights Reserved.
サイト内検索
検索ワードランキング
へぇが多いエントリ
閲覧数が多いエントリ
1 .アーロンチェアのポスチャーフィットを修理(99743)
2 .福岡銀がデマの投稿者への刑事告訴を検討中(99137)
3 .年次の人間ドックへ(99136)
4 .三菱鉛筆がラミーを買収(98744)
5 .2023 年分の確定申告完了!(1つめ)(98706)
最新のエントリ
cles::blogについて
誰が書いてる?
最近行った場所
サイトポリシー
タグ一覧
検索ワードランキング

Referrers

    Powered by CLES
    Nucleus CMS v3.31SP3/w memcached
    21376593(W:7218 Y:1720 T:0692)
    cles::blogのはてなブックマーク数
    benchmark


    [8]ページ先頭

    ©2009-2025 Movatter.jp