Jump to content

Importieren WARP3 mqtt Daten nach InfluxDB


universal-dilettant

Recommended Posts

Hallo,
ich stehe vor der Aufgabe, die über die WB geladenen kWh in eine InfluxDB zu importieren (so wie ich das mit vielen anderen Daten der PV etc. bereits mache). Dazu nutze ich die Influx-Komponente telegraf, die auch JSON parsen kann. Spannend sind für mich die IDs 209 und 211, die die Absolut-kWh seit Produktion speichern. 

Die Schwierigkeit, die ich habe: telegraf arbeitet mqtt topic by topic; um an die Werte der oa IDs zu kommen, müsste ich aber 2 topics (value_ids und values )in ein Array speichern und dann daraus die Importzeilen für InfluxDB generieren. Die würden etwa so aussehen:

measurement,topic=…/meters/0/209 value=5.39
measurement,topic=…/meters/0/211 value=0

Da fehlt mir aber schlicht die Erfahrung, wie man das in telegraf angehen könnte. Hat das schonmal Jemand gemacht und kann mir evtl. einen Ausschnitt aus seiner Konfig geben?

 

Anmerkung: wenn die beiden topics value_ids und values in einem kombiniert wären, etwa so

warp3/2bB4/meters/0/1 = 243.999
...
warp3/2bB4/meters/value/209 = 5.39

oder auch so

warp3/2bB4/meters/value = {"1":243.999, "2":244.888, "3":236.777, "4":0, "209":5.39, "211":0.0}

dann wäre die Aufgabe trivial und die Werte konsistent. (So wie es heute ist, müsste ich mich ja 100% drauf verlassen, dass beide topics hintereinander kommen?!)

 

Danke und Gruß

PS: cross-post im InfluxData Forum

Link zu diesem Kommentar
Share on other sites

.../meter/values ist die veraltete API, die aktuell noch zur Rückwärtskompatibilität mit aktualisiert wird, in Zukunft aber irgendwann entfernt werden kann.

Quote

(So wie es heute ist, müsste ich mich ja 100% drauf verlassen, dass beide topics hintereinander kommen?!)

Das ist auch so. value_ids wird aktualisiert, bevor Werte auf values rausgehen. Die Anzahl und Reihenfolge der Werte in values bleibt so lange unverändert, bis ein Update per value_ids rausgeht. Anschließend entsprechen Anzahl und Reihenfolge der Werte den neuen value_ids. In der Praxis wirst du allerdings nur Änderungen von einer leeren Liste zu Werten bekommen und wieder zurück zur leeren Liste, wenn die Wallbox neugestartet wird und sie ihren Zähler noch nicht initialisiert hat. Dass sich Anzahl oder Reihenfolge ändern, ohne den Umweg über die leere Liste zu nehmen, gibt es aktuell nicht. Anzahl und Reihenfolge der Werte sind auch über Neustarts hinweg konstant und können sich nur durch ein Firmware-Update ändern.

Wir lesen bei uns auch die Werte für verschiedene Aufgaben aus und das Pattern ist immer so, dass auf Updates von value_ids gelauscht wird und der Datensatz dann festlegt, wie folgende Updates auf values interpretiert werden. Alles andere wäre in der Praxis auch einfach unbrauchbar.

Falls du ein Python-Freund bist, hier ein Beispiel aus einem meiner Datenlogger, der die drei Phasenströme sammelt:

def cb_mqtt_grid_value_ids(client, userdata, msg):
    global grid_current_values_length
    global grid_current_value_indices

    grid_current_value_indices = 3 * [None]
    grid_currents_found = 0

    value_ids = json.loads(msg.payload)
    for idx, value_id in enumerate(value_ids):
        if value_id == 13:
            grid_current_value_indices[0] = idx
            grid_currents_found += 1
        elif value_id == 17:
            grid_current_value_indices[1] = idx
            grid_currents_found += 1
        elif value_id == 21:
            grid_current_value_indices[2] = idx
            grid_currents_found += 1
    if grid_currents_found != 3:
        print("Received value ID data doesn't contain phase currents", file=sys.stderr)
        return

    grid_current_values_length = len(value_ids)
    print(f"Grid phase currents' value IDs found", file=sys.stderr)

def cb_mqtt_grid_values(client, userdata, msg):
    global grid_current_values_length
    global grid_current_value_indices
    global grid_currents

    values = json.loads(msg.payload)
    if len(values) != grid_current_values_length:
        print(f"Values length mismatch for grid: expected {grid_current_values_length}, got {len(values)}", file=sys.stderr)
        return

    for i in range(len(grid_current_value_indices)):
        current_index = grid_current_value_indices[i]
        if current_index == None:
            print("Received values but don't know grid's phase current value index yet", file=sys.stderr)
            return

        grid_currents[i] = values[current_index]

Die Phasenströme haben die Value IDs 13, 17 und 21. Wenn ein Update auf value_ids reinkommt, werden die Indizes in grid_current_value_indices gespeichert. Wenn dann ein Update auf values reinkommt, werden die Werte an den Stellen der grid_current_value_indices ausgelesen. Zur Sicherheit wird auch die Anzahl der Value IDs gespeichert und mit der Anzahl der Werte in values verglichen.

Link zu diesem Kommentar
Share on other sites

(Achtung Totschlagargument! :D ) Das hat technische Gründe. Konkret: Der Mikrocontroller, den wir benutzen, hat nur begrenzt RAM. Deshalb müssen wir die Maximallänge von APIs beschränken, gerade von APIs die sich oft ändern, weil dann viele MQTT-Pakete generiert werden, die sich bei eventuell langsamen WLAN durchaus mal aufstauen können. Die IDs (die sich nur maximal einmal ändern sollten) und die Werte (die sich sekündlich ändern können) zu koppeln würde dazu führen, dass deutlich größere MQTT-Pakete generiert werden. Das wird auf einer WARP1 (die weniger RAM als eine WARP2 oder 3 hat) dann eng.

Link zu diesem Kommentar
Share on other sites

Schon. Führt dann aber unter anderem dazu dass 1. die API an der Stelle auseinanderläuft zwischen den Wallbox-Varianten (das wäre hässlich aber notfalls verschmerzbar) und 2. dann wieder andere Leute hier im Forum sich beklagen, dass wir unnötig viel Traffic per MQTT rausschicken. API-Design ist schwierig (und bei der Wallbox definitiv nicht optimal, aber brechende Änderungen nach Jahren tun weh) und wir balancieren oft mehr Anforderungen, als man auf den ersten Blick sieht.

Wir haben u.A. vor irgendwann in der Zukunft eine vereinfachte API zu entwickeln, als Alternative zum Vollausbau, der im Moment existiert (und teilweise auf die Anforderungen des Webinterfaces zugeschnitten ist). Eventuell können wir da auch eine einfacher zu benutzende Variante des Meldens von Zählerdaten einbauen.

Um dein konkretes Problem zu lösen: Es gibt z.B. das final-Plugin für telegraph: https://docs.influxdata.com/telegraf/v1/plugins/#aggregator-final mit dem du anscheinend zwei Datenquellen zusammenlegen kannst. Dann hast du IDs und Messwerte auf einem "Topic" (oder wie das in Influx-sprech auch immer heißt) und das Plugin sollte für dich auch das Reihenfolgenproblem lösen. (Disclaimer: Ich bin kein Influx-Experte, aber es klingt als könnte das irgendwie funktionieren)

Edit: Alternativ gibt es noch das merge-Plugin: https://github.com/influxdata/telegraf/blob/release-1.31/plugins/aggregators/merge/README.md damit hatte jemand, der deinen Use-Case hatte aber Probleme: https://community.influxdata.com/t/why-are-my-fields-from-mqtt-topics-not-merged/32929 und wurde auf das final-Plugin verwiesen.

Link zu diesem Kommentar
Share on other sites

Danke für die Links. Ich verstehe auch deine Argumente. Erlaube mir trotzdem einen Hinweis: neben der semantischen Abhängigkeit gibt es auch eine temporale Kontextabhängigkeit in der MQTT API. Du unterstellst, dass der mqtt Consumer Prozess (egal ob telegraf oder ein Python Script) ständig läuft. Insbesondere, dass er vor dem letzten Reset der WB bereits gestartet wurde und selbst keinen Restart benötigt (warum auch immer der Restart notwendig wurde: sei es, weil sich eine Konfiguration geändert hat, sei es, weil der Rechner, auf dem er läuft gebootet wurde, sei es ...). Würde also bedeuten, dass nach einem Neustart des mqtt Consumers auch die WB neu gestartet werden muss, damit sie zuverlässig die value_ids produziert. Das ist in meiner kleinen Umgebung sicherlich irgendwie leistbar - und verschmerzbar, wenn ich es vergessen würde. In einer größeren produktiven Umgebung würde ich sowas als kritsich ansehen. [Oder habe ich dich falsch verstanden???]

Ich habe mich nicht mit der Architektur der WB Software befasst und kenne die Abhängigkeiten nicht. Eine Möglichkeit, breaking changes zu mitigieren wäre jedoch sicherlich, den meters output (und ggf. anderen) konfigurierbar zu machen. Man taggt in der WEB UI diejenigen Felder, die man gerne im Output hätte, und es werden die entsprechenden mqtt messages, die für sich eineindeutig sind, generiert. Meinethalben auch noch mit einem Intervall, so dass nicht jede Sekunde, sondern nur alle x sec dieser zusätzliche Output erzeugt wird. Und wenn du es richtig geil machen willst, machst du auch die Namen der topics konfigurierbar :-)

Davon unabhängig ist es zwar ehrenwert, die alte HW weiter voll zu unterstützen. Irgendwann werdet ihr euch jedoch davon trennen müssen, eben weil die Funktionalitäten und Features mit jeder Version wachsen und dann nicht mehr in die veraltete HW passen.

Link zu diesem Kommentar
Share on other sites

On 9/11/2024 at 5:45 PM, universal-dilettant said:

[N]eben der semantischen Abhängigkeit gibt es auch eine temporale Kontextabhängigkeit in der MQTT API. Du unterstellst, dass der mqtt Consumer Prozess (egal ob telegraf oder ein Python Script) ständig läuft. Insbesondere, dass er vor dem letzten Reset der WB bereits gestartet wurde und selbst keinen Restart benötigt (warum auch immer der Restart notwendig wurde: sei es, weil sich eine Konfiguration geändert hat, sei es, weil der Rechner, auf dem er läuft gebootet wurde, sei es ...). Würde also bedeuten, dass nach einem Neustart des mqtt Consumers auch die WB neu gestartet werden muss, damit sie zuverlässig die value_ids produziert. Das ist in meiner kleinen Umgebung sicherlich irgendwie leistbar - und verschmerzbar, wenn ich es vergessen würde. In einer größeren produktiven Umgebung würde ich sowas als kritsich ansehen.

Die Wallbox schickt alle MQTT-Nachrichten mit gesetztem „retain“-Flag raus, was bedeutet, dass der MQTT-Server sie vorhalten soll. Startet der MQTT-Consumer neu, bekommt er direkt nach dem Verbindungsaufbau und Subscribe auf die Topics die jeweils letzte Nachricht auf dem jeweiligen Topic geschickt. Machst du erst ein Subscribe auf die value_ids und dann ein Subcribe auf die values, bekommst du erst die value_ids und dann die values zugeschickt, ganz unabhängig von einem Neustart der Wallbox oder des Consumers.

Das funktioniert auch bei einem Neustart des MQTT-Servers, selbst wenn der die Nachrichten mit „retain“-Flag über einen Neustart vergisst, da sich die Wallbox neu mit dem MQTT-Server verbinden muss und dann wieder zuerst die value_ids rausschickt.

Link zu diesem Kommentar
Share on other sites

On 9/11/2024 at 5:45 PM, universal-dilettant said:

Ich habe mich nicht mit der Architektur der WB Software befasst und kenne die Abhängigkeiten nicht. Eine Möglichkeit, breaking changes zu mitigieren wäre jedoch sicherlich, den meters output (und ggf. anderen) konfigurierbar zu machen. Man taggt in der WEB UI diejenigen Felder, die man gerne im Output hätte, und es werden die entsprechenden mqtt messages, die für sich eineindeutig sind, generiert. Meinethalben auch noch mit einem Intervall, so dass nicht jede Sekunde, sondern nur alle x sec dieser zusätzliche Output erzeugt wird. Und wenn du es richtig geil machen willst, machst du auch die Namen der topics konfigurierbar :-)

https://github.com/Tinkerforge/esp32-firmware/issues/36 steht auf der Liste, aber wie du siehst schon länger :D

Link zu diesem Kommentar
Share on other sites

Ich habe das jetzt in node-red mit 2 Flows und paar Zeilen Javascript gelöst.

image.thumb.png.06ff01f25d9504abe1e40bbf80747d63.png

mqtt Output fehlt noch. Erzeuge damit ein neues Topic...
Vielleicht hilft es dem einen oder anderen.

 

Funktion: find index for ...

for (let i= 0; i<msg.payload.length; i++) {
    if ( msg.payload[i] == 209 ) {
      global.set('warp_energy_index', i)
    }
}

return null;

Funktion: get value of ...

var i= null;

if ( (i= global.get("warp_energy_index")) != null) {
    var newMsg = { payload: msg.payload[i] };
    return newMsg;
}

return null;

 

Link zu diesem Kommentar
Share on other sites

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Gast
Reply to this topic...

×   Du hast formatierten Text eingefügt.   Formatierung jetzt entfernen

  Only 75 emoji are allowed.

×   Dein Link wurde automatisch eingebettet.   Einbetten rückgängig machen und als Link darstellen

×   Dein vorheriger Inhalt wurde wiederhergestellt.   Clear editor

×   Du kannst Bilder nicht direkt einfügen. Lade Bilder hoch oder lade sie von einer URL.

×
×
  • Neu erstellen...