Rozumiem, że tworzysz system IoT, który musi być absolutnie niezawodny. ESP32-C3 to świetny mikrokontroler do takich zadań, ale stabilność połączeń w MicroPythonie wymaga napisania przemyślanej pętli głównej (tzw. watchdoga programowego), która sama będzie leczyć problemy z siecią i brokerem.
Poniżej przygotowałem dla Ciebie kompletne rozwiązanie spełniające wszystkie Twoje założenia: rozdzielenie danych wrażliwych, odporność na błędy, obsługa flagi retain, sprzętowy reset w razie „zawieszenia” oraz zdalne komendy MQTT.
1. Plik konfiguracyjny z danymi wrażliwymi (secrets.py)
Utwórz plik secrets.py i wgraj go na urządzenie. Dzięki temu główny kod pozostanie czysty i bezpieczny do ewentualnego udostępniania.
Python
# secrets.py
WIFI_SSID = "Twoja_Siec_WiFi"
WIFI_PASS = "Twoje_Haslo_WiFi"
MQTT_BROKER = "192.168.1.100" # Adres IP serwera Mosquitto
MQTT_USER = "twoj_uzytkownik" # Opcjonalnie, zostaw puste "" jeśli brak
MQTT_PASS = "twoje_haslo" # Opcjonalnie, zostaw puste "" jeśli brak
CLIENT_ID = "esp32-c3-przekazniki"
2. Główny program (main.py)
Ten kod realizuje główne zadanie. Wykorzystuje bibliotekę umqtt.simple (która jest standardem w MicroPythonie, w razie jej braku pobierz ją używając mip).
Python
# main.py
import time
import machine
import network
from umqtt.simple import MQTTClient
import secrets
# Konfiguracja pinów dla ESP32-C3 (dostosuj piny do swojej płytki, np. 8 i 9)
PIN_DEV_1 = 8
PIN_DEV_2 = 9
dev1 = machine.Pin(PIN_DEV_1, machine.Pin.OUT)
dev2 = machine.Pin(PIN_DEV_2, machine.Pin.OUT)
# Tematy MQTT
TOPIC_CMD_1 = b"dom/urzadzenie1/set"
TOPIC_STAT_1 = b"dom/urzadzenie1/status"
TOPIC_CMD_2 = b"dom/urzadzenie2/set"
TOPIC_STAT_2 = b"dom/urzadzenie2/status"
# Tematy systemowe urządzenia
TOPIC_SYS_CMD = b"dom/esp/cmd" # Do wysyłania komend: "STATUS" lub "RESET"
TOPIC_SYS_STAT = b"dom/esp/status" # "ONLINE", "OFFLINE" (LWT) lub "REBOOTING"
mqtt_client = None
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Łączenie z siecią WiFi...")
wlan.connect(secrets.WIFI_SSID, secrets.WIFI_PASS)
timeout = 20
while not wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
if wlan.isconnected():
print("Połączono z WiFi. IP:", wlan.ifconfig()[0])
return True
return False
def publish_state(topic, state):
"""Publikuje stan z użyciem flagi retain=True"""
try:
mqtt_client.publish(topic, state, retain=True)
print("Opublikowano: {} -> {}".format(topic.decode(), state.decode()))
except Exception as e:
print("Błąd publikacji:", e)
raise e # Przekaż błąd wyżej, aby pętla główna mogła zresetować połączenie
def publish_all_states():
"""Wysyła aktualny stan wszystkich urządzeń i samego ESP"""
state1 = b"ON" if dev1.value() else b"OFF"
state2 = b"ON" if dev2.value() else b"OFF"
publish_state(TOPIC_STAT_1, state1)
publish_state(TOPIC_STAT_2, state2)
publish_state(TOPIC_SYS_STAT, b"ONLINE")
def mqtt_callback(topic, msg):
"""Reakcja na przychodzące wiadomości"""
print("Otrzymano: {} -> {}".format(topic.decode(), msg.decode()))
# Obsługa Urządzenia 1
if topic == TOPIC_CMD_1:
if msg == b"ON":
dev1.value(1)
publish_state(TOPIC_STAT_1, b"ON")
elif msg == b"OFF":
dev1.value(0)
publish_state(TOPIC_STAT_1, b"OFF")
# Obsługa Urządzenia 2
elif topic == TOPIC_CMD_2:
if msg == b"ON":
dev2.value(1)
publish_state(TOPIC_STAT_2, b"ON")
elif msg == b"OFF":
dev2.value(0)
publish_state(TOPIC_STAT_2, b"OFF")
# Obsługa komend systemowych
elif topic == TOPIC_SYS_CMD:
if msg == b"RESET":
print("Wymuszono zdalny reset. Uruchamiam ponownie...")
publish_state(TOPIC_SYS_STAT, b"REBOOTING")
time.sleep(1)
machine.reset()
elif msg == b"STATUS":
print("Wymuszono odświeżenie statusu.")
publish_all_states()
def connect_mqtt():
global mqtt_client
mqtt_client = MQTTClient(
client_id=secrets.CLIENT_ID,
server=secrets.MQTT_BROKER,
user=secrets.MQTT_USER,
password=secrets.MQTT_PASS,
keepalive=60 # Ważne dla szybkiego wykrywania zerwania u brokera
)
mqtt_client.set_callback(mqtt_callback)
# Ustawienie LWT (Last Will and Testament) dla wykrywania problemów (retain=True)
mqtt_client.set_last_will(TOPIC_SYS_STAT, b"OFFLINE", retain=True)
mqtt_client.connect()
# Subskrypcje
mqtt_client.subscribe(TOPIC_CMD_1)
mqtt_client.subscribe(TOPIC_CMD_2)
mqtt_client.subscribe(TOPIC_SYS_CMD)
print("Zalogowano do MQTT i zasubskrybowano tematy.")
def main():
consecutive_errors = 0
max_errors = 10 # Po ilu nieudanych próbach resetujemy urządzenie
while True:
try:
# 1. Sprawdź / odnów połączenie WiFi
if not network.WLAN(network.STA_IF).isconnected():
if not connect_wifi():
raise OSError("Brak połączenia WiFi")
# 2. Inicjalizacja / odnowienie MQTT
connect_mqtt()
publish_all_states()
# Zerowanie licznika błędów po udanym połączeniu
consecutive_errors = 0
# 3. Pętla nasłuchująca MQTT
while True:
# check_msg() sprawdza czy są nowe wiadomości (nie blokuje długo)
# Jeśli WiFi/MQTT padnie, ta funkcja rzuci wyjątek, który złapie zewnętrzny 'except'
mqtt_client.check_msg()
# Opcjonalnie mqtt_client.ping() jeśli ESP tylko nasłuchuje
# a nie wysyła danych, aby podtrzymać keepalive.
time.sleep(0.1)
except Exception as e:
print("Wystąpił błąd w pętli głównej:", e)
consecutive_errors += 1
print("Licznik błędów: {}/{}".format(consecutive_errors, max_errors))
if mqtt_client is not None:
try:
mqtt_client.disconnect()
except:
pass
if consecutive_errors >= max_errors:
print("Przekroczono limit błędów. Twardy reset sprzętowy...")
time.sleep(1)
machine.reset()
# Poczekaj przed kolejną próbą przywrócenia połączenia
time.sleep(5)
if __name__ == "__main__":
main()
Najważniejsze mechanizmy obronne w kodzie:
- Pętla samolecząca
try...except: Zamiast kończyć działanie skryptu w razie braku prądu u rutera czy resetu Mosquitto, kod zlicza błędy. Odczeka 5 sekund i spróbuje ponownie nawiązać połączenie z WiFi i brokerem. - Sprzętowy Reset (Hard Reset): Jeżeli połączenie zawiedzie
max_errorsrazy (w kodzie ustawione na 10, czyli ok. 50 sekund bez sieci), moduł wykonuje twardy restart za pomocąmachine.reset(). Gwarantuje to wyczyszczenie ewentualnych wycieków pamięci i zawieszonych socketów w ESP32. - Zarządzanie flagą Retain:
- Wszelkie aktualizacje statusów przekaźników wychodzą z parametrem
retain=True. Zatem dowolna aplikacja kliencka podłączająca się do MQTT od razu wie, w jakim stanie jest maszyna. - Kod wykorzystuje LWT (Last Will and Testament). Kiedy ESP32-C3 loguje się do Mosquitto, mówi brokerowi: „Jeśli chamsko zniknę z sieci bez słowa wylogowania, wrzuć na ten temat status OFFLINE (z opcją retain)”.
- Wszelkie aktualizacje statusów przekaźników wychodzą z parametrem
- Zdalne zarządzanie (Temat CMD):
- Wysyłając string
STATUSna tematdom/esp/cmd, zmuszasz układ do natychmiastowego opublikowania stanu wszystkiego. - Wysyłając string
RESETna tematdom/esp/cmd, zdalnie resetujesz fizycznie moduł, uprzednio otrzymując po MQTT notyfikacjęREBOOTING.
- Wysyłając string
Obsługa esp32
Aby włączyć urządzenie, musisz opublikować wiadomość na odpowiedni temat komend (z końcówką /set). Temat /status, który podałeś, służy wyłącznie do odczytu (tam ESP32 publikuje informację o tym, w jakim aktualnie jest stanie).
Oto dokładne dane, których musisz użyć w swoim kliencie MQTT, aby sterować przekaźnikami:
Włączanie / Wyłączanie Urządzenia 1
- Temat:
gempro/urzadzenie1/set - Wiadomość (payload):
ON(aby wyłączyć, wyślijOFF)
Włączanie / Wyłączanie Urządzenia 2
- Temat:
gempro/urzadzenie2/set - Wiadomość (payload):
ON(aby wyłączyć, wyślijOFF)
Komendy systemowe (opcjonalnie)
- Temat:
gempro/esp/cmd - Wiadomość (payload):
STATUS(zmusza ESP do ponownego wysłania statusów wszystkich urządzeń) - Wiadomość (payload):
RESET(wykonuje twardy restart układu ESP32)
Ważna uwaga: Wielkość liter w wiadomości ma znaczenie. Musisz wysłać dokładnie ON lub OFF (wielkimi literami), ponieważ nasz kod w MicroPythonie szuka dokładnie takich ciągów znaków.
Jakiego programu używasz do testowania brokera (np. MQTT Explorer, konsola Linuxa, czy może Home Assistant)? Czy chciałbyś, abym wygenerował dla Ciebie gotową komendę terminalową mosquitto_pub, która wyśle ten sygnał?

No responses yet