Files
sensecap/datalogger_emulator.py
2024-12-30 20:03:22 +01:00

103 lines
3.1 KiB
Python

from contextlib import asynccontextmanager
import json
import random
import threading
import time

from fastapi import FastAPI, BackgroundTasks
import paho.mqtt.client as mqtt
# Device identifier of this emulator process: "emulator_<n>", with n drawn once
# at import time from 1..50 (random.randint includes both endpoints). It is
# used as the "deviceEui" of every payload and embedded in the MQTT topics.
emulator_rnd = f"emulator_{random.randint(1, 50)}"
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the MQTT connection on startup and close it on shutdown.

    Before yielding, a paho client is created, stored in the module-level
    ``mqtt_client``, given its credentials, connected to
    ``MQTT_BROKER:MQTT_PORT`` (keepalive 60) and its network loop is started.
    After the yield the loop is stopped and the client disconnected.

    Failures in either phase are printed and swallowed, so the application
    still starts (and still shuts down) when the broker is unreachable.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global mqtt_client

    # --- startup ---
    try:
        client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            protocol=mqtt.MQTTv5,
            client_id='enertec_client',
        )
        # Publish the handle right away: it stays assigned even if one of the
        # following calls raises.
        mqtt_client = client
        client.username_pw_set('enertec', 'pwd@mqtt!enertec')
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
    except Exception as exc:
        print(f"Errore durante la connessione al broker MQTT: {exc}")

    yield

    # --- shutdown ---
    try:
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
    except Exception as exc:
        print(f"Errore durante la disconnessione dal broker MQTT: {exc}")
# FastAPI configuration: MQTT connect/disconnect is handled by `lifespan`
app = FastAPI(lifespan=lifespan)
# MQTT configuration
# Host and port handed to mqtt_client.connect() in `lifespan`
# (presumably "mqtt" is a hostname provided by the deployment - confirm).
MQTT_BROKER = "mqtt"
MQTT_PORT = 1883
# Per-device topics: the emulator identifier is part of the topic path
MQTT_TOPIC_STATUS = f"iot/ipnode/{emulator_rnd}/update/event/change-device-status"
MQTT_TOPIC_SENSOR = f"iot/ipnode/{emulator_rnd}/update/event/measure-sensor"
# Module-level client handle; None until `lifespan` assigns the client it creates
mqtt_client = None
# Sensor simulation
def generate_sensor_data():
    """Return one simulated reading for each emulated sensor channel.

    Returns:
        A list with one dict per channel, in channel order. Each dict has
        the keys ``"channel"``, ``"measurements"`` (a single-entry dict
        mapping the measurement name to a value rounded to 2 decimals) and
        ``"measureTime"`` (current time in integer milliseconds).
    """
    sensor_specs = [
        {"channel": "1", "type": "temperature", "min": 20.0, "max": 30.0},
        {"channel": "2", "type": "humidity", "min": 40.0, "max": 60.0},
    ]

    def _reading(spec):
        # Draw the value first, then take the timestamp for this channel.
        sample = random.uniform(spec["min"], spec["max"])
        return {
            "channel": spec["channel"],
            "measurements": {spec["type"]: round(sample, 2)},
            "measureTime": int(time.time() * 1000),
        }

    return [_reading(spec) for spec in sensor_specs]
def generate_status():
    """Return a simulated device-status snapshot.

    Returns:
        A dict with ``"battery"`` (random integer in 50..100),
        ``"signal_strength"`` (random integer in -70..-50) and
        ``"timestamp"`` (current time in integer milliseconds).
    """
    battery_level = random.randint(50, 100)
    signal_level = random.randint(-70, -50)
    now_ms = int(time.time() * 1000)
    return {
        "battery": battery_level,
        "signal_strength": signal_level,
        "timestamp": now_ms,
    }
def publish_status(client):
    """Publish a retained "change-device-status" event for this emulator.

    The payload is JSON-encoded and sent to ``MQTT_TOPIC_STATUS`` with QoS 1
    and the retain flag set; the payload dict is then printed.

    Args:
        client: Object exposing ``publish(topic, payload, qos=..., retain=...)``.
    """
    status_event = {
        "name": "change-device-status",
        "value": generate_status(),
        "timestamp": int(time.time() * 1000),
    }
    payload = {"deviceEui": emulator_rnd, "events": [status_event]}
    message = json.dumps(payload)
    client.publish(MQTT_TOPIC_STATUS, message, qos=1, retain=True)
    print(f"Published status: {payload}")
def publish_sensor_data(client):
    """Publish a "measure-sensor" event carrying fresh simulated readings.

    The payload is JSON-encoded and sent to ``MQTT_TOPIC_SENSOR`` with QoS 1
    (not retained); the payload dict is then printed.

    Args:
        client: Object exposing ``publish(topic, payload, qos=...)``.
    """
    measure_event = {
        "name": "measure-sensor",
        "value": generate_sensor_data(),
        "timestamp": int(time.time() * 1000),
    }
    payload = {"deviceEui": emulator_rnd, "events": [measure_event]}
    message = json.dumps(payload)
    client.publish(MQTT_TOPIC_SENSOR, message, qos=1)
    print(f"Published sensor data: {payload}")
# Emulation service
# Protects the bookkeeping below: FastAPI runs plain `def` endpoints and sync
# background tasks on worker threads, so they can touch it concurrently.
_emulation_lock = threading.Lock()
# Stop flag of the loop started through /start_emulation, or None when no
# such loop is active.
_emulation_stop_event = None


def run_emulation(client, interval=0.1, stop_event=None):
    """Publish status and sensor data repeatedly until asked to stop.

    Args:
        client: MQTT client handed to ``publish_status`` and
            ``publish_sensor_data``.
        interval: Seconds to wait between publish rounds. The default keeps
            the previously hard-coded 0.1 s; the old inline comment claimed
            "every 10 seconds", so pass ``interval=10`` if that was intended.
        stop_event: Optional ``threading.Event``. The loop ends once it is
            set. When omitted the loop never ends, as before.
    """
    if stop_event is None:
        # Never set by anyone: preserves the original run-forever behaviour.
        stop_event = threading.Event()
    while not stop_event.is_set():
        publish_status(client)
        publish_sensor_data(client)
        # Interruptible sleep: returns early as soon as the event is set.
        stop_event.wait(interval)


def _run_managed_emulation(client, stop_event):
    """Run the loop for /start_emulation and free the slot when it ends."""
    global _emulation_stop_event
    try:
        run_emulation(client, stop_event=stop_event)
    finally:
        # Also reached when the loop crashes, so a later start is not refused.
        with _emulation_lock:
            if _emulation_stop_event is stop_event:
                _emulation_stop_event = None


@app.post("/start_emulation")
def start_emulation(background_tasks: BackgroundTasks):
    """Start the emulation loop as a background task.

    Only one loop is allowed at a time: a second call while a loop is active
    returns a message instead of stacking another endless task on a worker
    thread.

    Args:
        background_tasks: FastAPI-injected task collection run after the
            response is sent.

    Returns:
        A dict with a human-readable ``"message"``.
    """
    global _emulation_stop_event
    with _emulation_lock:
        if _emulation_stop_event is not None:
            return {"message": "Emulazione già in esecuzione"}
        # A fresh event per run: a late stop of an old run cannot affect it.
        stop_event = _emulation_stop_event = threading.Event()
    background_tasks.add_task(_run_managed_emulation, mqtt_client, stop_event)
    return {"message": "Emulazione avviata"}


@app.post("/stop_emulation")
def stop_emulation():
    """Ask the running emulation loop, if any, to stop.

    The loop exits after finishing its current publish round.

    Returns:
        A dict with a human-readable ``"message"``.
    """
    global _emulation_stop_event
    with _emulation_lock:
        stop_event, _emulation_stop_event = _emulation_stop_event, None
    if stop_event is None:
        return {"message": "Nessuna emulazione in esecuzione"}
    stop_event.set()
    return {"message": "Emulazione fermata"}