MQTT: WIP, destroys paho client during failure handling at init and at cleanup. Use mutex to protect retrieve.
authorEdouard Tisserant <edouard@beremiz.fr>
Tue, 23 Jul 2024 11:05:46 +0200
changeset 3997 364e09f5faea
parent 3996 4eb23bb4bc2f
child 3998 0145c60b9560
MQTT: WIP, destroys paho client during failure handling at init and at cleanup. Use mutex to protect retrieve.
mqtt/mqtt_template.c
--- a/mqtt/mqtt_template.c	Mon Jul 22 16:13:27 2024 +0200
+++ b/mqtt/mqtt_template.c	Tue Jul 23 11:05:46 2024 +0200
@@ -51,8 +51,11 @@
 /* condition to wakeup publish thread */
 static int MQTT_any_pub_var_changed = 0;
 
-/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_mutex;
+/* mutex to keep incoming PLC data consistent */
+static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */
+static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 /* wakeup publish thread when PLC changed published variable */
 static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
@@ -97,10 +100,10 @@
 
     /* stop publish thread */
     MQTT_stop_thread = 1;
-    if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+    if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{
         /* unblock publish thread so that it can stop normally */
         pthread_cond_signal(&MQTT_new_data);
-        pthread_mutex_unlock(&MQTT_mutex);
+        pthread_mutex_unlock(&MQTT_publish_mutex);
     }}
     pthread_join(publishThread, NULL);
 
@@ -146,8 +149,11 @@
 
 found:
     if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
-        memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
-        *topics[mid].mqtt_pchanged = 1;
+        if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{
+            memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
+            *topics[mid].mqtt_pchanged = 1;
+            pthread_mutex_unlock(&MQTT_retrieve_mutex);
+        }}
     }} else {{
         LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", 
             topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
@@ -231,8 +237,8 @@
 
 static void *__publish_thread(void *_unused) {{
     int rc = 0;
-    while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
-        pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
+    while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{
+        pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex);
         {{
             int is_connected = MQTTClient_isConnected(client);
             if(MQTT_any_pub_var_changed && is_connected){{
@@ -249,7 +255,7 @@
             }}
         }}
 
-        pthread_mutex_unlock(&MQTT_mutex);
+        pthread_mutex_unlock(&MQTT_publish_mutex);
 
         if(MQTT_stop_thread) break;
     }}
@@ -286,29 +292,37 @@
     if (rc != MQTTCLIENT_SUCCESS)
     {{
         LogError("MQTT Failed to create client, return code %d\n", rc);
-        return rc;
+        goto exit_error;
     }}
 
     rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
     if (rc != MQTTCLIENT_SUCCESS)
     {{
         LogError("MQTT Failed to set callbacks, return code %d\n", rc);
-        return rc;
+        goto exit_error;
     }}
 
     rc = _connect_mqtt();
-
     if (rc != MQTTCLIENT_SUCCESS) {{
         LogError("MQTT Connect Failed, return code %d\n", rc);
-        return rc;
+        goto exit_error;
     }}
 
 {init}
 
-    /* TODO start publish thread */
+    /* start publish thread */
+    MQTT_stop_thread = 0;
     rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+    if (rc != 0) {{
+        LogError("MQTT cannot create thread, return code %d\n", rc);
+        goto exit_error;
+    }}
 
     return 0;
+
+exit_error:
+    MQTTClient_destroy(&client);
+    return rc;
 }}
 
 #define READ_VALUE(c_loc_name, C_type) \
@@ -320,9 +334,9 @@
 
 void __retrieve_{locstr}(void)
 {{
-    if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+    if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{
 {retrieve}
-        pthread_mutex_unlock(&MQTT_mutex);
+        pthread_mutex_unlock(&MQTT_retrieve_mutex);
     }}
 }}
 
@@ -336,7 +350,7 @@
 
 void __publish_{locstr}(void)
 {{
-    if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
+    if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{
         MQTT_any_pub_var_changed = 0;
         /* copy PLC_* variables to MQTT_*, and mark those who changed */
 {publish}
@@ -344,7 +358,7 @@
         if(MQTT_any_pub_var_changed){{
             pthread_cond_signal(&MQTT_new_data);
         }}
-        pthread_mutex_unlock(&MQTT_mutex);
+        pthread_mutex_unlock(&MQTT_publish_mutex);
     }} else {{
         /* TODO if couldn't lock mutex set status variable accordingly */ 
     }}