MQTT: still WIP, generated C code builds and link.
authorEdouard Tisserant <edouard@beremiz.fr>
Tue, 09 Jul 2024 11:46:19 +0200 (6 months ago)
changeset 3984 883a85b9ebcc
parent 3983 466be4f52cb9
child 3985 d0c5d77a33af
MQTT: still WIP, generated C code builds and link.
mqtt/client.py
mqtt/mqtt_client_gen.py
--- a/mqtt/client.py	Tue Jul 09 11:44:49 2024 +0200
+++ b/mqtt/client.py	Tue Jul 09 11:46:19 2024 +0200
@@ -10,9 +10,15 @@
 
 import util.paths as paths
 
-PahoMqttCPath = paths.ThirdPartyPath("MQTT")
-PahoMqttCLibraryPath = PahoMqttCPath 
-PahoMqttCIncludePaths = [PahoMqttCPath]
+
+# assumes that "build" directory was created in paho.mqtt.c source directory,
+# and cmake build was invoked from this directory
+PahoMqttCLibraryPath = paths.ThirdPartyPath("paho.mqtt.c", "build", "src")
+
+PahoMqttCIncludePaths = [
+    paths.ThirdPartyPath("paho.mqtt.c", "build"),  # VersionInfo.h
+    paths.ThirdPartyPath("paho.mqtt.c", "src")
+]
 
 class MQTTClientEditor(ConfTreeNodeEditor):
     CONFNODEEDITOR_TABS = [
@@ -107,7 +113,10 @@
         locstr = "_".join(map(str, current_location))
         c_path = os.path.join(buildpath, "mqtt_client__%s.c" % locstr)
 
-        c_code = '#include "beremiz.h"\n'
+        c_code = """
+#include "iec_types_all.h"
+#include "beremiz.h"
+"""
         c_code += self.modeldata.GenerateC(c_path, locstr, self.GetConfig())
 
         with open(c_path, 'w') as c_file:
--- a/mqtt/mqtt_client_gen.py	Tue Jul 09 11:44:49 2024 +0200
+++ b/mqtt/mqtt_client_gen.py	Tue Jul 09 11:46:19 2024 +0200
@@ -4,11 +4,12 @@
 import csv
 import functools
 from threading import Thread
+from collections import OrderedDict
 
 import wx
 import wx.dataview as dv
 
-from perfect_hash import generate_code, IntSaltHash
+# from perfect_hash import generate_code, IntSaltHash
 
 MQTT_IEC_types = dict(
 # IEC61131|  C  type   | sz
@@ -298,7 +299,7 @@
 int G[] = {$G};
 char *K[] = {$K};
 """
-		code = generate_code(topics, Hash=IntSaltHash, template=template)
+        code = generate_code(topics, Hash=IntSaltHash, template=template)
         code += """
 /* return index of key in K if key is found, -1 otherwise */
 int MQTT_Topic_index(const char *key)
@@ -318,189 +319,188 @@
     return -1;
 }
 """ 
-		return code
+        return code
 
     def GenerateC(self, path, locstr, config):
         template = """/* code generated by beremiz MQTT extension */
 
-#include "MQTTAsync.h"
+#include <stdint.h>
+#include <pthread.h>
+
+#include "MQTTClient.h"
 #include "MQTTClientPersistence.h"
 
 #define _Log(level, ...)                                                                           \\
     {{                                                                                             \\
-        char mstr[256];                                                                            \\
-        snprintf(mstr, 255, __VA_ARGS__);                                                          \\
-        LogMessage(level, mstr, strlen(mstr));                                                     \\
+        /* char mstr[256];                          */                                            \\
+        /* snprintf(mstr, 255, __VA_ARGS__);        */                                            \\
+        /* LogMessage(level, mstr, strlen(mstr));   */                                            \\
+        printf(__VA_ARGS__);                                                          \\
     }}
 
 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
 
-static inline void* loadFile(const char *const path) {{
-
-    FILE *fp = fopen(path, "rb");
-    if(!fp) {{
-        errno = 0;
-        LogError("MQTT could not open %s", path);
-        return NULL;
+static MQTTClient client;
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
+static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
+
+void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
+{{
+    LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
+}}
+
+#define DECL_VAR(iec_type, C_type, c_loc_name)                                                     \\
+static C_type PLC_##c_loc_name##_buf = 0;                                                          \\
+static C_type MQTT_##c_loc_name##_buf = 0;                                                         \\
+C_type *c_loc_name = &PLC_##c_loc_name##_buf;
+
+{decl}
+
+#define INIT_TOPIC(topic, iec_type, c_loc_name)                                            \\
+{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
+
+static struct {{
+    const char *topic; //null terminated topic string
+    void *mqtt_pdata; //data from/for MQTT stack
+    __IEC_types_enum vartype;
+}} topics [] = {{
+{topics}
+}};
+
+static int _connect_mqtt(void)
+{{
+    int rc;
+    MQTTProperties props = MQTTProperties_initializer;
+    MQTTProperties willProps = MQTTProperties_initializer;
+    MQTTResponse response = MQTTResponse_initializer;
+
+    response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
+    rc = response.reasonCode;
+    MQTTResponse_free(response);
+
+	return rc;
+}}
+
+void __cleanup_{locstr}(void)
+{{
+    int rc;
+
+    /* TODO stop publish thread */
+
+    if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
+    {{
+        LogError("MQTT Failed to disconnect, return code %d\\n", rc);
     }}
-
-    fseek(fp, 0, SEEK_END);
-    size_t length = (size_t)ftell(fp);
-    void* data = malloc(length);
-    if(data) {{
-        fseek(fp, 0, SEEK_SET);
-        size_t read = fread(data, 1, fileContents.length, fp);
-        if(read != length){{
-            free(data);
-            LogError("MQTT could not read %s", path);
-        }}
-    }} else {{
-        LogError("MQTT Not enough memoty to load %s", path);
+    MQTTClient_destroy(&client);
+}}
+
+void connectionLost(void* context, char* reason)
+{{
+    LogWarning("ConnectionLost, reconnecting\\n");
+	_connect_mqtt();
+    /* TODO wait if error */
+}}
+
+int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
+{{
+    /* TODO : something with message */
+    printf("Message arrived\\n");
+    printf("     topic: %s\\n", topicName);
+    printf("   message: %.*s\\n", message->payloadlen, (char*)message->payload);
+    MQTTClient_freeMessage(&message);
+    MQTTClient_free(topicName);
+    return 1;
+}}
+
+#define INIT_NoAuth()                                                                             \\
+    LogInfo("MQTT Init no auth");
+
+#define INIT_x509(PrivateKey, Certificate)                                                        \\
+    LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate);
+    /* TODO */
+
+#define INIT_UserPassword(User, Password)                                                         \\
+    LogInfo("MQTT Init UserPassword %s,%s", User, Password);                                      \\
+	conn_opts.username = User;                                                                    \\
+	conn_opts.password = Password;
+
+#define INIT_SUBSCRIPTION(Topic, QoS)                                                             \\
+    {{                                                                                            \\
+        MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL);            \\
+        rc = response.reasonCode;                                                                 \\
+        MQTTResponse_free(response);                                                              \\
+        if (rc != MQTTCLIENT_SUCCESS)                                                             \\
+        {{                                                                                        \\
+            LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\
+        }}                                                                                        \\
     }}
-    fclose(fp);
-
-    return data;
-}}
-
-static MQTTClient client;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-
-void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
-{
-	LogWarning("Paho MQTT Trace : %d, %s\n", level, message);
-}
-
-#define DECL_VAR(iec_type, C_type, c_loc_name)                                                       \\
-static C_type c_loc_name##_buf = 0;                                                                 \\
-C_type *c_loc_name = &c_loc_name##_buf;
-
-{decl}
-
-#define INIT_TOPIC(topic, iec_type, c_loc_name)                                                  \\
-{topic, &c_loc_name##_buf, iec_type##_ENUM},
-
-ststic struct {
-	const char *topic; //null terminated topic string
-	void *pdata; //pointer to data
-	__IEC_types_enum vartype;
-} topics [] = {
-{topics}
-}
-
-void __cleanup_{locstr}(void)
-{{
-    MQTT_Client_disconnect(client);
-    MQTT_Client_delete(client);
-}}
-
-#define INIT_NoAuth()                                                                              \\
-    LogInfo("MQTT Init no auth");                                                                \\
-    MQTT_ClientConfig_setDefault(cc);                                                                \\
-    retval = MQTT_Client_connect(client, uri);
-
-/* Note : Single policy is enforced here, by default open62541 client supports all policies */
-#define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate)                                  \\
-    LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate);     \\
-                                                                                                   \\
-    MQTT_ByteString certificate = loadFile(Certificate);                                             \\
-    MQTT_ByteString privateKey  = loadFile(PrivateKey);                                              \\
-                                                                                                   \\
-    cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode;                                     \\
-                                                                                                   \\
-    /* replacement for default behaviour */                                                        \\
-    /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */     \\
-    do{{                                                                                           \\
-        retval = MQTT_ClientConfig_setDefault(cc);                                                   \\
-        if(retval != MQTT_STATUSCODE_GOOD)                                                           \\
-            break;                                                                                 \\
-                                                                                                   \\
-        MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*)                                               \\
-            MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2);                       \\
-        if(!sp){{                                                                                  \\
-            retval = MQTT_STATUSCODE_BADOUTOFMEMORY;                                                 \\
-            break;                                                                                 \\
-        }}                                                                                         \\
-        cc->securityPolicies = sp;                                                                 \\
-                                                                                                   \\
-        retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize],       \\
-                                                 certificate, privateKey, &cc->logger);            \\
-        if(retval != MQTT_STATUSCODE_GOOD) {{                                                        \\
-            MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND,                                   \\
-                           "Could not add SecurityPolicy Policy with error code %s",               \\
-                           MQTT_StatusCode_name(retval));                                            \\
-            MQTT_free(cc->securityPolicies);                                                         \\
-            cc->securityPolicies = NULL;                                                           \\
-            break;                                                                                 \\
-        }}                                                                                         \\
-                                                                                                   \\
-        ++cc->securityPoliciesSize;                                                                \\
-    }} while(0);                                                                                   \\
-                                                                                                   \\
-    retval = MQTT_Client_connect(client, uri);                                                       \\
-                                                                                                   \\
-    MQTT_ByteString_clear(&certificate);                                                             \\
-    MQTT_ByteString_clear(&privateKey);
-
-#define INIT_UserPassword(User, Password)                                                          \\
-    LogInfo("MQTT Init UserPassword %s,%s", User, Password);                                     \\
-    MQTT_ClientConfig_setDefault(cc);                                                                \\
-    retval = MQTT_Client_connectUsername(client, uri, User, Password);
-
-#define INIT_READ_VARIANT(ua_type, c_loc_name)                                                     \\
-    MQTT_Variant_init(&c_loc_name##_variant);
-
-#define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name)                                      \\
-    MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]);
 
 int __init_{locstr}(int argc,char **argv)
 {{
     char *uri = "{uri}";
     char *clientID = "{clientID}";
     int rc;
-    conn_opts = MQTTClient_connectOptions_initializer;
-
-    if ((rc = MQTTClient_create(&client, uri, clientID,
-        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
-    {
-        printf("Failed to create client, return code %d\n", rc);
-        rc = EXIT_FAILURE;
-        goto exit;
-    }
+
+	MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
+
+	conn_opts.MQTTVersion = MQTTVERSION_5;
+    conn_opts.cleanstart = 1;
+
+    createOpts.MQTTVersion = MQTTVERSION_5;
+
+    MQTTClient_setTraceCallback(trace_callback);
+    MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
+
+
+	rc = MQTTClient_createWithOptions(
+        &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
+	if (rc != MQTTCLIENT_SUCCESS)
+	{{
+        LogError("MQTT Failed to create client, return code %d\\n", rc);
+        return rc;
+    }}
+
+	rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
+	if (rc != MQTTCLIENT_SUCCESS)
+	{{
+        LogError("MQTT Failed to set callbacks %d", rc);
+        return rc;
+	}}
 
 {init}
 
-    if(retval != MQTT_STATUSCODE_GOOD) {{
-        LogError("MQTT Init Failed %d", retval);
-        MQTT_Client_delete(client);
-        return EXIT_FAILURE;
+	rc = _connect_mqtt();
+
+	if (rc != MQTTCLIENT_SUCCESS) {{
+        LogError("MQTT Init Failed %d", rc);
+        return rc;
     }}
+    /* TODO start publish thread */
+
     return 0;
 }}
 
-#define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id)        \\
-    retval = MQTT_Client_readValueAttribute(                                                         \\
-        client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);                      \\
-    if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) &&               \\
-       c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{                                    \\
-            c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data;                               \\
-            MQTT_Variant_clear(&c_loc_name##_variant);  /* Unalloc requiered on each read ! */       \\
-    }}
+#define READ_VALUE(c_loc_name, C_type) \\
+    PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf;
 
 void __retrieve_{locstr}(void)
 {{
-    MQTT_StatusCode retval;
+    /* TODO try take mutex */
 {retrieve}
-}}
-
-#define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id)                     \\
-    MQTT_Client_writeValueAttribute(                                                                 \\
-        client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);
+    /* TODO free mutex */
+}}
+
+#define WRITE_VALUE(c_loc_name, C_type) \\
+    MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
 
 void __publish_{locstr}(void)
 {{
+    /* TODO try take mutex */
 {publish}
+    /* TODO free mutex */
+    /* TODO unblock publish thread */
 }}
 
 """
@@ -528,11 +528,11 @@
             formatdict["init"] += """
     INIT_NoAuth()"""
 
-		topics = OrderedDict()
+        topics = OrderedDict()
         for direction, data in self.items():
             iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
             for row in data:
-				Topic, QoS, Retain, iec_type, iec_number = row
+                Topic, QoS, Retain, iec_type, iec_number = row
                 C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
                 c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
 
@@ -540,19 +540,18 @@
 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
 
                 formatdict["topics"] += """
-INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-#
-#                if direction == "input":
-#                    formatdict["init"] += """
-#    INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals())
-#                    formatdict["retrieve"] += """
-#    READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
-#
-#                if direction == "output":
-#                    formatdict["init"] += """
-#    INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals())
-#                    formatdict["publish"] += """
-#    WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
+    INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
+
+                if direction == "input":
+                    formatdict["init"] += """
+    INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
+                    formatdict["retrieve"] += """
+    READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
+
+                if direction == "output":
+                    # formatdict["init"] += " NOTHING ! publish doesn't need init. "
+                    formatdict["publish"] += """
+    WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
 
         Ccode = template.format(**formatdict)