MQTT: take C part away from python code for readability.
C lines change a bit because of unnecessary escaping for '\'.
--- a/mqtt/mqtt_client_gen.py Mon Jul 22 12:12:33 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Mon Jul 22 16:09:12 2024 +0200
@@ -9,6 +9,8 @@
import wx
import wx.dataview as dv
+import util.paths as paths
+
# from perfect_hash import generate_code, IntSaltHash
MQTT_IEC_types = dict(
@@ -299,363 +301,10 @@
writer.writerow([direction] + row)
def GenerateC(self, path, locstr, config):
- template = """/* code generated by beremiz MQTT extension */
-
-#include <stdint.h>
-#include <unistd.h>
-#include <pthread.h>
-#include <string.h>
-#include <stdio.h>
-
-#include "MQTTClient.h"
-#include "MQTTClientPersistence.h"
-
-#define _Log(level, ...) \\
- {{ \\
- char mstr[256]; \\
- snprintf(mstr, 255, __VA_ARGS__); \\
- LogMessage(level, mstr, strlen(mstr)); \\
- printf(__VA_ARGS__); \\
- fflush(stdout); \\
- }}
-
-#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
-#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
-#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
-
-void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
-{{
- LogInfo("Paho MQTT Trace : %d, %s\\n", level, message);
-}}
-
-#define CHANGED 1
-#define UNCHANGED 0
-
-#define DECL_VAR(iec_type, C_type, c_loc_name) \\
-static C_type PLC_##c_loc_name##_buf = 0; \\
-static C_type MQTT_##c_loc_name##_buf = 0; \\
-static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\
-C_type *c_loc_name = &PLC_##c_loc_name##_buf;
-
-{decl}
-
-static MQTTClient client;
-#ifdef USE_MQTT_5
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
-#else
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-#endif
-
-/* condition to quit publish thread */
-static int MQTT_stop_thread = 0;
-
-/* condition to wakeup publish thread */
-static int MQTT_any_pub_var_changed = 0;
-
-/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_mutex;
-
-/* wakeup publish thread when PLC changed published variable */
-static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
-
-/* publish thread */
-static pthread_t publishThread;
-
-#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
-{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
-
-static struct {{
- const char *topic; //null terminated topic string
- void *mqtt_pdata; // pointer to data from/for MQTT stack
- int *mqtt_pchanged; // pointer to changed flag
- __IEC_types_enum vartype;
-}} topics [] = {{
-{topics}
-}};
-
-static int _connect_mqtt(void)
-{{
- int rc;
-
-#ifdef USE_MQTT_5
- MQTTProperties props = MQTTProperties_initializer;
- MQTTProperties willProps = MQTTProperties_initializer;
- MQTTResponse response = MQTTResponse_initializer;
-
- response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
- rc = response.reasonCode;
- MQTTResponse_free(response);
-#else
- rc = MQTTClient_connect(client, &conn_opts);
-#endif
-
- return rc;
-}}
-
-void __cleanup_{locstr}(void)
-{{
- int rc;
-
- /* stop publish thread */
- MQTT_stop_thread = 1;
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
- /* unblock publish thread so that it can stop normally */
- pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
- }}
- pthread_join(publishThread, NULL);
-
-#ifdef USE_MQTT_5
- if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
-#else
- if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS)
-#endif
- {{
- LogError("MQTT Failed to disconnect, return code %d\\n", rc);
- }}
- MQTTClient_destroy(&client);
-}}
-
-void connectionLost(void* context, char* reason)
-{{
- int rc;
- LogWarning("ConnectionLost, reconnecting\\n");
- // rc = _connect_mqtt();
-
- // if (rc != MQTTCLIENT_SUCCESS) {{
- // LogError("MQTT reconnect Failed, waiting 5 seconds, return code %d\\n", rc);
- // /* wait if error */
- // sleep(5);
- // }}
-}}
-
-int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
-{{
- int low = 0;
- int size = sizeof(topics) / sizeof(topics[0]);
- int high = size - 1;
- int mid;
-
- // bisect topic among subscribed topics
- while (low <= high) {{
- int res;
- mid = low + (high - low) / 2;
- res = strncmp(topics[mid].topic, topicName, topicLen);
-
- // Check if key is present at mid
- if (res == 0)
- goto found;
-
- // If key greater, ignore left half
- if (res < 0)
- low = mid + 1;
-
- // If key is smaller, ignore right half
- else
- high = mid - 1;
- }}
- // If we reach here, then the element was not present
- LogWarning("MQTT unknown topic: %s", topicName);
- goto exit;
-
-found:
- if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
- memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
- *topics[mid].mqtt_pchanged = 1;
- }} else {{
- LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
- topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
- }}
-exit:
- MQTTClient_freeMessage(&message);
- MQTTClient_free(topicName);
- return 1;
-}}
-
-#define INIT_NoAuth() \\
- LogInfo("MQTT Init no auth\\n");
-
-#define INIT_x509(PrivateKey, Certificate) \\
- LogInfo("MQTT Init x509 %s,%s\\n", PrivateKey, Certificate);
- /* TODO */
-
-#define INIT_UserPassword(User, Password) \\
- LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\
- conn_opts.username = User; \\
- conn_opts.password = Password;
-
-#ifdef USE_MQTT_5
-#define _SUBSCRIBE(Topic, QoS) \\
- MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\
- /* when using MQTT5 responce code is 1 for some reason even if no error */ \\
- rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \\
- MQTTResponse_free(response);
-#else
-#define _SUBSCRIBE(Topic, QoS) \\
- rc = MQTTClient_subscribe(client, #Topic, QoS);
-#endif
-
-#define INIT_SUBSCRIPTION(Topic, QoS) \\
- {{ \\
- int rc; \\
- _SUBSCRIBE(Topic, QoS) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- {{ \\
- LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\
- }} \\
- }}
-
-
-#ifdef USE_MQTT_5
-#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\
- &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\
- rc = response.reasonCode; \\
- MQTTResponse_free(response);
-#else
-#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \\
- &PLC_##c_loc_name##_buf, QoS, Retained, NULL);
-#endif
-
-#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \\
- {{ \\
- int rc; \\
- _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- {{ \\
- LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\
- /* TODO update status variable accordingly */ \\
- }} \\
- }}
-
-#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\
- if(MQTT_##c_loc_name##_state == CHANGED) \\
- {{ \\
- int rc; \\
- _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
- if (rc != MQTTCLIENT_SUCCESS) \\
- {{ \\
- LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\
- /* TODO update status variable accordingly */ \\
- }} else {{ \\
- MQTT_##c_loc_name##_state = UNCHANGED; \\
- }} \\
- }}
-
-static void *__publish_thread(void *_unused) {{
- int rc = 0;
- while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
- pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
- if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{
-
- /* publish changes, and reset variable's state to UNCHANGED */
-{publish_changes}
- MQTT_any_pub_var_changed = 0;
- }}
-
- pthread_mutex_unlock(&MQTT_mutex);
-
- if(MQTT_stop_thread) break;
- }}
-
- if(!MQTT_stop_thread){{
- /* if thread exits outside of normal shutdown, report error*/
- LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc);
- }}
-}}
-
-int __init_{locstr}(int argc,char **argv)
-{{
- char *uri = "{uri}";
- char *clientID = "{clientID}";
- int rc;
-
- MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
-
-#ifdef USE_MQTT_5
- conn_opts.MQTTVersion = MQTTVERSION_5;
- conn_opts.cleanstart = 1;
-
- createOpts.MQTTVersion = MQTTVERSION_5;
-#else
- conn_opts.cleansession = 1;
-#endif
-
- MQTTClient_setTraceCallback(trace_callback);
- MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
-
-
- rc = MQTTClient_createWithOptions(
- &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
- if (rc != MQTTCLIENT_SUCCESS)
- {{
- LogError("MQTT Failed to create client, return code %d\\n", rc);
- return rc;
- }}
-
- rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
- if (rc != MQTTCLIENT_SUCCESS)
- {{
- LogError("MQTT Failed to set callbacks, return code %d\\n", rc);
- return rc;
- }}
-
- rc = _connect_mqtt();
-
- if (rc != MQTTCLIENT_SUCCESS) {{
- LogError("MQTT Connect Failed, return code %d\\n", rc);
- return rc;
- }}
-
-{init}
-
- /* TODO start publish thread */
- rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
-
- return 0;
-}}
-
-#define READ_VALUE(c_loc_name, C_type) \\
- if(MQTT_##c_loc_name##_state == CHANGED){{ \\
- /* TODO care about endianess */ \\
- PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\
- MQTT_##c_loc_name##_state = UNCHANGED; \\
- }}
-
-void __retrieve_{locstr}(void)
-{{
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
-{retrieve}
- pthread_mutex_unlock(&MQTT_mutex);
- }}
-}}
-
-#define WRITE_VALUE(c_loc_name, C_type) \\
- /* TODO care about endianess */ \\
- if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
- MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
- MQTT_##c_loc_name##_state = CHANGED; \\
- MQTT_any_pub_var_changed = 1; \\
- }}
-
-void __publish_{locstr}(void)
-{{
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
- MQTT_any_pub_var_changed = 0;
- /* copy PLC_* variables to MQTT_*, and mark those who changed */
-{publish}
- /* if any change detcted, unblock publish thread */
- if(MQTT_any_pub_var_changed){{
- pthread_cond_signal(&MQTT_new_data);
- }}
- pthread_mutex_unlock(&MQTT_mutex);
- }} else {{
- /* TODO if couldn't lock mutex set status variable accordingly */
- }}
-}}
-
-"""
+ c_template_filepath = paths.AbsNeighbourFile(__file__, "mqtt_template.c")
+ c_template_file = open(c_template_filepath , 'rb')
+ c_template = c_template_file.read()
+ c_template_file.close()
formatdict = dict(
locstr = locstr,
@@ -716,7 +365,7 @@
formatdict["retrieve"] += """
READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
- Ccode = template.format(**formatdict)
+ Ccode = c_template.format(**formatdict)
return Ccode
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mqtt/mqtt_template.c Mon Jul 22 16:09:12 2024 +0200
@@ -0,0 +1,343 @@
+/* code generated by beremiz MQTT extension */
+
+#include <stdint.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "MQTTClient.h"
+#include "MQTTClientPersistence.h"
+
+#define _Log(level, ...) \
+ {{ \
+ /* char mstr[256]; */ \
+ /* snprintf(mstr, 255, __VA_ARGS__); */ \
+ /* LogMessage(level, mstr, strlen(mstr)); */ \
+ printf(__VA_ARGS__); \
+ fflush(stdout); \
+ }}
+
+#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
+#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
+#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
+
+void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
+{{
+ LogInfo("Paho MQTT Trace : %d, %s\n", level, message);
+}}
+
+#define CHANGED 1
+#define UNCHANGED 0
+
+#define DECL_VAR(iec_type, C_type, c_loc_name) \
+static C_type PLC_##c_loc_name##_buf = 0; \
+static C_type MQTT_##c_loc_name##_buf = 0; \
+static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \
+C_type *c_loc_name = &PLC_##c_loc_name##_buf;
+
+{decl}
+
+static MQTTClient client;
+#ifdef USE_MQTT_5
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
+#else
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
+#endif
+
+/* condition to quit publish thread */
+static int MQTT_stop_thread = 0;
+
+/* condition to wakeup publish thread */
+static int MQTT_any_pub_var_changed = 0;
+
+/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
+static pthread_mutex_t MQTT_mutex;
+
+/* wakeup publish thread when PLC changed published variable */
+static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
+
+/* publish thread */
+static pthread_t publishThread;
+
+#define INIT_TOPIC(topic, iec_type, c_loc_name) \
+{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
+
+static struct {{
+ const char *topic; //null terminated topic string
+ void *mqtt_pdata; // pointer to data from/for MQTT stack
+ int *mqtt_pchanged; // pointer to changed flag
+ __IEC_types_enum vartype;
+}} topics [] = {{
+{topics}
+}};
+
+static int _connect_mqtt(void)
+{{
+ int rc;
+
+#ifdef USE_MQTT_5
+ MQTTProperties props = MQTTProperties_initializer;
+ MQTTProperties willProps = MQTTProperties_initializer;
+ MQTTResponse response = MQTTResponse_initializer;
+
+ response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
+ rc = response.reasonCode;
+ MQTTResponse_free(response);
+#else
+ rc = MQTTClient_connect(client, &conn_opts);
+#endif
+
+ return rc;
+}}
+
+void __cleanup_{locstr}(void)
+{{
+ int rc;
+
+ /* stop publish thread */
+ MQTT_stop_thread = 1;
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ /* unblock publish thread so that it can stop normally */
+ pthread_cond_signal(&MQTT_new_data);
+ pthread_mutex_unlock(&MQTT_mutex);
+ }}
+ pthread_join(publishThread, NULL);
+
+#ifdef USE_MQTT_5
+ if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
+#else
+ if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS)
+#endif
+ {{
+ LogError("MQTT Failed to disconnect, return code %d\n", rc);
+ }}
+ MQTTClient_destroy(&client);
+}}
+
+int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
+{{
+ int low = 0;
+ int size = sizeof(topics) / sizeof(topics[0]);
+ int high = size - 1;
+ int mid;
+
+ // bisect topic among subscribed topics
+ while (low <= high) {{
+ int res;
+ mid = low + (high - low) / 2;
+ res = strncmp(topics[mid].topic, topicName, topicLen);
+
+ // Check if key is present at mid
+ if (res == 0)
+ goto found;
+
+ // If key greater, ignore left half
+ if (res < 0)
+ low = mid + 1;
+
+ // If key is smaller, ignore right half
+ else
+ high = mid - 1;
+ }}
+ // If we reach here, then the element was not present
+ LogWarning("MQTT unknown topic: %s", topicName);
+ goto exit;
+
+found:
+ if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
+ memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
+ *topics[mid].mqtt_pchanged = 1;
+ }} else {{
+ LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
+ topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
+ }}
+exit:
+ MQTTClient_freeMessage(&message);
+ MQTTClient_free(topicName);
+ return 1;
+}}
+
+#define INIT_NoAuth() \
+ LogInfo("MQTT Init no auth\n");
+
+#define INIT_x509(PrivateKey, Certificate) \
+ LogInfo("MQTT Init x509 %s,%s\n", PrivateKey, Certificate);
+ /* TODO */
+
+#define INIT_UserPassword(User, Password) \
+ LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \
+ conn_opts.username = User; \
+ conn_opts.password = Password;
+
+#ifdef USE_MQTT_5
+#define _SUBSCRIBE(Topic, QoS) \
+ MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \
+ /* when using MQTT5 responce code is 1 for some reason even if no error */ \
+ rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \
+ MQTTResponse_free(response);
+#else
+#define _SUBSCRIBE(Topic, QoS) \
+ rc = MQTTClient_subscribe(client, #Topic, QoS);
+#endif
+
+#define INIT_SUBSCRIPTION(Topic, QoS) \
+ {{ \
+ int rc; \
+ _SUBSCRIBE(Topic, QoS) \
+ if (rc != MQTTCLIENT_SUCCESS) \
+ {{ \
+ LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \
+ }} \
+ }}
+
+
+#ifdef USE_MQTT_5
+#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
+ MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \
+ &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \
+ rc = response.reasonCode; \
+ MQTTResponse_free(response);
+#else
+#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
+ rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \
+ &PLC_##c_loc_name##_buf, QoS, Retained, NULL);
+#endif
+
+#define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \
+ {{ \
+ int rc; \
+ _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
+ if (rc != MQTTCLIENT_SUCCESS) \
+ {{ \
+ LogError("MQTT client failed to init publication of '%s', return code %d\n", #Topic, rc);\
+ /* TODO update status variable accordingly */ \
+ }} \
+ }}
+
+#define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \
+ if(MQTT_##c_loc_name##_state == CHANGED) \
+ {{ \
+ int rc; \
+ _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
+ if (rc != MQTTCLIENT_SUCCESS) \
+ {{ \
+ LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \
+ /* TODO update status variable accordingly */ \
+ }} else {{ \
+ MQTT_##c_loc_name##_state = UNCHANGED; \
+ }} \
+ }}
+
+static void *__publish_thread(void *_unused) {{
+ int rc = 0;
+ while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
+ pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+ if(MQTT_any_pub_var_changed && MQTTClient_isConnected(client)){{
+
+ /* publish changes, and reset variable's state to UNCHANGED */
+{publish_changes}
+ MQTT_any_pub_var_changed = 0;
+ }}
+
+ pthread_mutex_unlock(&MQTT_mutex);
+
+ if(MQTT_stop_thread) break;
+ }}
+
+ if(!MQTT_stop_thread){{
+ /* if thread exits outside of normal shutdown, report error*/
+ LogError("MQTT client thread exited unexpectedly, return code %d\n", rc);
+ }}
+}}
+
+int __init_{locstr}(int argc,char **argv)
+{{
+ char *uri = "{uri}";
+ char *clientID = "{clientID}";
+ int rc;
+
+ MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
+
+#ifdef USE_MQTT_5
+ conn_opts.MQTTVersion = MQTTVERSION_5;
+ conn_opts.cleanstart = 1;
+
+ createOpts.MQTTVersion = MQTTVERSION_5;
+#else
+ conn_opts.cleansession = 1;
+#endif
+
+ MQTTClient_setTraceCallback(trace_callback);
+ MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
+
+
+ rc = MQTTClient_createWithOptions(
+ &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
+ LogError("MQTT Failed to create client, return code %d\n", rc);
+ return rc;
+ }}
+
+ rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
+ LogError("MQTT Failed to set callbacks, return code %d\n", rc);
+ return rc;
+ }}
+
+ rc = _connect_mqtt();
+
+ if (rc != MQTTCLIENT_SUCCESS) {{
+ LogError("MQTT Connect Failed, return code %d\n", rc);
+ return rc;
+ }}
+
+{init}
+
+ /* TODO start publish thread */
+ rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+
+ return 0;
+}}
+
+#define READ_VALUE(c_loc_name, C_type) \
+ if(MQTT_##c_loc_name##_state == CHANGED){{ \
+ /* TODO care about endianess */ \
+ PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \
+ MQTT_##c_loc_name##_state = UNCHANGED; \
+ }}
+
+void __retrieve_{locstr}(void)
+{{
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+{retrieve}
+ pthread_mutex_unlock(&MQTT_mutex);
+ }}
+}}
+
+#define WRITE_VALUE(c_loc_name, C_type) \
+ /* TODO care about endianess */ \
+ if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \
+ MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \
+ MQTT_##c_loc_name##_state = CHANGED; \
+ MQTT_any_pub_var_changed = 1; \
+ }}
+
+void __publish_{locstr}(void)
+{{
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ MQTT_any_pub_var_changed = 0;
+ /* copy PLC_* variables to MQTT_*, and mark those who changed */
+{publish}
+ /* if any change detcted, unblock publish thread */
+ if(MQTT_any_pub_var_changed){{
+ pthread_cond_signal(&MQTT_new_data);
+ }}
+ pthread_mutex_unlock(&MQTT_mutex);
+ }} else {{
+ /* TODO if couldn't lock mutex set status variable accordingly */
+ }}
+}}
+