MQTT: WIP, re-organized disconnection handling, now keep track of connection status and reconnect accordingly.
authorEdouard Tisserant <edouard@beremiz.fr>
Thu, 01 Aug 2024 12:11:11 +0200 (5 months ago)
changeset 4002 6c2b80b4515d
parent 4001 5e0660d394e3
child 4003 65aae40f81dd
MQTT: WIP, re-organized disconnection handling, now keep track of connection status and reconnect accordingly.

This fixes bug with non-reconnecting to broker when having no published variables.
mqtt/mqtt_template.c
--- a/mqtt/mqtt_template.c	Thu Aug 01 12:09:28 2024 +0200
+++ b/mqtt/mqtt_template.c	Thu Aug 01 12:11:11 2024 +0200
@@ -62,6 +62,9 @@
 /* condition to wakeup publish thread */
 static int MQTT_any_pub_var_changed = 0;
 
+/* Keep track of connection state */
+static volatile int MQTT_is_disconnected = 1;
+
 /* mutex to keep incoming PLC data consistent */
 static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -116,6 +119,7 @@
     LogWarning("ConnectionLost, reconnecting\\n");
     if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{
         /* unblock publish thread so that it can reconnect */
+        MQTT_is_disconnected = 1;
         pthread_cond_signal(&MQTT_thread_wakeup);
         pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
     }}
@@ -257,7 +261,10 @@
 #endif
 
     if (rc != MQTTCLIENT_SUCCESS) {{
+        MQTT_is_disconnected = 1;
         return rc;
+    }}else{{
+        MQTT_is_disconnected = 0;
     }}
 
 {init_pubsub}
@@ -270,13 +277,22 @@
 
     while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{
         int do_publish; 
-        int is_connected;
 
         pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex);
-        is_connected = MQTTClient_isConnected(client);
-        do_publish = MQTT_any_pub_var_changed && is_connected; 
-
-        if(do_publish)
+        
+        if(MQTT_is_disconnected)
+        {{
+            /* TODO growing retry delay */
+            /* TODO max retry delay as config parameter */
+            sleep(5);
+            rc = _connect_mqtt();
+            if (rc == MQTTCLIENT_SUCCESS) {{
+                LogInfo("MQTT Reconnected\n");
+            }} else {{
+                LogError("MQTT Reconnect Failed, return code %d\n", rc);
+            }}
+        }} 
+        if(!MQTT_is_disconnected && MQTT_any_pub_var_changed)
         {{
             /* publish changes, and reset variable's state to UNCHANGED */
 {publish_changes}
@@ -285,16 +301,6 @@
 
         pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
 
-        if(!is_connected) {{
-            rc = _connect_mqtt();
-            if (rc == MQTTCLIENT_SUCCESS) {{
-                LogInfo("MQTT Reconnected\n");
-            }} else {{
-                LogError("MQTT Reconnect Failed, return code %d\n", rc);
-                sleep(5);
-            }}
-        }}
-
         if(MQTT_stop_thread) break;
     }}
 
@@ -394,7 +400,7 @@
         /* copy PLC_* variables to MQTT_*, and mark those who changed */
 {publish}
         /* if any change detcted, unblock publish thread */
-        if(MQTT_any_pub_var_changed){{
+        if(MQTT_any_pub_var_changed || MQTT_is_disconnected){{
             pthread_cond_signal(&MQTT_thread_wakeup);
         }}
         pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);