Module spin_sdk.wit.imports.mqtt
Global variables
var Error
Errors related to interacting with MQTT.
Classes
class Connection
class Connection:

    @classmethod
    def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
        """
        Open a connection to the Mqtt instance at `address`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        raise NotImplementedError

    def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
        """
        Publish an Mqtt message to the specified `topic`.

        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        raise NotImplementedError

    def __enter__(self) -> Self:
        """Returns self"""
        return self

    def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> bool | None:
        """
        Release this resource.
        """
        raise NotImplementedError
Static methods
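def open(address: str, username: str, password: str, keep_alive_interval_in_secs: int) ‑> Self
Open a connection to the MQTT instance at `address`.
Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`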
Methods
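def publish(self, topic: str, payload: bytes, qos: Qos) ‑> None
Publish an MQTT message to the specified `topic`.
Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`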
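The `open` and `publish` signatures above, together with the `__enter__`/`__exit__` methods in the class source, suggest using a connection as a context manager. The following is only a minimal sketch of that pattern. Because `spin_sdk` is importable only inside a Spin component, the sketch defines a hypothetical `RecordingConnection` stand-in and a local copy of `Qos` that mirror the documented signatures. The `send_reading` helper, the broker address, the credentials, and the topic are all illustrative placeholders, not part of the SDK.

```python
from enum import Enum


class Qos(Enum):
    """Local copy of the documented enum so the sketch runs outside Spin."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


class RecordingConnection:
    """Hypothetical stand-in that mirrors the documented Connection signatures."""

    def __init__(self):
        self.published = []   # every (topic, payload, qos) passed to publish()
        self.closed = False   # set once __exit__ has run

    @classmethod
    def open(cls, address, username, password, keep_alive_interval_in_secs):
        return cls()

    def publish(self, topic, payload, qos):
        self.published.append((topic, payload, qos))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.closed = True
        return None


def send_reading(connection_cls, celsius):
    """Open a connection, publish one encoded reading, then release the connection."""
    # Placeholder address and credentials (assumptions for illustration only).
    with connection_cls.open("mqtt://localhost:1883", "user", "secret", 30) as conn:
        # publish() takes bytes, so text has to be encoded first.
        payload = f"{celsius:.1f}".encode("utf-8")
        conn.publish("sensors/temperature", payload, Qos.AT_LEAST_ONCE)
        return conn


conn = send_reading(RecordingConnection, 21.5)
```

Inside a Spin component, one would presumably pass `spin_sdk.wit.imports.mqtt.Connection` and use that module's `Qos` in place of the stand-ins.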
class Error_ConnectionFailed (value: str)
Error_ConnectionFailed(value: str)
@dataclass
class Error_ConnectionFailed:
    value: str
Class variables
var value : str
class Error_InvalidAddress
Error_InvalidAddress()
@dataclass
class Error_InvalidAddress:
    pass
class Error_Other (value: str)
Error_Other(value: str)
@dataclass
class Error_Other:
    value: str
Class variables
var value : str
class Error_TooManyConnections
Error_TooManyConnections()
@dataclass
class Error_TooManyConnections:
    pass
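Of the four error dataclasses listed here, only `Error_ConnectionFailed` and `Error_Other` carry a `value` string. A minimal sketch of telling the variants apart follows. It uses local copies of the dataclasses so that it runs outside Spin, and `describe` is a hypothetical helper, not part of the SDK. The docstrings on this page say failures are raised as `spin_sdk.wit.types.Err(...)`. How the variant is unwrapped from that exception is not shown on this page, so the sketch starts from an already unwrapped variant.

```python
from dataclasses import dataclass


# Local copies of the documented error dataclasses, so the sketch runs outside Spin.
@dataclass
class Error_InvalidAddress:
    pass


@dataclass
class Error_TooManyConnections:
    pass


@dataclass
class Error_ConnectionFailed:
    value: str


@dataclass
class Error_Other:
    value: str


def describe(error):
    """Turn one of the four error variants into a log-friendly message."""
    if isinstance(error, Error_InvalidAddress):
        return "invalid broker address"
    if isinstance(error, Error_TooManyConnections):
        return "too many open connections"
    if isinstance(error, Error_ConnectionFailed):
        # This variant carries a detail string in `value`.
        return f"connection failed: {error.value}"
    if isinstance(error, Error_Other):
        # So does this one.
        return f"other error: {error.value}"
    return f"unrecognized error: {error!r}"
```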
class Qos (*args, **kwds)
QoS for publishing MQTT messages.
class Qos(Enum):
    """
    QoS for publishing Mqtt messages
    """
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2
Ancestors
- enum.Enum
Class variables
var AT_LEAST_ONCE
var AT_MOST_ONCE
var EXACTLY_ONCE
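Because `Qos` is a standard `enum.Enum`, its members can be looked up by value or by name. The sketch below shows one way to turn a configuration string into a member. It uses a local copy of the enum so that it runs outside Spin, and `parse_qos` is a hypothetical helper, not part of the SDK.

```python
from enum import Enum


class Qos(Enum):
    """Local copy of the documented enum so the sketch runs outside Spin."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def parse_qos(text):
    """Accept either a level ("0", "1", "2") or a member name ("exactly_once")."""
    cleaned = text.strip()
    if cleaned.isdigit():
        # Lookup by value; Enum raises ValueError for an unknown level.
        return Qos(int(cleaned))
    try:
        # Lookup by member name, case-insensitively.
        return Qos[cleaned.upper()]
    except KeyError:
        raise ValueError(f"unknown QoS: {text!r}") from None
```

Raising `ValueError` on both paths keeps error handling uniform for callers, whichever spelling the configuration uses.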