--- a/mqtt/mqtt_client_gen.py Mon Jul 15 09:40:11 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Tue Jul 16 09:41:45 2024 +0200
@@ -367,9 +367,13 @@
LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
}}
+#define CHANGED 1
+#define UNCHANGED 0
+
#define DECL_VAR(iec_type, C_type, c_loc_name) \\
static C_type PLC_##c_loc_name##_buf = 0; \\
static C_type MQTT_##c_loc_name##_buf = 0; \\
+static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\
C_type *c_loc_name = &PLC_##c_loc_name##_buf;
{decl}
@@ -380,7 +384,21 @@
#else
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
#endif
-static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent
+
+/* condition to quit publish thread */
+static int MQTT_stop_thread = 0;
+
+/* condition to wakeup publish thread */
+static int MQTT_any_pub_var_changed = 0;
+
+/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
+static pthread_mutex_t MQTT_mutex;
+
+/* wakeup publish thread when PLC changed published variable */
+static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
+
+/* publish thread */
+static pthread_t publishThread;
#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
@@ -480,9 +498,6 @@
}}
-
-
-
#ifdef USE_MQTT_5
#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\
@@ -501,11 +516,45 @@
_PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
if (rc != MQTTCLIENT_SUCCESS) \\
{{ \\
- LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\
+ LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\
+ /* TODO update status variable accordingly */ \\
}} \\
}}
-
+#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\
+ if(MQTT_##c_loc_name##_state == CHANGED) \\
+ {{ \\
+ int rc; \\
+ _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
+ if (rc != MQTTCLIENT_SUCCESS) \\
+ {{ \\
+ LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\
+ /* TODO update status variable accordingly */ \\
+ }} else {{ \\
+ MQTT_##c_loc_name##_state = UNCHANGED; \\
+ }} \\
+ }}
+
+static void *__publish_thread(void *_unused) {{
+ int rc = 0;
+ while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
+ pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+ if(MQTT_any_pub_var_changed){{
+
+ /* publish changes, and reset variable's state to UNCHANGED */
+{publish_changes}
+ MQTT_any_pub_var_changed = 0;
+ }}
+
+ pthread_mutex_unlock(&MQTT_mutex);
+ }}
+
+ if(!MQTT_stop_thread){{
+ /* if thread exits outside of normal shutdown, report error*/
+ LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc);
+ }}
+}}
+
int __init_{locstr}(int argc,char **argv)
{{
char *uri = "{uri}";
@@ -552,6 +601,7 @@
{init}
/* TODO start publish thread */
+ rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
return 0;
}}
@@ -567,28 +617,41 @@
}}
#define WRITE_VALUE(c_loc_name, C_type) \\
- MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
+ if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
+ MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
+ MQTT_##c_loc_name##_state = CHANGED; \\
+ MQTT_any_pub_var_changed = 1; \\
+ }}
void __publish_{locstr}(void)
{{
- /* TODO try take mutex */
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ MQTT_any_pub_var_changed = 0;
+ /* copy PLC_* variables to MQTT_*, and mark those who changed */
{publish}
- /* TODO free mutex */
- /* TODO unblock publish thread */
+ /* if any change detcted, unblock publish thread */
+ if(MQTT_any_pub_var_changed){{
+ pthread_cond_signal(&MQTT_new_data);
+ }}
+ pthread_mutex_unlock(&MQTT_mutex);
+ }} else {{
+ /* TODO if couldn't lock mutex set status variable accordingly */
+ }}
}}
"""
formatdict = dict(
- locstr = locstr,
- uri = config["URI"],
- clientID = config["clientID"],
- decl = "",
- topics = "",
- cleanup = "",
- init = "",
- retrieve = "",
- publish = ""
+ locstr = locstr,
+ uri = config["URI"],
+ clientID = config["clientID"],
+ decl = "",
+ topics = "",
+ cleanup = "",
+ init = "",
+ retrieve = "",
+ publish = "",
+ publish_changes = ""
)
@@ -637,6 +700,8 @@
INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
formatdict["publish"] += """
WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
+ formatdict["publish_changes"] += """
+ PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
Ccode = template.format(**formatdict)