MQTT: WIP, now connects to broker. Added MQTT 3 support and protocol version selection in IDE.
authorEdouard Tisserant <edouard@beremiz.fr>
Fri, 12 Jul 2024 11:24:06 +0200 (6 months ago)
changeset 3986 98bd0bb33ce4
parent 3985 d0c5d77a33af
child 3987 cec48fc7ccd0
MQTT: WIP, now connects to broker. Added MQTT 3 support and protocol version selection in IDE.
mqtt/client.py
mqtt/mqtt_client_gen.py
--- a/mqtt/client.py	Wed Jul 10 11:10:05 2024 +0200
+++ b/mqtt/client.py	Fri Jul 12 11:24:06 2024 +0200
@@ -55,6 +55,7 @@
               </xsd:complexType>
             </xsd:element>
           </xsd:sequence>
+          <xsd:attribute name="Use_MQTT_5" type="xsd:boolean" use="optional" default="true"/>
           <xsd:attribute name="Broker_URI" type="xsd:string" use="optional" default="ws://localhost:1883"/>
           <xsd:attribute name="Client_ID" type="xsd:string" use="optional" default=""/>
         </xsd:complexType>
@@ -86,7 +87,11 @@
             return attr["value"]
 
         AuthType = cfg("AuthType")
-        res = dict(URI=cfg("Broker_URI"), AuthType=AuthType, clientID=cfg("Client_ID"))
+        res = dict(
+            URI=cfg("Broker_URI"),
+            AuthType=AuthType,
+            clientID=cfg("Client_ID"),
+            UseMQTT5=cfg("Use_MQTT_5"))
 
         paramList = authParams.get(AuthType, None)
         if paramList:
--- a/mqtt/mqtt_client_gen.py	Wed Jul 10 11:10:05 2024 +0200
+++ b/mqtt/mqtt_client_gen.py	Fri Jul 12 11:24:06 2024 +0200
@@ -330,22 +330,18 @@
 #include "MQTTClient.h"
 #include "MQTTClientPersistence.h"
 
-#define _Log(level, ...)                                                                           \\
-    {{                                                                                             \\
+#define _Log(level, ...)                                                                          \\
+    {{                                                                                            \\
         /* char mstr[256];                          */                                            \\
         /* snprintf(mstr, 255, __VA_ARGS__);        */                                            \\
         /* LogMessage(level, mstr, strlen(mstr));   */                                            \\
-        printf(__VA_ARGS__);                                                          \\
+        printf(__VA_ARGS__);                                                                      \\
     }}
 
 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
 
-static MQTTClient client;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
-static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
-
 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
 {{
     LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
@@ -358,7 +354,15 @@
 
 {decl}
 
-#define INIT_TOPIC(topic, iec_type, c_loc_name)                                            \\
+static MQTTClient client;
+#ifdef USE_MQTT_5
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
+#else
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
+#endif
+static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
+
+#define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \\
 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
 
 static struct {{
@@ -372,6 +376,8 @@
 static int _connect_mqtt(void)
 {{
     int rc;
+
+#ifdef USE_MQTT_5
     MQTTProperties props = MQTTProperties_initializer;
     MQTTProperties willProps = MQTTProperties_initializer;
     MQTTResponse response = MQTTResponse_initializer;
@@ -379,6 +385,9 @@
     response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
     rc = response.reasonCode;
     MQTTResponse_free(response);
+#else
+    rc = MQTTClient_connect(client, &conn_opts);
+#endif
 
 	return rc;
 }}
@@ -389,7 +398,11 @@
 
     /* TODO stop publish thread */
 
+#ifdef USE_MQTT_5
     if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
+#else
+    if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS)
+#endif
     {{
         LogError("MQTT Failed to disconnect, return code %d\\n", rc);
     }}
@@ -415,25 +428,33 @@
 }}
 
 #define INIT_NoAuth()                                                                             \\
-    LogInfo("MQTT Init no auth");
+    LogInfo("MQTT Init no auth\\n");
 
 #define INIT_x509(PrivateKey, Certificate)                                                        \\
-    LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate);
+    LogInfo("MQTT Init x509 %s,%s\\n", PrivateKey, Certificate);
     /* TODO */
 
 #define INIT_UserPassword(User, Password)                                                         \\
-    LogInfo("MQTT Init UserPassword %s,%s", User, Password);                                      \\
+    LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password);                                   \\
 	conn_opts.username = User;                                                                    \\
 	conn_opts.password = Password;
 
+#ifdef USE_MQTT_5
+#define MY_SUBSCRIBE(Topic, QoS)                                                                  \\
+        MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL);           \\
+        rc = response.reasonCode;                                                                 \\
+        MQTTResponse_free(response);
+#else
+#define MY_SUBSCRIBE(Topic, QoS)                                                                  \\
+        rc = MQTTClient_subscribe(client, #Topic, QoS);
+#endif
+
 #define INIT_SUBSCRIPTION(Topic, QoS)                                                             \\
     {{                                                                                            \\
-        MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL);            \\
-        rc = response.reasonCode;                                                                 \\
-        MQTTResponse_free(response);                                                              \\
+        MY_SUBSCRIBE(Topic, QoS)                                                                  \\
         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
         {{                                                                                        \\
-            LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\
+            LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);   \\
         }}                                                                                        \\
     }}
 
@@ -445,10 +466,14 @@
 
 	MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
 
+#ifdef USE_MQTT_5
 	conn_opts.MQTTVersion = MQTTVERSION_5;
     conn_opts.cleanstart = 1;
 
     createOpts.MQTTVersion = MQTTVERSION_5;
+#else
+	conn_opts.cleansession = 1;
+#endif
 
     MQTTClient_setTraceCallback(trace_callback);
     MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
@@ -465,18 +490,19 @@
 	rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
 	if (rc != MQTTCLIENT_SUCCESS)
 	{{
-        LogError("MQTT Failed to set callbacks %d", rc);
+        LogError("MQTT Failed to set callbacks, return code %d\\n", rc);
         return rc;
 	}}
 
-{init}
-
 	rc = _connect_mqtt();
 
 	if (rc != MQTTCLIENT_SUCCESS) {{
-        LogError("MQTT Init Failed %d", rc);
+        LogError("MQTT Connect Failed, return code %d\\n", rc);
         return rc;
     }}
+
+{init}
+
     /* TODO start publish thread */
 
     return 0;
@@ -504,7 +530,7 @@
 }}
 
 """
-        
+
         formatdict = dict(
             locstr   = locstr,
             uri      = config["URI"],
@@ -514,9 +540,15 @@
             cleanup  = "",
             init     = "",
             retrieve = "",
-            publish  = "" 
+            publish  = ""
         )
 
+
+        # Use Config's "MQTTVersion" to switch between protocol version at build time
+        if config["UseMQTT5"]:
+            formatdict["decl"] += """
+#define USE_MQTT_5""".format(**config)
+
         AuthType = config["AuthType"]
         if AuthType == "x509":
             formatdict["init"] += """
@@ -549,12 +581,13 @@
     READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
 
                 if direction == "output":
+                    # TODO: publish at init
                     # formatdict["init"] += " NOTHING ! publish doesn't need init. "
                     formatdict["publish"] += """
     WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
 
         Ccode = template.format(**formatdict)
-        
+
         return Ccode
 
 if __name__ == "__main__":
@@ -572,6 +605,7 @@
     config["URI"] = sys.argv[1] if argc>1 else "tcp://localhost:1883"
     config["clientID"] = sys.argv[2] if argc>2 else ""
     config["AuthType"] = None
+    config["UseMQTT5"] = True
 
     if argc > 3:
         AuthType = sys.argv[3]
@@ -595,7 +629,7 @@
     def OnGenerate(evt):
         dlg = wx.FileDialog(
             frame, message="Generate file as ...", defaultDir=os.getcwd(),
-            defaultFile="", 
+            defaultFile="",
             wildcard="C (*.c)|*.c", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT
             )
 
@@ -620,9 +654,9 @@
 int main(int argc, char *argv[]) {
 
     __init_test(arc,argv);
-   
+
     __retrieve_test();
-   
+
     __publish_test();
 
     __cleanup_test();
@@ -655,7 +689,7 @@
     def OnSave(evt):
         dlg = wx.FileDialog(
             frame, message="Save file as ...", defaultDir=os.getcwd(),
-            defaultFile="", 
+            defaultFile="",
             wildcard="CSV (*.csv)|*.csv", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT
             )