Skip to main content

Overview

Protocol: MQTT v5.0 over TLS (MQTTS) Pattern: Publish/Subscribe Endpoint: mqtts://mqtt.petstoreapi.com:8883 When to Use MQTT:
  • ✅ IoT devices and sensors
  • ✅ Low-bandwidth networks
  • ✅ Battery-powered devices
  • ✅ Unreliable network connections
  • ✅ Many devices publishing data
When NOT to Use MQTT:
  • ❌ Standard web applications (use REST/WebSocket instead)
  • ❌ High-bandwidth data transfer
  • ❌ Request/response pattern needed

How MQTT Works

Publisher (IoT Device)              Broker                  Subscriber
    │                                 │                         │
    ├──────── Publish ───────────────>│                         │
    │      Topic: orders/abc123/status │                         │
    │                                 │────────────────────────>│
    │                                 │      Forward Message    │
    │                                 │                         │
    ├──────── Publish ───────────────>│                         │
    │      Topic: payments/xyz789/status                       │
    │                                 │────────────────────────>│
    │                                 │      Forward Message    │
Key Characteristics:
  • Extremely lightweight (2-byte header)
  • Publish/Subscribe pattern
  • Three Quality of Service (QoS) levels
  • Works on unreliable networks
  • Battery-efficient

Connection

Python (paho-mqtt)

import paho.mqtt.client as mqtt
import ssl
import json

# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker")
        # Subscribe to topics
        client.subscribe("orders/+/status", qos=1)
        client.subscribe("pets/+/availability", qos=1)
    else:
        print(f"Connection failed with code {rc}")

def on_message(client, userdata, msg):
    topic = msg.topic
    payload = json.loads(msg.payload.decode())
    print(f"Topic: {topic}")
    print(f"Message: {payload}")

def on_subscribe(client, userdata, mid, granted_qos):
    print(f"Subscribed with QoS: {granted_qos[0]}")

# Create MQTT client
client = mqtt.Client(
    client_id="petstore-iot-device-001",
    protocol=mqtt.MQTTv5
)

# Set authentication
client.username_pw_set("YOUR_USERNAME", "YOUR_PASSWORD")

# Enable TLS/SSL
client.tls_set(
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS
)

# Set callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe

# Connect to broker
client.connect("mqtt.petstoreapi.com", 8883, 60)

# Start loop
client.loop_forever()

JavaScript (MQTT.js)

const mqtt = require('mqtt');

// Connect to MQTT broker
const client = mqtt.connect('mqtts://mqtt.petstoreapi.com:8883', {
  clientId: 'petstore-iot-device-001',
  username: 'YOUR_USERNAME',
  password: 'YOUR_PASSWORD',
  protocol: 'mqtts',
  protocolVersion: 5,
  clean: true,
  keepalive: 60,
  reconnectPeriod: 5000
});

client.on('connect', () => {
  console.log('Connected to MQTT broker');

  // Subscribe to topics
  client.subscribe('orders/+/status', { qos: 1 }, (err) => {
    if (!err) {
      console.log('Subscribed to order status updates');
    }
  });

  client.subscribe('pets/+/availability', { qos: 1 });
});

client.on('message', (topic, message) => {
  const payload = JSON.parse(message.toString());
  console.log(`Topic: ${topic}`);
  console.log('Payload:', payload);

  if (topic.startsWith('orders/')) {
    console.log(`Order ${payload.orderId} is now ${payload.status}`);
  }
});

client.on('error', (error) => {
  console.error('MQTT error:', error);
});

ESP32 (Arduino)

#include <WiFi.h>
#include <PubSubClient.h>
#include <WiFiClientSecure.h>

const char* mqtt_server = "mqtt.petstoreapi.com";
const int mqtt_port = 8883;
const char* mqtt_user = "YOUR_USERNAME";
const char* mqtt_password = "YOUR_PASSWORD";

WiFiClientSecure espClient;
PubSubClient client(espClient);

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("]: ");

  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();
}

void setup() {
  // Connect to WiFi
  WiFi.begin(ssid, password);

  // Configure MQTT
  espClient.setCACert(root_ca); // Set CA certificate
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();
}

void reconnect() {
  while (!client.connected()) {
    if (client.connect("ESP32Client", mqtt_user, mqtt_password)) {
      client.subscribe("orders/+/status");
    } else {
      delay(5000);
    }
  }
}

Topics

Topic Structure

pets/{petId}/availability       # Pet availability updates
pets/{petId}/status             # Pet status changes
orders/{orderId}/status         # Order status updates
orders/{orderId}/payment        # Payment status
payments/{paymentId}/status     # Payment processing updates
inventory/{productId}/stock     # Stock level updates

Wildcards

# Single-level wildcard (+)
client.subscribe("orders/+/status")  # Matches: orders/abc123/status, orders/xyz789/status

# Multi-level wildcard (#)
client.subscribe("pets/#")  # Matches: pets/abc123/availability, pets/xyz789/status/updated

Publishing Messages

Python Example

# Publish order status
client.publish(
    "orders/abc123/status",
    payload=json.dumps({
        "orderId": "abc123",
        "status": "PROCESSING",
        "timestamp": "2025-01-06T12:00:00Z"
    }),
    qos=1,
    retain=False
)

# Publish pet availability
client.publish(
    "pets/pet_abc123/availability",
    payload=json.dumps({
        "petId": "pet_abc123",
        "available": True,
        "timestamp": "2025-01-06T12:00:00Z"
    }),
    qos=1
)

JavaScript Example

// Publish sensor data
client.publish('pets/pet_abc123/health', JSON.stringify({
  petId: 'pet_abc123',
  heartRate: 85,
  temperature: 38.5,
  activity: 'sleeping',
  timestamp: new Date().toISOString()
}), { qos: 1 });

Quality of Service (QoS)

QoS 0 - At Most Once

# Fire and forget, no acknowledgment
client.publish("topic", payload, qos=0)
Use for:
  • Non-critical data
  • High-frequency updates
  • Where message loss is acceptable

QoS 1 - At Least Once

# Delivered at least once, acknowledged
client.publish("topic", payload, qos=1)
Use for:
  • Important data
  • Must be delivered at least once
  • Can handle duplicate messages

QoS 2 - Exactly Once

# Delivered exactly once, 4-way handshake
client.publish("topic", payload, qos=2)
Use for:
  • Critical financial transactions
  • Where duplicates cannot be tolerated
  • Payment processing

Last Will and Testament

# Set Last Will message
lw_topic = "devices/connected/esp32_001"
lw_payload = json.dumps({"connected": False, "timestamp": time.time()})

client.will_set(
    topic=lw_topic,
    payload=lw_payload,
    qos=1,
    retain=True
)

client.connect("mqtt.petstoreapi.com", 8883, 60)

Retained Messages

# Retain last message for new subscribers
client.publish(
    "pets/pet_abc123/availability",
    payload=json.dumps({"available": True}),
    qos=1,
    retain=True  # New subscribers receive this immediately
)

Advanced Usage

Persistent Sessions

# Clean session = False (persistent)
client = mqtt.Client(client_id="device_001")
client.connect("mqtt.petstoreapi.com", 8883, clean_session=False)

# Broker stores subscriptions and undelivered messages

Batch Messages

messages = [
    ("pets/001/health", {"status": "healthy"}),
    ("pets/002/health", {"status": "healthy"}),
    ("pets/003/health", {"status": "sick"})
]

for topic, payload in messages:
    client.publish(topic, json.dumps(payload), qos=1)

Best Practices

1. Error Handling

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection. Reconnecting...")
        reconnect()

client.on_disconnect = on_disconnect

2. Connection Management

def reconnect():
    while True:
        try:
            client.reconnect()
            break
        except:
            time.sleep(5)

3. Message Validation

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode())

        # Validate required fields
        if 'timestamp' not in payload:
            raise ValueError("Missing timestamp")

        # Process message
        process_message(msg.topic, payload)

    except json.JSONDecodeError:
        print("Invalid JSON")
    except ValueError as e:
        print(f"Validation error: {e}")

Troubleshooting

Connection Failures

  • Check TLS/SSL certificate
  • Verify username/password
  • Ensure port 8883 is accessible
  • Check firewall rules

Missing Messages

  • Verify QoS level
  • Check topic subscriptions
  • Ensure broker permissions
  • Review message queue size

High Latency

  • Reduce message size
  • Optimize topic structure
  • Check network connectivity
  • Consider local broker

Comparison with Alternatives

FeatureMQTTWebSocketHTTP
PatternPub/SubBidirectionalRequest/Response
OverheadVery LowLowHigh
Battery Efficient⚠️
Unreliable Networks
QoS Levels
IoT Support⚠️

Security Best Practices

  1. Always use TLS (mqtts://)
  2. Use strong authentication (username/password or client certificates)
  3. Implement access control (topic-based permissions)
  4. Validate all messages (schema validation)
  5. Monitor for anomalies (unusual message patterns)

Interactive Documentation