# HG changeset patch # User Edouard Tisserant # Date 1721115705 -7200 # Node ID 987c69b1582fbd3565ab33a3cfe753f2a9597f2b # Parent 150599d9073f1438dee97a40ca8b70daf419a18f MQTT: WIP, added publish thread waking-up when published variables change. diff -r 150599d9073f -r 987c69b1582f mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Mon Jul 15 09:40:11 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Tue Jul 16 09:41:45 2024 +0200 @@ -367,9 +367,13 @@ LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); }} +#define CHANGED 1 +#define UNCHANGED 0 + #define DECL_VAR(iec_type, C_type, c_loc_name) \\ static C_type PLC_##c_loc_name##_buf = 0; \\ static C_type MQTT_##c_loc_name##_buf = 0; \\ +static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\ C_type *c_loc_name = &PLC_##c_loc_name##_buf; {decl} @@ -380,7 +384,21 @@ #else static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; #endif -static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent + +/* condition to quit publish thread */ +static int MQTT_stop_thread = 0; + +/* condition to wakeup publish thread */ +static int MQTT_any_pub_var_changed = 0; + +/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_mutex; + +/* wakeup publish thread when PLC changed published variable */ +static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; + +/* publish thread */ +static pthread_t publishThread; #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, @@ -480,9 +498,6 @@ }} - - - #ifdef USE_MQTT_5 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\ @@ -501,11 +516,45 @@ _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ if (rc != MQTTCLIENT_SUCCESS) \\ {{ \\ - LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ + LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\ + /* TODO update status variable accordingly */ \\ }} \\ }} - +#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\ + if(MQTT_##c_loc_name##_state == CHANGED) \\ + {{ \\ + int rc; \\ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ + if (rc != MQTTCLIENT_SUCCESS) \\ + {{ \\ + LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\ + /* TODO update status variable accordingly */ \\ + }} else {{ \\ + MQTT_##c_loc_name##_state = UNCHANGED; \\ + }} \\ + }} + +static void *__publish_thread(void *_unused) {{ + int rc = 0; + while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); + if(MQTT_any_pub_var_changed){{ + + /* publish changes, and reset variable's state to UNCHANGED */ +{publish_changes} + MQTT_any_pub_var_changed = 0; + }} + + pthread_mutex_unlock(&MQTT_mutex); + }} + + if(!MQTT_stop_thread){{ + /* if thread exits outside of normal shutdown, report error*/ + LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc); + }} +}} + int __init_{locstr}(int argc,char **argv) {{ char *uri = "{uri}"; @@ -552,6 +601,7 @@ {init} /* TODO start publish thread */ + rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); return 0; }} @@ -567,28 +617,41 @@ }} #define WRITE_VALUE(c_loc_name, C_type) \\ - MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; + if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\ + MQTT_##c_loc_name##_state = CHANGED; \\ + MQTT_any_pub_var_changed = 1; \\ + }} void __publish_{locstr}(void) {{ - /* TODO try take mutex */ + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + MQTT_any_pub_var_changed = 0; + /* copy PLC_* variables to MQTT_*, and mark those who changed */ {publish} - /* TODO free mutex */ - /* TODO unblock publish thread */ + /* if any change detcted, unblock publish thread */ + if(MQTT_any_pub_var_changed){{ + pthread_cond_signal(&MQTT_new_data); + }} + pthread_mutex_unlock(&MQTT_mutex); + }} else {{ + /* TODO if couldn't lock mutex set status variable accordingly */ + }} }} """ formatdict = dict( - locstr = locstr, - uri = config["URI"], - clientID = config["clientID"], - decl = "", - topics = "", - cleanup = "", - init = "", - retrieve = "", - publish = "" + locstr = locstr, + uri = config["URI"], + clientID = config["clientID"], + decl = "", + topics = "", + cleanup = "", + init = "", + retrieve = "", + publish = "", + publish_changes = "" ) @@ -637,6 +700,8 @@ INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) formatdict["publish"] += """ WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) + formatdict["publish_changes"] += """ + PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) Ccode = template.format(**formatdict)