MQTT: WIP, added publish thread waking-up when published variables change.
authorEdouard Tisserant <edouard@beremiz.fr>
Tue, 16 Jul 2024 09:41:45 +0200
changeset 3989 987c69b1582f
parent 3988 150599d9073f
child 3990 24656e0e8732
MQTT: WIP, added publish thread waking-up when published variables change.
mqtt/mqtt_client_gen.py
--- a/mqtt/mqtt_client_gen.py	Mon Jul 15 09:40:11 2024 +0200
+++ b/mqtt/mqtt_client_gen.py	Tue Jul 16 09:41:45 2024 +0200
@@ -367,9 +367,13 @@
     LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
 }}
 
+#define CHANGED 1
+#define UNCHANGED 0
+
 #define DECL_VAR(iec_type, C_type, c_loc_name)                                                     \\
 static C_type PLC_##c_loc_name##_buf = 0;                                                          \\
 static C_type MQTT_##c_loc_name##_buf = 0;                                                         \\
+static int MQTT_##c_loc_name##_state = UNCHANGED;  /* systematically published at init */          \\
 C_type *c_loc_name = &PLC_##c_loc_name##_buf;
 
 {decl}
@@ -380,7 +384,21 @@
 #else
 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
 #endif
-static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
+
+/* condition to quit publish thread */
+static int MQTT_stop_thread = 0;
+
+/* condition to wakeup publish thread */
+static int MQTT_any_pub_var_changed = 0;
+
+/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
+static pthread_mutex_t MQTT_mutex;
+
+/* wakeup publish thread when PLC changed published variable */
+static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
+
+/* publish thread */
+static pthread_t publishThread;
 
 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \\
 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
@@ -480,9 +498,6 @@
     }}
 
 
-
-
-
 #ifdef USE_MQTT_5
 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
         MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type),               \\
@@ -501,11 +516,45 @@
         _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
         {{                                                                                        \\
-            LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);   \\
+            LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\
+            /* TODO update status variable accordingly */                                         \\
         }}                                                                                        \\
     }}
 
-
+#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained)                                  \\
+    if(MQTT_##c_loc_name##_state == CHANGED)                                                      \\
+    {{                                                                                            \\
+        int rc;                                                                                   \\
+        _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
+        if (rc != MQTTCLIENT_SUCCESS)                                                             \\
+        {{                                                                                        \\
+            LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc);        \\
+            /* TODO update status variable accordingly */                                         \\
+        }} else {{                                                                                \\
+            MQTT_##c_loc_name##_state = UNCHANGED;                                                \\
+        }}                                                                                        \\
+    }}
+
+static void *__publish_thread(void *_unused) {{
+    int rc = 0;
+    while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
+        pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+        if(MQTT_any_pub_var_changed){{
+
+            /* publish changes, and reset variable's state to UNCHANGED */
+{publish_changes}
+            MQTT_any_pub_var_changed = 0;
+        }}
+
+        pthread_mutex_unlock(&MQTT_mutex);
+    }}
+
+    if(!MQTT_stop_thread){{
+        /* if thread exits outside of normal shutdown, report error*/
+        LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc);
+    }}
+}}
+    
 int __init_{locstr}(int argc,char **argv)
 {{
     char *uri = "{uri}";
@@ -552,6 +601,7 @@
 {init}
 
     /* TODO start publish thread */
+    rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
 
     return 0;
 }}
@@ -567,28 +617,41 @@
 }}
 
 #define WRITE_VALUE(c_loc_name, C_type) \\
-    MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
+    if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
+        MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
+        MQTT_##c_loc_name##_state = CHANGED; \\
+        MQTT_any_pub_var_changed = 1; \\
+    }}
 
 void __publish_{locstr}(void)
 {{
-    /* TODO try take mutex */
+    if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+        MQTT_any_pub_var_changed = 0;
+        /* copy PLC_* variables to MQTT_*, and mark those who changed */
 {publish}
-    /* TODO free mutex */
-    /* TODO unblock publish thread */
+        /* if any change detcted, unblock publish thread */
+        if(MQTT_any_pub_var_changed){{
+            pthread_cond_signal(&MQTT_new_data);
+        }}
+        pthread_mutex_unlock(&MQTT_mutex);
+    }} else {{
+        /* TODO if couldn't lock mutex set status variable accordingly */ 
+    }}
 }}
 
 """
 
         formatdict = dict(
-            locstr   = locstr,
-            uri      = config["URI"],
-            clientID = config["clientID"],
-            decl     = "",
-            topics   = "",
-            cleanup  = "",
-            init     = "",
-            retrieve = "",
-            publish  = ""
+            locstr          = locstr,
+            uri             = config["URI"],
+            clientID        = config["clientID"],
+            decl            = "",
+            topics          = "",
+            cleanup         = "",
+            init            = "",
+            retrieve        = "",
+            publish         = "",
+            publish_changes = ""
         )
 
 
@@ -637,6 +700,8 @@
     INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
                     formatdict["publish"] += """
     WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
+                    formatdict["publish_changes"] += """
+    PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
 
         Ccode = template.format(**formatdict)