Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

paho.mqtt.python

License

NotificationsYou must be signed in to change notification settings

mqtt-tools/paho.mqtt.python

 
 

Thefull documentation is available here.

Warning breaking change - Release 2.0 contains a breaking change; see therelease notes andmigration details.

This document describes the source code for theEclipse Paho MQTT Python client library, which implements versions 5.0, 3.1.1, and 3.1 of the MQTT protocol.

This code provides a client class which enables applications to connect to anMQTT broker to publish messages, and to subscribe to topics and receive published messages. It also provides some helper functions to make publishing one off messages to an MQTT server very straightforward.

It supports Python 3.7+.

The MQTT protocol is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. Designed as an extremely lightweight publish/subscribe messaging transport, it is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

Paho is anEclipse Foundation project.

Contents

Installation

The latest stable version is available in the Python Package Index (PyPi) and can be installed using

pip install paho-mqtt

Or withvirtualenv:

virtualenv paho-mqttsource paho-mqtt/bin/activatepip install paho-mqtt

To obtain the full code, including examples and tests, you can clone the git repository:

git clone https://github.com/eclipse/paho.mqtt.python

Once you have the code, it can be installed from your repository as well:

cd paho.mqtt.pythonpip install -e .

To perform all tests (including MQTT v5 tests), you also need to clone paho.mqtt.testing in paho.mqtt.python folder:

git clone https://github.com/eclipse/paho.mqtt.testing.gitcd paho.mqtt.testinggit checkout a4dc694010217b291ee78ee13a6d1db812f9babd

Known limitations

The following are the known unimplemented MQTT features.

Whenclean_session is False, the session is only stored in memory and not persisted. This means thatwhen the client is restarted (not just reconnected, the object is recreated usually because theprogram was restarted) the session is lost. This results in a possible message loss.

The following part of the client session is lost:

  • QoS 2 messages which have been received from the server, but have not been completely acknowledged.

    Since the client will blindly acknowledge any PUBCOMP (last message of a QoS 2 transaction), itwon't hang but will lose this QoS 2 message.

  • QoS 1 and QoS 2 messages which have been sent to the server, but have not been completely acknowledged.

    This means that messages passed topublish() may be lost. This could be mitigated by taking carethat all messages passed topublish() have a correspondingon_publish() call or use wait_for_publish.

    It also means that the broker may have the QoS2 message in the session. Since the client startswith an empty session it don't know it and will reuse the mid. This is not yet fixed.

Also, whenclean_session is True, this library will republish QoS > 0 message across networkreconnection. This means that QoS > 0 message won't be lost. But the standard says thatwe should discard any message for which the publish packet was sent. Our choice means thatwe are not compliant with the standard and it's possible for QoS 2 to be received twice.

You should setclean_session = False if you need the QoS 2 guarantee of only one delivery.

Usage and API

Detailed API documentationis available online or could be built fromdocs/ and samples are available in theexamples directory.

The package provides two modules, a full Client and few helpers for simple publishing or subscribing.

Getting Started

Here is a very simple example that subscribes to the broker $SYS topic tree and prints out the resulting messages:

importpaho.mqtt.clientasmqtt# The callback for when the client receives a CONNACK response from the server.defon_connect(client,userdata,flags,reason_code,properties):print(f"Connected with result code{reason_code}")# Subscribing in on_connect() means that if we lose the connection and# reconnect then subscriptions will be renewed.client.subscribe("$SYS/#")# The callback for when a PUBLISH message is received from the server.defon_message(client,userdata,msg):print(msg.topic+" "+str(msg.payload))mqttc=mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)mqttc.on_connect=on_connectmqttc.on_message=on_messagemqttc.connect("mqtt.eclipseprojects.io",1883,60)# Blocking call that processes network traffic, dispatches callbacks and# handles reconnecting.# Other loop*() functions are available that give a threaded interface and a# manual interface.mqttc.loop_forever()

Client

You can use the client class as an instance, within a class or by subclassing. The general usage flow is as follows:

  • Create a client instance
  • Connect to a broker using one of theconnect*() functions
  • Call one of theloop*() functions to maintain network traffic flow with the broker
  • Usesubscribe() to subscribe to a topic and receive messages
  • Usepublish() to publish messages to the broker
  • Usedisconnect() to disconnect from the broker

Callbacks will be called to allow the application to process events as necessary. These callbacks are described below.

Network loop

These functions are the driving force behind the client. If they are notcalled, incoming network data will not be processed and outgoing network datawill not be sent. There are four options for managing thenetwork loop. Three are described here, the fourth in "External event loopsupport" below. Do not mix the different loop functions.

loop_start() / loop_stop()
mqttc.loop_start()whileTrue:temperature=sensor.blocking_read()mqttc.publish("paho/temperature",temperature)mqttc.loop_stop()

These functions implement a threaded interface to the network loop. Callingloop_start() once, before or afterconnect*(), runs a thread in thebackground to call loop() automatically. This frees up the main thread forother work that may be blocking. This call also handles reconnecting to thebroker. Call loop_stop() to stop the background thread.The loop is also stopped if you call disconnect().

loop_forever()
mqttc.loop_forever(retry_first_connection=False)

This is a blocking form of the network loop and will not return until theclient calls disconnect(). It automatically handles reconnecting.

Except for the first connection attempt when using connect_async, useretry_first_connection=True to make it retry the first connection.

Warning: This might lead to situations where the client keeps connecting to annon existing host without failing.

loop()
run=Truewhilerun:rc=mqttc.loop(timeout=1.0)ifrc!=0:# need to handle error, possible reconnecting or stopping the application

Call regularly to process network events. This call waits inselect() untilthe network socket is available for reading or writing, if appropriate, thenhandles the incoming/outgoing data. This function blocks for up totimeoutseconds.timeout must not exceed thekeepalive value for the client oryour client will be regularly disconnected by the broker.

Using this kind of loop, require you to handle reconnection strategie.

Callbacks

The interface to interact with paho-mqtt include various callback that are called bythe library when some events occur.

The callbacks are functions defined in your code, to implement the require action on those events. This couldbe simply printing received message or much more complex behaviour.

Callbacks API is versioned, and the selected version is the CallbackAPIVersion you provided to Clientconstructor. Currently two version are supported:

  • CallbackAPIVersion.VERSION1: it's the historical version used in paho-mqtt before version 2.0.It's the API used before the introduction of CallbackAPIVersion.This version is deprecated and will be removed in paho-mqtt version 3.0.
  • CallbackAPIVersion.VERSION2: This version is more consistent between protocol MQTT 3.x and MQTT 5.x. It's alsomuch more usable with MQTT 5.x since reason code and properties are always provided when available.It's recommended for all user to upgrade to this version. It's highly recommended for MQTT 5.x user.

The following callbacks exists:

  • on_connect(): called when the CONNACK from the broker is received. The call could be for a refused connection,check the reason_code to see if the connection is successful or rejected.
  • on_connect_fail(): called by loop_forever() and loop_start() when the TCP connection failed to establish.This callback is not called when using connect() or reconnect() directly. It's only called followingan automatic (re)connection made by loop_start() and loop_forever()
  • on_disconnect(): called when the connection is closed.
  • on_message(): called when a MQTT message is received from the broker.
  • on_publish(): called when an MQTT message was sent to the broker. Depending on QoS level the callback is calledat different moment:
    • For QoS == 0, it's called as soon as the message is sent over the network. This could be before the correspondingpublish() return.
    • For QoS == 1, it's called when the corresponding PUBACK is received from the broker
    • For QoS == 2, it's called when the corresponding PUBCOMP is received from the broker
  • on_subscribe(): called when the SUBACK is received from the broker
  • on_unsubscribe(): called when the UNSUBACK is received from the broker
  • on_log(): called when the library log a message
  • on_socket_open, on_socket_close, on_socket_register_write, on_socket_unregister_write: callbacks used for external loop support. See below for details.

For the signature of each callback, see theonline documentation.

Subscriber example
importpaho.mqtt.clientasmqttdefon_subscribe(client,userdata,mid,reason_code_list,properties):# Since we subscribed only for a single channel, reason_code_list contains# a single entryifreason_code_list[0].is_failure:print(f"Broker rejected you subscription:{reason_code_list[0]}")else:print(f"Broker granted the following QoS:{reason_code_list[0].value}")defon_unsubscribe(client,userdata,mid,reason_code_list,properties):# Be careful, the reason_code_list is only present in MQTTv5.# In MQTTv3 it will always be emptyiflen(reason_code_list)==0ornotreason_code_list[0].is_failure:print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")else:print(f"Broker replied with failure:{reason_code_list[0]}")client.disconnect()defon_message(client,userdata,message):# userdata is the structure we choose to provide, here it's a list()userdata.append(message.payload)# We only want to process 10 messagesiflen(userdata)>=10:client.unsubscribe("$SYS/#")defon_connect(client,userdata,flags,reason_code,properties):ifreason_code.is_failure:print(f"Failed to connect:{reason_code}. loop_forever() will retry connection")else:# we should always subscribe from on_connect callback to be sure# our subscribed is persisted across reconnections.client.subscribe("$SYS/#")mqttc=mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)mqttc.on_connect=on_connectmqttc.on_message=on_messagemqttc.on_subscribe=on_subscribemqttc.on_unsubscribe=on_unsubscribemqttc.user_data_set([])mqttc.connect("mqtt.eclipseprojects.io")mqttc.loop_forever()print(f"Received the following message:{mqttc.user_data_get()}")
publisher example
importtimeimportpaho.mqtt.clientasmqttdefon_publish(client,userdata,mid,reason_code,properties):# reason_code and properties will only be present in MQTTv5. It's always unset in MQTTv3try:userdata.remove(mid)exceptKeyError:print("on_publish() is called with a mid not present in unacked_publish")print("This is due to an unavoidable race-condition:")print("* publish() return the mid of the message sent.")print("* mid from publish() is added to unacked_publish by the main thread")print("* on_publish() is called by the loop_start thread")print("While unlikely (because on_publish() will be called after a network round-trip),")print(" this is a race-condition that COULD happen")print("")print("The best solution to avoid race-condition is using the msg_info from publish()")print("We could also try using a list of acknowledged mid rather than removing from pending list,")print("but remember that mid could be re-used !")unacked_publish=set()mqttc=mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)mqttc.on_publish=on_publishmqttc.user_data_set(unacked_publish)mqttc.connect("mqtt.eclipseprojects.io")mqttc.loop_start()# Our application produce some messagesmsg_info=mqttc.publish("paho/test/topic","my message",qos=1)unacked_publish.add(msg_info.mid)msg_info2=mqttc.publish("paho/test/topic","my message2",qos=1)unacked_publish.add(msg_info2.mid)# Wait for all message to be publishedwhilelen(unacked_publish):time.sleep(0.1)# Due to race-condition described above, the following way to wait for all publish is safermsg_info.wait_for_publish()msg_info2.wait_for_publish()mqttc.disconnect()mqttc.loop_stop()

Logger

The Client emit some log message that could be useful during troubleshooting. The easiest way toenable logs is the call enable_logger(). It's possible to provide a custom logger or let thedefault logger being used.

Example:

importloggingimportpaho.mqtt.clientasmqttlogging.basicConfig(level=logging.DEBUG)mqttc=mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)mqttc.enable_logger()mqttc.connect("mqtt.eclipseprojects.io",1883,60)mqttc.loop_start()# Do additional action needed, publish, subscribe, ...[...]

It's also possible to define a on_log callback that will receive a copy of all log messages. Example:

importpaho.mqtt.clientasmqttdefon_log(client,userdata,paho_log_level,messages):ifpaho_log_level==mqtt.LogLevel.MQTT_LOG_ERR:print(message)mqttc=mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)mqttc.on_log=on_logmqttc.connect("mqtt.eclipseprojects.io",1883,60)mqttc.loop_start()# Do additional action needed, publish, subscribe, ...[...]

The correspondence with Paho logging levels and standard ones is the following:

Pahologging
MQTT_LOG_ERRlogging.ERROR
MQTT_LOG_WARNINGlogging.WARNING
MQTT_LOG_NOTICElogging.INFO(no direct equivalent)
MQTT_LOG_INFOlogging.INFO
MQTT_LOG_DEBUGlogging.DEBUG

External event loop support

To support other network loop like asyncio (seeexamples), the library expose somemethod and callback to support those use-case.

The following loop method exists:

  • loop_read: should be called when the socket is ready for reading.
  • loop_write: should be called when the socket is ready for writing AND the library want to write data.
  • loop_misc: should be called every few seconds to handle message retrying and pings.

In pseudo code, it give the following:

whilerun:ifneed_read:mqttc.loop_read()ifneed_write:mqttc.loop_write()mqttc.loop_misc()ifnotneed_readandnotneed_write:# But don't wait more than few seconds, loop_misc() need to be called regularlywait_for_change_in_need_read_or_write()updated_need_read_and_write()

The tricky part is implementing the update of need_read / need_write and wait for condition change. To supportthis, the following method exists:

  • socket(): which return the socket object when the TCP connection is open.This call is particularly useful forselect based loops. Seeexamples/loop_select.py.

  • want_write(): return true if there is data waiting to be written. This is close to theneed_writew of above pseudo-code, but you should also check whether the socket is ready for writing.

  • callbackson_socket_*:

    • on_socket_open: called when the socket is opened.
    • on_socket_close: called when the socket is about to be closed.
    • on_socket_register_write: called when there is data the client want to write on the socket
    • on_socket_unregister_write: called when there is no more data to write on the socket.

    Callbacks are particularly useful for event loops where you register or unregister a socketfor reading+writing. Seeexamples/loop_asyncio.py for an example.

The callbacks are always called in this order:

  • on_socket_open
  • Zero or more times:
    • on_socket_register_write
    • on_socket_unregister_write
  • on_socket_close

Global helper functions

The client module also offers some global helper functions.

topic_matches_sub(sub, topic) can be used to check whether atopicmatches asubscription.

For example:

the topicfoo/bar would match the subscriptionfoo/# or+/bar

the topicnon/matching would not match the subscriptionnon/+/+

Publish

This module provides some helper functions to allow straightforward publishingof messages in a one-shot manner. In other words, they are useful for thesituation where you have a single/multiple messages you want to publish to abroker, then disconnect with nothing else required.

The two functions provided are single() and multiple().

Both functions include support for MQTT v5.0, but do not currently let youset any properties on connection or when sending messages.

Single

Publish a single message to a broker, then disconnect cleanly.

Example:

importpaho.mqtt.publishaspublishpublish.single("paho/test/topic","payload",hostname="mqtt.eclipseprojects.io")

Multiple

Publish multiple messages to a broker, then disconnect cleanly.

Example:

frompaho.mqtt.enumsimportMQTTProtocolVersionimportpaho.mqtt.publishaspublishmsgs= [{'topic':"paho/test/topic",'payload':"multiple 1"},    ("paho/test/topic","multiple 2",0,False)]publish.multiple(msgs,hostname="mqtt.eclipseprojects.io",protocol=MQTTProtocolVersion.MQTTv5)

Subscribe

This module provides some helper functions to allow straightforward subscribingand processing of messages.

The two functions provided are simple() and callback().

Both functions include support for MQTT v5.0, but do not currently let youset any properties on connection or when subscribing.

Simple

Subscribe to a set of topics and return the messages received. This is ablocking function.

Example:

importpaho.mqtt.subscribeassubscribemsg=subscribe.simple("paho/test/topic",hostname="mqtt.eclipseprojects.io")print("%s %s"% (msg.topic,msg.payload))

Using Callback

Subscribe to a set of topics and process the messages received using a userprovided callback.

Example:

importpaho.mqtt.subscribeassubscribedefon_message_print(client,userdata,message):print("%s %s"% (message.topic,message.payload))userdata["message_count"]+=1ifuserdata["message_count"]>=5:# it's possible to stop the program by disconnectingclient.disconnect()subscribe.callback(on_message_print,"paho/test/topic",hostname="mqtt.eclipseprojects.io",userdata={"message_count":0})

Reporting bugs

Please report bugs in the issues tracker athttps://github.com/eclipse/paho.mqtt.python/issues.

More information

Discussion of the Paho clients takes place on theEclipse paho-dev mailing list.

General questions about the MQTT protocol itself (not this library) are discussed in theMQTT Google Group.

There is much more information available via theMQTT community site.

About

paho.mqtt.python

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python96.4%
  • HTML2.4%
  • Other1.2%

[8]ページ先頭

©2009-2025 Movatter.jp