# HG changeset patch # User Edouard Tisserant # Date 1721657352 -7200 # Node ID 84a668564748af654f6582ad6eec412ca0baaae0 # Parent c399fe412dbd46a5abf41861ba16d9961d1694c8 MQTT: take C part away from python code for readability. C lines change a bit because of unnecessary escaping for '\'. diff -r c399fe412dbd -r 84a668564748 mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Mon Jul 22 12:12:33 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Mon Jul 22 16:09:12 2024 +0200 @@ -9,6 +9,8 @@ import wx import wx.dataview as dv +import util.paths as paths + # from perfect_hash import generate_code, IntSaltHash MQTT_IEC_types = dict( @@ -299,363 +301,10 @@ writer.writerow([direction] + row) def GenerateC(self, path, locstr, config): - template = """/* code generated by beremiz MQTT extension */ - -#include -#include -#include -#include -#include - -#include "MQTTClient.h" -#include "MQTTClientPersistence.h" - -#define _Log(level, ...) \\ - {{ \\ - char mstr[256]; \\ - snprintf(mstr, 255, __VA_ARGS__); \\ - LogMessage(level, mstr, strlen(mstr)); \\ - printf(__VA_ARGS__); \\ - fflush(stdout); \\ - }} - -#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); -#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); -#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); - -void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) -{{ - LogInfo("Paho MQTT Trace : %d, %s\\n", level, message); -}} - -#define CHANGED 1 -#define UNCHANGED 0 - -#define DECL_VAR(iec_type, C_type, c_loc_name) \\ -static C_type PLC_##c_loc_name##_buf = 0; \\ -static C_type MQTT_##c_loc_name##_buf = 0; \\ -static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\ -C_type *c_loc_name = &PLC_##c_loc_name##_buf; - -{decl} - -static MQTTClient client; -#ifdef USE_MQTT_5 -static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; -#else -static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; -#endif - -/* condition to quit publish thread */ -static int MQTT_stop_thread = 0; - -/* condition to wakeup publish thread */ -static int MQTT_any_pub_var_changed = 0; - -/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ -static pthread_mutex_t MQTT_mutex; - -/* wakeup publish thread when PLC changed published variable */ -static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; - -/* publish thread */ -static pthread_t publishThread; - -#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ -{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, - -static struct {{ - const char *topic; //null terminated topic string - void *mqtt_pdata; // pointer to data from/for MQTT stack - int *mqtt_pchanged; // pointer to changed flag - __IEC_types_enum vartype; -}} topics [] = {{ -{topics} -}}; - -static int _connect_mqtt(void) -{{ - int rc; - -#ifdef USE_MQTT_5 - MQTTProperties props = MQTTProperties_initializer; - MQTTProperties willProps = MQTTProperties_initializer; - MQTTResponse response = MQTTResponse_initializer; - - response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); - rc = response.reasonCode; - MQTTResponse_free(response); -#else - rc = MQTTClient_connect(client, &conn_opts); -#endif - - return rc; -}} - -void __cleanup_{locstr}(void) -{{ - int rc; - - /* stop publish thread */ - MQTT_stop_thread = 1; - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ - /* unblock publish thread so that it can stop normally */ - pthread_cond_signal(&MQTT_new_data); - pthread_mutex_unlock(&MQTT_mutex); - }} - pthread_join(publishThread, NULL); - -#ifdef USE_MQTT_5 - if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) -#else - if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) -#endif - {{ - LogError("MQTT Failed to disconnect, return code %d\\n", rc); - }} - MQTTClient_destroy(&client); -}} - -void connectionLost(void* context, char* reason) -{{ - int rc; - LogWarning("ConnectionLost, reconnecting\\n"); - // rc = _connect_mqtt(); - - // if (rc != MQTTCLIENT_SUCCESS) {{ - // LogError("MQTT reconnect Failed, waiting 5 seconds, return code %d\\n", rc); - // /* wait if error */ - // sleep(5); - // }} -}} - -int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) -{{ - int low = 0; - int size = sizeof(topics) / sizeof(topics[0]); - int high = size - 1; - int mid; - - // bisect topic among subscribed topics - while (low <= high) {{ - int res; - mid = low + (high - low) / 2; - res = strncmp(topics[mid].topic, topicName, topicLen); - - // Check if key is present at mid - if (res == 0) - goto found; - - // If key greater, ignore left half - if (res < 0) - low = mid + 1; - - // If key is smaller, ignore right half - else - high = mid - 1; - }} - // If we reach here, then the element was not present - LogWarning("MQTT unknown topic: %s", topicName); - goto exit; - -found: - if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ - memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); - *topics[mid].mqtt_pchanged = 1; - }} else {{ - LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", - topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); - }} -exit: - MQTTClient_freeMessage(&message); - MQTTClient_free(topicName); - return 1; -}} - -#define INIT_NoAuth() \\ - LogInfo("MQTT Init no auth\\n"); - -#define INIT_x509(PrivateKey, Certificate) \\ - LogInfo("MQTT Init x509 %s,%s\\n", PrivateKey, Certificate); - /* TODO */ - -#define INIT_UserPassword(User, Password) \\ - LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\ - conn_opts.username = User; \\ - conn_opts.password = Password; - -#ifdef USE_MQTT_5 -#define _SUBSCRIBE(Topic, QoS) \\ - MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ - /* when using MQTT5 responce code is 1 for some reason even if no error */ \\ - rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \\ - MQTTResponse_free(response); -#else -#define _SUBSCRIBE(Topic, QoS) \\ - rc = MQTTClient_subscribe(client, #Topic, QoS); -#endif - -#define INIT_SUBSCRIPTION(Topic, QoS) \\ - {{ \\ - int rc; \\ - _SUBSCRIBE(Topic, QoS) \\ - if (rc != MQTTCLIENT_SUCCESS) \\ - {{ \\ - LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ - }} \\ - }} - - -#ifdef USE_MQTT_5 -#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ - MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\ - &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\ - rc = response.reasonCode; \\ - MQTTResponse_free(response); -#else -#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ - rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \\ - &PLC_##c_loc_name##_buf, QoS, Retained, NULL); -#endif - -#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \\ - {{ \\ - int rc; \\ - _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ - if (rc != MQTTCLIENT_SUCCESS) \\ - {{ \\ - LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\ - /* TODO update status variable accordingly */ \\ - }} \\ - }} - -#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\ - if(MQTT_##c_loc_name##_state == CHANGED) \\ - {{ \\ - int rc; \\ - _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ - if (rc != MQTTCLIENT_SUCCESS) \\ - {{ \\ - LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\ - /* TODO update status variable accordingly */ \\ - }} else {{ \\ - MQTT_##c_loc_name##_state = UNCHANGED; \\ - }} \\ - }} - -static void *__publish_thread(void *_unused) {{ - int rc = 0; - while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ - pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); - if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{ - - /* publish changes, and reset variable's state to UNCHANGED */ -{publish_changes} - MQTT_any_pub_var_changed = 0; - }} - - pthread_mutex_unlock(&MQTT_mutex); - - if(MQTT_stop_thread) break; - }} - - if(!MQTT_stop_thread){{ - /* if thread exits outside of normal shutdown, report error*/ - LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc); - }} -}} - -int __init_{locstr}(int argc,char **argv) -{{ - char *uri = "{uri}"; - char *clientID = "{clientID}"; - int rc; - - MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; - -#ifdef USE_MQTT_5 - conn_opts.MQTTVersion = MQTTVERSION_5; - conn_opts.cleanstart = 1; - - createOpts.MQTTVersion = MQTTVERSION_5; -#else - conn_opts.cleansession = 1; -#endif - - MQTTClient_setTraceCallback(trace_callback); - MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); - - - rc = MQTTClient_createWithOptions( - &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); - if (rc != MQTTCLIENT_SUCCESS) - {{ - LogError("MQTT Failed to create client, return code %d\\n", rc); - return rc; - }} - - rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); - if (rc != MQTTCLIENT_SUCCESS) - {{ - LogError("MQTT Failed to set callbacks, return code %d\\n", rc); - return rc; - }} - - rc = _connect_mqtt(); - - if (rc != MQTTCLIENT_SUCCESS) {{ - LogError("MQTT Connect Failed, return code %d\\n", rc); - return rc; - }} - -{init} - - /* TODO start publish thread */ - rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); - - return 0; -}} - -#define READ_VALUE(c_loc_name, C_type) \\ - if(MQTT_##c_loc_name##_state == CHANGED){{ \\ - /* TODO care about endianess */ \\ - PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\ - MQTT_##c_loc_name##_state = UNCHANGED; \\ - }} - -void __retrieve_{locstr}(void) -{{ - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ -{retrieve} - pthread_mutex_unlock(&MQTT_mutex); - }} -}} - -#define WRITE_VALUE(c_loc_name, C_type) \\ - /* TODO care about endianess */ \\ - if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\ - MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\ - MQTT_##c_loc_name##_state = CHANGED; \\ - MQTT_any_pub_var_changed = 1; \\ - }} - -void __publish_{locstr}(void) -{{ - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ - MQTT_any_pub_var_changed = 0; - /* copy PLC_* variables to MQTT_*, and mark those who changed */ -{publish} - /* if any change detcted, unblock publish thread */ - if(MQTT_any_pub_var_changed){{ - pthread_cond_signal(&MQTT_new_data); - }} - pthread_mutex_unlock(&MQTT_mutex); - }} else {{ - /* TODO if couldn't lock mutex set status variable accordingly */ - }} -}} - -""" + c_template_filepath = paths.AbsNeighbourFile(__file__, "mqtt_template.c") + c_template_file = open(c_template_filepath , 'rb') + c_template = c_template_file.read() + c_template_file.close() formatdict = dict( locstr = locstr, @@ -716,7 +365,7 @@ formatdict["retrieve"] += """ READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) - Ccode = template.format(**formatdict) + Ccode = c_template.format(**formatdict) return Ccode diff -r c399fe412dbd -r 84a668564748 mqtt/mqtt_template.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mqtt/mqtt_template.c Mon Jul 22 16:09:12 2024 +0200 @@ -0,0 +1,343 @@ +/* code generated by beremiz MQTT extension */ + +#include +#include +#include +#include +#include + +#include "MQTTClient.h" +#include "MQTTClientPersistence.h" + +#define _Log(level, ...) \ + {{ \ + /* char mstr[256]; */ \ + /* snprintf(mstr, 255, __VA_ARGS__); */ \ + /* LogMessage(level, mstr, strlen(mstr)); */ \ + printf(__VA_ARGS__); \ + fflush(stdout); \ + }} + +#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); +#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); +#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); + +void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) +{{ + LogInfo("Paho MQTT Trace : %d, %s\n", level, message); +}} + +#define CHANGED 1 +#define UNCHANGED 0 + +#define DECL_VAR(iec_type, C_type, c_loc_name) \ +static C_type PLC_##c_loc_name##_buf = 0; \ +static C_type MQTT_##c_loc_name##_buf = 0; \ +static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \ +C_type *c_loc_name = &PLC_##c_loc_name##_buf; + +{decl} + +static MQTTClient client; +#ifdef USE_MQTT_5 +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; +#else +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; +#endif + +/* condition to quit publish thread */ +static int MQTT_stop_thread = 0; + +/* condition to wakeup publish thread */ +static int MQTT_any_pub_var_changed = 0; + +/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_mutex; + +/* wakeup publish thread when PLC changed published variable */ +static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; + +/* publish thread */ +static pthread_t publishThread; + +#define INIT_TOPIC(topic, iec_type, c_loc_name) \ +{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, + +static struct {{ + const char *topic; //null terminated topic string + void *mqtt_pdata; // pointer to data from/for MQTT stack + int *mqtt_pchanged; // pointer to changed flag + __IEC_types_enum vartype; +}} topics [] = {{ +{topics} +}}; + +static int _connect_mqtt(void) +{{ + int rc; + +#ifdef USE_MQTT_5 + MQTTProperties props = MQTTProperties_initializer; + MQTTProperties willProps = MQTTProperties_initializer; + MQTTResponse response = MQTTResponse_initializer; + + response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); + rc = response.reasonCode; + MQTTResponse_free(response); +#else + rc = MQTTClient_connect(client, &conn_opts); +#endif + + return rc; +}} + +void __cleanup_{locstr}(void) +{{ + int rc; + + /* stop publish thread */ + MQTT_stop_thread = 1; + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + /* unblock publish thread so that it can stop normally */ + pthread_cond_signal(&MQTT_new_data); + pthread_mutex_unlock(&MQTT_mutex); + }} + pthread_join(publishThread, NULL); + +#ifdef USE_MQTT_5 + if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) +#else + if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) +#endif + {{ + LogError("MQTT Failed to disconnect, return code %d\n", rc); + }} + MQTTClient_destroy(&client); +}} + +int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{{ + int low = 0; + int size = sizeof(topics) / sizeof(topics[0]); + int high = size - 1; + int mid; + + // bisect topic among subscribed topics + while (low <= high) {{ + int res; + mid = low + (high - low) / 2; + res = strncmp(topics[mid].topic, topicName, topicLen); + + // Check if key is present at mid + if (res == 0) + goto found; + + // If key greater, ignore left half + if (res < 0) + low = mid + 1; + + // If key is smaller, ignore right half + else + high = mid - 1; + }} + // If we reach here, then the element was not present + LogWarning("MQTT unknown topic: %s", topicName); + goto exit; + +found: + if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ + memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); + *topics[mid].mqtt_pchanged = 1; + }} else {{ + LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", + topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); + }} +exit: + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +}} + +#define INIT_NoAuth() \ + LogInfo("MQTT Init no auth\n"); + +#define INIT_x509(PrivateKey, Certificate) \ + LogInfo("MQTT Init x509 %s,%s\n", PrivateKey, Certificate); + /* TODO */ + +#define INIT_UserPassword(User, Password) \ + LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ + conn_opts.username = User; \ + conn_opts.password = Password; + +#ifdef USE_MQTT_5 +#define _SUBSCRIBE(Topic, QoS) \ + MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \ + /* when using MQTT5 responce code is 1 for some reason even if no error */ \ + rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \ + MQTTResponse_free(response); +#else +#define _SUBSCRIBE(Topic, QoS) \ + rc = MQTTClient_subscribe(client, #Topic, QoS); +#endif + +#define INIT_SUBSCRIPTION(Topic, QoS) \ + {{ \ + int rc; \ + _SUBSCRIBE(Topic, QoS) \ + if (rc != MQTTCLIENT_SUCCESS) \ + {{ \ + LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ + }} \ + }} + + +#ifdef USE_MQTT_5 +#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \ + &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \ + rc = response.reasonCode; \ + MQTTResponse_free(response); +#else +#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \ + &PLC_##c_loc_name##_buf, QoS, Retained, NULL); +#endif + +#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \ + {{ \ + int rc; \ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + if (rc != MQTTCLIENT_SUCCESS) \ + {{ \ + LogError("MQTT client failed to init publication of '%s', return code %d\n", #Topic, rc);\ + /* TODO update status variable accordingly */ \ + }} \ + }} + +#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \ + if(MQTT_##c_loc_name##_state == CHANGED) \ + {{ \ + int rc; \ + _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ + if (rc != MQTTCLIENT_SUCCESS) \ + {{ \ + LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ + /* TODO update status variable accordingly */ \ + }} else {{ \ + MQTT_##c_loc_name##_state = UNCHANGED; \ + }} \ + }} + +static void *__publish_thread(void *_unused) {{ + int rc = 0; + while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); + if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{ + + /* publish changes, and reset variable's state to UNCHANGED */ +{publish_changes} + MQTT_any_pub_var_changed = 0; + }} + + pthread_mutex_unlock(&MQTT_mutex); + + if(MQTT_stop_thread) break; + }} + + if(!MQTT_stop_thread){{ + /* if thread exits outside of normal shutdown, report error*/ + LogError("MQTT client thread exited unexpectedly, return code %d\n", rc); + }} +}} + +int __init_{locstr}(int argc,char **argv) +{{ + char *uri = "{uri}"; + char *clientID = "{clientID}"; + int rc; + + MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; + +#ifdef USE_MQTT_5 + conn_opts.MQTTVersion = MQTTVERSION_5; + conn_opts.cleanstart = 1; + + createOpts.MQTTVersion = MQTTVERSION_5; +#else + conn_opts.cleansession = 1; +#endif + + MQTTClient_setTraceCallback(trace_callback); + MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); + + + rc = MQTTClient_createWithOptions( + &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); + if (rc != MQTTCLIENT_SUCCESS) + {{ + LogError("MQTT Failed to create client, return code %d\n", rc); + return rc; + }} + + rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL); + if (rc != MQTTCLIENT_SUCCESS) + {{ + LogError("MQTT Failed to set callbacks, return code %d\n", rc); + return rc; + }} + + rc = _connect_mqtt(); + + if (rc != MQTTCLIENT_SUCCESS) {{ + LogError("MQTT Connect Failed, return code %d\n", rc); + return rc; + }} + +{init} + + /* TODO start publish thread */ + rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); + + return 0; +}} + +#define READ_VALUE(c_loc_name, C_type) \ + if(MQTT_##c_loc_name##_state == CHANGED){{ \ + /* TODO care about endianess */ \ + PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \ + MQTT_##c_loc_name##_state = UNCHANGED; \ + }} + +void __retrieve_{locstr}(void) +{{ + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ +{retrieve} + pthread_mutex_unlock(&MQTT_mutex); + }} +}} + +#define WRITE_VALUE(c_loc_name, C_type) \ + /* TODO care about endianess */ \ + if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \ + MQTT_##c_loc_name##_state = CHANGED; \ + MQTT_any_pub_var_changed = 1; \ + }} + +void __publish_{locstr}(void) +{{ + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + MQTT_any_pub_var_changed = 0; + /* copy PLC_* variables to MQTT_*, and mark those who changed */ +{publish} + /* if any change detcted, unblock publish thread */ + if(MQTT_any_pub_var_changed){{ + pthread_cond_signal(&MQTT_new_data); + }} + pthread_mutex_unlock(&MQTT_mutex); + }} else {{ + /* TODO if couldn't lock mutex set status variable accordingly */ + }} +}} +