--- a/mqtt/mqtt_template.c Mon Jul 22 16:13:27 2024 +0200
+++ b/mqtt/mqtt_template.c Tue Jul 23 11:05:46 2024 +0200
@@ -51,8 +51,11 @@
/* condition to wakeup publish thread */
static int MQTT_any_pub_var_changed = 0;
-/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_mutex;
+/* mutex to keep incoming PLC data consistent */
+static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */
+static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER;
/* wakeup publish thread when PLC changed published variable */
static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
@@ -97,10 +100,10 @@
/* stop publish thread */
MQTT_stop_thread = 1;
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{
/* unblock publish thread so that it can stop normally */
pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex);
}}
pthread_join(publishThread, NULL);
@@ -146,8 +149,11 @@
found:
if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
- memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
- *topics[mid].mqtt_pchanged = 1;
+ if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{
+ memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
+ *topics[mid].mqtt_pchanged = 1;
+ pthread_mutex_unlock(&MQTT_retrieve_mutex);
+ }}
}} else {{
LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
@@ -231,8 +237,8 @@
static void *__publish_thread(void *_unused) {{
int rc = 0;
- while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
- pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+ while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{
+ pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex);
{{
int is_connected = MQTTClient_isConnected(client);
if(MQTT_any_pub_var_changed && is_connected){{
@@ -249,7 +255,7 @@
}}
}}
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex);
if(MQTT_stop_thread) break;
}}
@@ -286,29 +292,37 @@
if (rc != MQTTCLIENT_SUCCESS)
{{
LogError("MQTT Failed to create client, return code %d\n", rc);
- return rc;
+ goto exit_error;
}}
rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{{
LogError("MQTT Failed to set callbacks, return code %d\n", rc);
- return rc;
+ goto exit_error;
}}
rc = _connect_mqtt();
-
if (rc != MQTTCLIENT_SUCCESS) {{
LogError("MQTT Connect Failed, return code %d\n", rc);
- return rc;
+ goto exit_error;
}}
{init}
- /* TODO start publish thread */
+ /* start publish thread */
+ MQTT_stop_thread = 0;
rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+ if (rc != 0) {{
+ LogError("MQTT cannot create thread, return code %d\n", rc);
+ goto exit_error;
+ }}
return 0;
+
+exit_error:
+ MQTTClient_destroy(&client);
+ return rc;
}}
#define READ_VALUE(c_loc_name, C_type) \
@@ -320,9 +334,9 @@
void __retrieve_{locstr}(void)
{{
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{
{retrieve}
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_retrieve_mutex);
}}
}}
@@ -336,7 +350,7 @@
void __publish_{locstr}(void)
{{
- if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+ if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{
MQTT_any_pub_var_changed = 0;
/* copy PLC_* variables to MQTT_*, and mark those who changed */
{publish}
@@ -344,7 +358,7 @@
if(MQTT_any_pub_var_changed){{
pthread_cond_signal(&MQTT_new_data);
}}
- pthread_mutex_unlock(&MQTT_mutex);
+ pthread_mutex_unlock(&MQTT_publish_mutex);
}} else {{
/* TODO if couldn't lock mutex set status variable accordingly */
}}