MQTT: WIP. Added handling of received messages (subscriptions)
authorEdouard Tisserant <edouard@beremiz.fr>
Wed, 17 Jul 2024 17:02:32 +0200
changeset 3990 24656e0e8732
parent 3989 987c69b1582f
child 3991 28354ba489b9
MQTT: WIP. Added handling of received messages (subscriptions)

Dropped perfect hash in favor of bisection
Also fixed indentation (tabs)
exemples/first_steps/plc.xml
mqtt/mqtt_client_gen.py
--- a/exemples/first_steps/plc.xml	Tue Jul 16 09:41:45 2024 +0200
+++ b/exemples/first_steps/plc.xml	Wed Jul 17 17:02:32 2024 +0200
@@ -1,7 +1,7 @@
 <?xml version='1.0' encoding='utf-8'?>
 <project xmlns:ns1="http://www.plcopen.org/xml/tc6_0201" xmlns:xhtml="http://www.w3.org/1999/xhtml" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://www.plcopen.org/xml/tc6_0201">
   <fileHeader companyName="Beremiz" productName="Beremiz" productVersion="1" creationDateTime="2016-10-24T18:09:22"/>
-  <contentHeader name="First Steps" modificationDateTime="2018-09-26T12:52:51">
+  <contentHeader name="First Steps" modificationDateTime="2024-07-12T16:04:57">
     <coordinateInfo>
       <fbd>
         <scaling x="0" y="0"/>
--- a/mqtt/mqtt_client_gen.py	Tue Jul 16 09:41:45 2024 +0200
+++ b/mqtt/mqtt_client_gen.py	Wed Jul 17 17:02:32 2024 +0200
@@ -308,53 +308,21 @@
                 for row in data:
                     writer.writerow([direction] + row)
 
-    def _TopicsPerfectHash(self, topics):
-        template = """
-#define NK  $NK       /* number of keys */
-#define NG  $NG       /* number of vertices */
-#define NS  $NS       /* length of array T1 and T2 */
-
-int S1[] = {$S1};
-int S2[] = {$S2};
-int G[] = {$G};
-char *K[] = {$K};
-"""
-        code = generate_code(topics, Hash=IntSaltHash, template=template)
-        code += """
-/* return index of key in K if key is found, -1 otherwise */
-int MQTT_Topic_index(const char *key)
-{
-    int i, f1 = 0, f2 = 0;
-
-    for (i = 0; key[i] != '\\0' && i < NS; i++) {
-        f1 += S1[i] * key[i];
-        f2 += S2[i] * key[i];
-        f1 %= NG;
-        f2 %= NG;
-    }
-    i = (G[f1] + G[f2]) % NG;
-    if (i < NK && strcmp(key, K[i]) == 0)
-        return i;
-
-    return -1;
-}
-""" 
-        return code
-
     def GenerateC(self, path, locstr, config):
         template = """/* code generated by beremiz MQTT extension */
 
 #include <stdint.h>
 #include <pthread.h>
+#include <string.h>
 
 #include "MQTTClient.h"
 #include "MQTTClientPersistence.h"
 
 #define _Log(level, ...)                                                                          \\
     {{                                                                                            \\
-        /* char mstr[256];                          */                                            \\
-        /* snprintf(mstr, 255, __VA_ARGS__);        */                                            \\
-        /* LogMessage(level, mstr, strlen(mstr));   */                                            \\
+        char mstr[256];                                                                           \\
+        snprintf(mstr, 255, __VA_ARGS__);                                                         \\
+        LogMessage(level, mstr, strlen(mstr));                                                    \\
         printf(__VA_ARGS__);                                                                      \\
     }}
 
@@ -364,7 +332,7 @@
 
 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
 {{
-    LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
+    LogInfo("Paho MQTT Trace : %d, %s\\n", level, message);
 }}
 
 #define CHANGED 1
@@ -401,11 +369,12 @@
 static pthread_t publishThread;
 
 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \\
-{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
+{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
 
 static struct {{
     const char *topic; //null terminated topic string
-    void *mqtt_pdata; //data from/for MQTT stack
+    void *mqtt_pdata; // pointer to data from/for MQTT stack
+    int *mqtt_pchanged; // pointer to changed flag
     __IEC_types_enum vartype;
 }} topics [] = {{
 {topics}
@@ -427,7 +396,7 @@
     rc = MQTTClient_connect(client, &conn_opts);
 #endif
 
-	return rc;
+    return rc;
 }}
 
 void __cleanup_{locstr}(void)
@@ -450,16 +419,48 @@
 void connectionLost(void* context, char* reason)
 {{
     LogWarning("ConnectionLost, reconnecting\\n");
-	_connect_mqtt();
+    _connect_mqtt();
     /* TODO wait if error */
 }}
 
 int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
 {{
-    /* TODO : something with message */
-    printf("Message arrived\\n");
-    printf("     topic: %s\\n", topicName);
-    printf("   message: %.*s\\n", message->payloadlen, (char*)message->payload);
+    int low = 0;
+    int size = sizeof(topics) / sizeof(topics[0]);
+    int high = size - 1;
+    int mid;
+
+    // bisect topic among subscribed topics
+    while (low <= high) {{
+        int res;
+        mid = low + (high - low) / 2;
+        res = strncmp(topics[mid].topic, topicName, topicLen);
+
+        // Check if key is present at mid
+        if (res == 0)
+            goto found;
+
+        // If key greater, ignore left half
+        if (res < 0)
+            low = mid + 1;
+
+        // If key is smaller, ignore right half
+        else
+            high = mid - 1;
+    }}
+    // If we reach here, then the element was not present
+    LogWarning("MQTT unknown topic: %s", topicName);
+    goto exit;
+
+found:
+    if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
+        memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
+        *topics[mid].mqtt_pchanged = 1;
+    }} else {{
+        LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", 
+            topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
+    }}
+exit:
     MQTTClient_freeMessage(&message);
     MQTTClient_free(topicName);
     return 1;
@@ -474,16 +475,17 @@
 
 #define INIT_UserPassword(User, Password)                                                         \\
     LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password);                                   \\
-	conn_opts.username = User;                                                                    \\
-	conn_opts.password = Password;
+    conn_opts.username = User;                                                                    \\
+    conn_opts.password = Password;
 
 #ifdef USE_MQTT_5
-#define _SUBSCRIBE(Topic, QoS)                                                                  \\
+#define _SUBSCRIBE(Topic, QoS)                                                                    \\
         MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL);           \\
-        rc = response.reasonCode;                                                                 \\
+        /* when using MQTT5 responce code is 1 for some reason even if no error */                \\
+        rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode;                 \\
         MQTTResponse_free(response);
 #else
-#define _SUBSCRIBE(Topic, QoS)                                                                  \\
+#define _SUBSCRIBE(Topic, QoS)                                                                    \\
         rc = MQTTClient_subscribe(client, #Topic, QoS);
 #endif
 
@@ -501,7 +503,7 @@
 #ifdef USE_MQTT_5
 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
         MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type),               \\
-            &PLC_##c_loc_name##_buf, QoS, Retained, NULL, NULL);                                  \\
+            &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL);                                  \\
         rc = response.reasonCode;                                                                 \\
         MQTTResponse_free(response);
 #else
@@ -561,39 +563,39 @@
     char *clientID = "{clientID}";
     int rc;
 
-	MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
+    MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
 
 #ifdef USE_MQTT_5
-	conn_opts.MQTTVersion = MQTTVERSION_5;
+    conn_opts.MQTTVersion = MQTTVERSION_5;
     conn_opts.cleanstart = 1;
 
     createOpts.MQTTVersion = MQTTVERSION_5;
 #else
-	conn_opts.cleansession = 1;
+    conn_opts.cleansession = 1;
 #endif
 
     MQTTClient_setTraceCallback(trace_callback);
     MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
 
 
-	rc = MQTTClient_createWithOptions(
+    rc = MQTTClient_createWithOptions(
         &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
-	if (rc != MQTTCLIENT_SUCCESS)
-	{{
+    if (rc != MQTTCLIENT_SUCCESS)
+    {{
         LogError("MQTT Failed to create client, return code %d\\n", rc);
         return rc;
     }}
 
-	rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
-	if (rc != MQTTCLIENT_SUCCESS)
-	{{
+    rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
+    if (rc != MQTTCLIENT_SUCCESS)
+    {{
         LogError("MQTT Failed to set callbacks, return code %d\\n", rc);
         return rc;
-	}}
-
-	rc = _connect_mqtt();
-
-	if (rc != MQTTCLIENT_SUCCESS) {{
+    }}
+
+    rc = _connect_mqtt();
+
+    if (rc != MQTTCLIENT_SUCCESS) {{
         LogError("MQTT Connect Failed, return code %d\\n", rc);
         return rc;
     }}
@@ -607,16 +609,22 @@
 }}
 
 #define READ_VALUE(c_loc_name, C_type) \\
-    PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf;
+    if(MQTT_##c_loc_name##_state == CHANGED){{ \\
+        /* TODO care about endianess */ \\
+        PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\
+        MQTT_##c_loc_name##_state = UNCHANGED; \\
+    }}
 
 void __retrieve_{locstr}(void)
 {{
-    /* TODO try take mutex */
+    if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
 {retrieve}
-    /* TODO free mutex */
+        pthread_mutex_unlock(&MQTT_mutex);
+    }}
 }}
 
 #define WRITE_VALUE(c_loc_name, C_type) \\
+    /* TODO care about endianess */ \\
     if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
         MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
         MQTT_##c_loc_name##_state = CHANGED; \\
@@ -671,37 +679,34 @@
             formatdict["init"] += """
     INIT_NoAuth()"""
 
-        topics = OrderedDict()
-        for direction, data in self.items():
-            iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
-            for row in data:
-                if direction == "input":
-                    Topic, QoS, iec_type, iec_number = row
-                else:
-                    Topic, QoS, _Retained, iec_type, iec_number = row
-                    Retained = 1 if _Retained=="True" else 0
-                C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
-                c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
-
-                formatdict["decl"] += """
+        for row in self["output"]:
+            Topic, QoS, _Retained, iec_type, iec_number = row
+            Retained = 1 if _Retained=="True" else 0
+            C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
+            c_loc_name = "__Q" + iec_size_prefix + locstr + "_" + str(iec_number)
+
+            formatdict["decl"] += """
 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
-
-                formatdict["topics"] += """
+            formatdict["init"] += """
+    INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+            formatdict["publish"] += """
+        WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
+            formatdict["publish_changes"] += """
+            PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+
+        # inputs need to be sorted for bisection search 
+        for row in sorted(self["input"]):
+            Topic, QoS, iec_type, iec_number = row
+            C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
+            c_loc_name = "__I" + iec_size_prefix + locstr + "_" + str(iec_number)
+            formatdict["decl"] += """
+DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
+            formatdict["topics"] += """
     INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-
-                if direction == "input":
-                    formatdict["init"] += """
+            formatdict["init"] += """
     INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
-                    formatdict["retrieve"] += """
-    READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
-
-                else:
-                    formatdict["init"] += """
-    INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
-                    formatdict["publish"] += """
-    WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
-                    formatdict["publish_changes"] += """
-    PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+            formatdict["retrieve"] += """
+        READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
 
         Ccode = template.format(**formatdict)