w MicroPython – Odporny na Błędy
Claude Opus 4.1: Oto kompletny program klienta MQTT dla ESP32 z obsługą reconnect dla WiFi i MQTT:
Główny program (main.py)
import machine
import network
import time
import ubinascii
from umqtt.robust import MQTTClient
import gc
# ============= KONFIGURACJA =============
# WiFi
WIFI_SSID = "Twoja_Nazwa_WiFi"
WIFI_PASSWORD = "Twoje_Haslo_WiFi"
# MQTT
MQTT_BROKER = "broker.hivemq.com" # lub twój broker
MQTT_PORT = 1883
MQTT_CLIENT_ID = ubinascii.hexlify(machine.unique_id())
MQTT_USER = "" # opcjonalne
MQTT_PASSWORD = "" # opcjonalne
MQTT_KEEPALIVE = 60
# Tematy MQTT
TOPIC_SUBSCRIBE = "esp32/commands"
TOPIC_PUBLISH = "esp32/status"
TOPIC_SENSOR = "esp32/sensor"
# Timery i interwały
WIFI_RETRY_DELAY = 5 # sekund
MQTT_RETRY_DELAY = 10 # sekund
SENSOR_INTERVAL = 30 # sekund
HEARTBEAT_INTERVAL = 60 # sekund
# LED do sygnalizacji (opcjonalne)
LED_PIN = 2 # wbudowana LED na ESP32
led = machine.Pin(LED_PIN, machine.Pin.OUT)
# ============= KLASA MQTT CLIENT =============
class RobustMQTTClient:
def __init__(self):
self.wlan = network.WLAN(network.STA_IF)
self.mqtt_client = None
self.connected_wifi = False
self.connected_mqtt = False
self.last_sensor_time = 0
self.last_heartbeat_time = 0
def blink_led(self, times=1, delay=0.2):
"""Miganie LED dla sygnalizacji statusu"""
for _ in range(times):
led.value(1)
time.sleep(delay)
led.value(0)
time.sleep(delay)
def connect_wifi(self):
"""Łączenie z WiFi z obsługą błędów"""
try:
if self.wlan.isconnected():
return True
print(f"[WiFi] Łączenie z {WIFI_SSID}...")
self.wlan.active(True)
self.wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# Czekaj na połączenie
timeout = 20
start_time = time.time()
while not self.wlan.isconnected():
if time.time() - start_time > timeout:
print("[WiFi] Timeout połączenia")
self.wlan.disconnect()
self.wlan.active(False)
time.sleep(1)
self.wlan.active(True)
return False
self.blink_led(1, 0.1)
time.sleep(0.5)
if self.wlan.isconnected():
self.connected_wifi = True
ip = self.wlan.ifconfig()[0]
print(f"[WiFi] Połączono! IP: {ip}")
self.blink_led(3, 0.1)
return True
except Exception as e:
print(f"[WiFi] Błąd: {e}")
self.connected_wifi = False
return False
def mqtt_callback(self, topic, msg):
"""Callback dla wiadomości MQTT"""
try:
topic_str = topic.decode('utf-8')
msg_str = msg.decode('utf-8')
print(f"[MQTT] Otrzymano: {topic_str} -> {msg_str}")
# Obsługa komend
if topic_str == TOPIC_SUBSCRIBE:
self.handle_command(msg_str)
except Exception as e:
print(f"[MQTT] Błąd callback: {e}")
def handle_command(self, command):
"""Obsługa otrzymanych komend"""
try:
if command == "LED_ON":
led.value(1)
self.publish_message(TOPIC_PUBLISH, "LED włączona")
elif command == "LED_OFF":
led.value(0)
self.publish_message(TOPIC_PUBLISH, "LED wyłączona")
elif command == "RESTART":
self.publish_message(TOPIC_PUBLISH, "Restartuję...")
time.sleep(2)
machine.reset()
elif command == "STATUS":
status = self.get_device_status()
self.publish_message(TOPIC_PUBLISH, status)
else:
print(f"[MQTT] Nieznana komenda: {command}")
except Exception as e:
print(f"[MQTT] Błąd obsługi komendy: {e}")
def connect_mqtt(self):
"""Łączenie z brokerem MQTT"""
try:
if not self.connected_wifi:
print("[MQTT] Brak połączenia WiFi")
return False
print(f"[MQTT] Łączenie z {MQTT_BROKER}:{MQTT_PORT}...")
# Tworzenie klienta MQTT
self.mqtt_client = MQTTClient(
MQTT_CLIENT_ID,
MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USER if MQTT_USER else None,
password=MQTT_PASSWORD if MQTT_PASSWORD else None,
keepalive=MQTT_KEEPALIVE
)
# Ustawienie callback
self.mqtt_client.set_callback(self.mqtt_callback)
# Ustawienie Last Will Testament
self.mqtt_client.set_last_will(
TOPIC_PUBLISH,
"ESP32 rozłączony (nieoczekiwanie)",
retain=True,
qos=1
)
# Połączenie
self.mqtt_client.connect()
# Subskrypcja
self.mqtt_client.subscribe(TOPIC_SUBSCRIBE)
self.connected_mqtt = True
print(f"[MQTT] Połączono z brokerem!")
# Wyślij wiadomość powitalną
self.publish_message(TOPIC_PUBLISH, "ESP32 połączony")
self.blink_led(5, 0.05)
return True
except Exception as e:
print(f"[MQTT] Błąd połączenia: {e}")
self.connected_mqtt = False
self.cleanup_mqtt()
return False
def cleanup_mqtt(self):
"""Czyszczenie połączenia MQTT"""
try:
if self.mqtt_client:
try:
self.mqtt_client.disconnect()
except:
pass
self.mqtt_client = None
except:
pass
self.connected_mqtt = False
def publish_message(self, topic, message, retain=False):
"""Publikowanie wiadomości MQTT"""
try:
if self.connected_mqtt and self.mqtt_client:
self.mqtt_client.publish(topic, message, retain=retain)
print(f"[MQTT] Wysłano: {topic} -> {message}")
return True
except Exception as e:
print(f"[MQTT] Błąd publikacji: {e}")
self.connected_mqtt = False
return False
def read_sensor_data(self):
"""Odczyt danych z czujników (przykład)"""
try:
# Przykład: temperatura z wewnętrznego czujnika
import esp32
temp = esp32.raw_temperature()
temp_c = (temp - 32) / 1.8
# Wolna pamięć
free_mem = gc.mem_free()
data = {
"temp": round(temp_c, 2),
"memory": free_mem,
"uptime": time.time()
}
return str(data)
except Exception as e:
print(f"[Sensor] Błąd odczytu: {e}")
return None
def get_device_status(self):
"""Pobierz status urządzenia"""
try:
status = {
"wifi": self.wlan.isconnected(),
"mqtt": self.connected_mqtt,
"ip": self.wlan.ifconfig()[0] if self.wlan.isconnected() else "N/A",
"rssi": self.wlan.status('rssi') if self.wlan.isconnected() else 0,
"free_mem": gc.mem_free(),
"uptime": time.time()
}
return str(status)
except:
return "Status niedostępny"
def check_mqtt_connection(self):
"""Sprawdzenie połączenia MQTT poprzez ping"""
try:
if self.mqtt_client:
self.mqtt_client.ping()
return True
except:
self.connected_mqtt = False
return False
def maintain_connections(self):
"""Utrzymywanie połączeń WiFi i MQTT"""
# Sprawdź WiFi
if not self.wlan.isconnected():
self.connected_wifi = False
self.connected_mqtt = False
print("[WiFi] Utracono połączenie, próba reconnect...")
if self.connect_wifi():
time.sleep(2)
self.connect_mqtt()
else:
time.sleep(WIFI_RETRY_DELAY)
# Sprawdź MQTT
elif not self.connected_mqtt or not self.check_mqtt_connection():
print("[MQTT] Utracono połączenie, próba reconnect...")
self.cleanup_mqtt()
time.sleep(2)
if not self.connect_mqtt():
time.sleep(MQTT_RETRY_DELAY)
def run(self):
"""Główna pętla programu"""
print("\n=== ESP32 MQTT Client ===")
print(f"Client ID: {MQTT_CLIENT_ID}")
# Początkowe połączenie
while not self.connect_wifi():
print(f"[WiFi] Ponowna próba za {WIFI_RETRY_DELAY}s...")
time.sleep(WIFI_RETRY_DELAY)
while not self.connect_mqtt():
print(f"[MQTT] Ponowna próba za {MQTT_RETRY_DELAY}s...")
time.sleep(MQTT_RETRY_DELAY)
print("\n[System] Rozpoczynam główną pętlę...")
while True:
try:
# Utrzymuj połączenia
self.maintain_connections()
# Jeśli połączony z MQTT
if self.connected_mqtt:
# Sprawdź wiadomości
try:
self.mqtt_client.check_msg()
except:
self.connected_mqtt = False
continue
# Wysyłaj dane z czujników
current_time = time.time()
if current_time - self.last_sensor_time >= SENSOR_INTERVAL:
sensor_data = self.read_sensor_data()
if sensor_data:
self.publish_message(TOPIC_SENSOR, sensor_data)
self.last_sensor_time = current_time
# Heartbeat
if current_time - self.last_heartbeat_time >= HEARTBEAT_INTERVAL:
self.publish_message(TOPIC_PUBLISH, "heartbeat")
self.last_heartbeat_time = current_time
gc.collect() # Czyszczenie pamięci
# Krótka przerwa
time.sleep(0.1)
except KeyboardInterrupt:
print("\n[System] Przerwano przez użytkownika")
break
except Exception as e:
print(f"[System] Błąd głównej pętli: {e}")
time.sleep(5)
# Czyszczenie przed zakończeniem
try:
self.publish_message(TOPIC_PUBLISH, "ESP32 rozłączany")
time.sleep(1)
self.cleanup_mqtt()
self.wlan.disconnect()
self.wlan.active(False)
except:
pass
print("[System] Zakończono")
# ============= URUCHOMIENIE =============
if __name__ == "__main__":
client = RobustMQTTClient()
client.run()
Plik startowy (boot.py)
import esp
import gc
import webrepl
# Wyłącz debug ESP
esp.osdebug(None)
# Czyszczenie pamięci
gc.collect()
# Opcjonalnie: włącz WebREPL dla debugowania
# webrepl.start()
print("Boot completed")
Instalacja biblioteki MQTT
Przed uruchomieniem musisz zainstalować bibliotekę umqtt.robust:
# W REPL ESP32 lub przez Thonny
import upip
upip.install('micropython-umqtt.robust')
Kluczowe funkcje odpornościowe:
1. Automatyczny reconnect WiFi
- Monitorowanie stanu połączenia
- Ponowne próby z timeoutem
- Reset modułu WiFi przy problemach
2. Automatyczny reconnect MQTT
- Używa
umqtt.robustdla lepszej stabilności - Last Will Testament dla powiadomień o rozłączeniu
- Ping do sprawdzania żywotności połączenia
3. Obsługa błędów
- Try/except we wszystkich krytycznych miejscach
- Graceful degradation przy problemach
- Czyszczenie zasobów
4. Optymalizacja pamięci
- Regularne
gc.collect() - Monitorowanie wolnej pamięci
- Minimalne użycie pamięci
5. Monitoring i diagnostyka
- LED sygnalizująca stan
- Heartbeat do monitorowania
- Logi z tagami dla łatwego debugowania
Program automatycznie radzi sobie z typowymi problemami jak zanik WiFi, restart routera, problemy z brokerem MQTT czy chwilowe przerwy w sieci.

No responses yet