Module spin_sdk.wit.imports.mqtt

Global variables

var Error

Errors related to interacting with Mqtt

Classes

class Connection
Expand source code
class Connection:
    
    @classmethod
    def open(cls, address: str, username: str, password: str, keep_alive_interval_in_secs: int) -> Self:
        """
        Open a connection to the Mqtt instance at `address`.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        raise NotImplementedError
    def publish(self, topic: str, payload: bytes, qos: Qos) -> None:
        """
        Publish an Mqtt message to the specified `topic`.
        
        Raises: `spin_sdk.wit.types.Err(spin_sdk.wit.imports.mqtt.Error)`
        """
        raise NotImplementedError
    def __enter__(self) -> Self:
        """Returns self"""
        return self
                                
    def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> bool | None:
        """
        Release this resource.
        """
        raise NotImplementedError

Static methods

def open(address: str, username: str, password: str, keep_alive_interval_in_secs: int) ‑> Self

Open a connection to the Mqtt instance at address.

Raises: Err(Error)

Methods

def publish(self,
topic: str,
payload: bytes,
qos: Qos) ‑> None

Publish an Mqtt message to the specified topic.

Raises: Err(Error)

class Error_ConnectionFailed (value: str)

Error_ConnectionFailed(value: str)

Expand source code
@dataclass
class Error_ConnectionFailed:
    value: str

Class variables

var value : str
class Error_InvalidAddress

Error_InvalidAddress()

Expand source code
@dataclass
class Error_InvalidAddress:
    pass
class Error_Other (value: str)

Error_Other(value: str)

Expand source code
@dataclass
class Error_Other:
    value: str

Class variables

var value : str
class Error_TooManyConnections

Error_TooManyConnections()

Expand source code
@dataclass
class Error_TooManyConnections:
    pass
class Qos (*args, **kwds)

QoS for publishing Mqtt messages

Expand source code
class Qos(Enum):
    """
    QoS for publishing Mqtt messages
    """
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2

Ancestors

  • enum.Enum

Class variables

var AT_LEAST_ONCE
var AT_MOST_ONCE
var EXACTLY_ONCE