MQTT: WIP, PLC starts even if MQTT client can't connect broker, and publish+subscribe again when reconnecting.
authorEdouard Tisserant <edouard@beremiz.fr>
Tue, 23 Jul 2024 15:30:04 +0200 (6 months ago)
changeset 3998 0145c60b9560
parent 3997 364e09f5faea
child 3999 1479acf750e2
MQTT: WIP, PLC starts even if MQTT client can't connect broker, and publish+subscribe again when reconnecting.
mqtt/mqtt_client_gen.py
mqtt/mqtt_template.c
--- a/mqtt/mqtt_client_gen.py	Tue Jul 23 11:05:46 2024 +0200
+++ b/mqtt/mqtt_client_gen.py	Tue Jul 23 15:30:04 2024 +0200
@@ -314,6 +314,7 @@
             topics          = "",
             cleanup         = "",
             init            = "",
+            init_pubsub     = "",
             retrieve        = "",
             publish         = "",
             publish_changes = ""
@@ -344,7 +345,7 @@
 
             formatdict["decl"] += """
 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
-            formatdict["init"] += """
+            formatdict["init_pubsub"] += """
     INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
             formatdict["publish"] += """
         WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
@@ -360,7 +361,7 @@
 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
             formatdict["topics"] += """
     INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-            formatdict["init"] += """
+            formatdict["init_pubsub"] += """
     INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
             formatdict["retrieve"] += """
         READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
--- a/mqtt/mqtt_template.c	Tue Jul 23 11:05:46 2024 +0200
+++ b/mqtt/mqtt_template.c	Tue Jul 23 15:30:04 2024 +0200
@@ -11,9 +11,9 @@
 
 #define _Log(level, ...)                                                                          \
     {{                                                                                            \
-        /* char mstr[256];                          */                                                 \
-        /* snprintf(mstr, 255, __VA_ARGS__);        */                                                 \
-        /* LogMessage(level, mstr, strlen(mstr));   */                                                 \
+        char mstr[256];                                                                           \
+        snprintf(mstr, 255, __VA_ARGS__);                                                         \
+        LogMessage(level, mstr, strlen(mstr));                                                    \
         printf(__VA_ARGS__);                                                                      \
         fflush(stdout);                                                                           \
     }}
@@ -22,9 +22,20 @@
 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
 
+// Selected debug level for paho stack
+// can be:
+// MQTTCLIENT_TRACE_PROTOCOL, MQTTCLIENT_TRACE_MAXIMUM, MQTTCLIENT_TRACE_ERROR
+#define MQTT_DEBUG_LEVEL MQTTCLIENT_TRACE_ERROR
+
 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
 {{
-    LogInfo("Paho MQTT Trace : %d, %s\n", level, message);
+    if(level >= MQTT_DEBUG_LEVEL)
+    {{
+        int beremiz_log_level = (level >= MQTTCLIENT_TRACE_ERROR ) ? LOG_CRITICAL :
+                                (level > MQTTCLIENT_TRACE_MINIMUM) ? LOG_WARNING : 
+                                LOG_INFO;
+        _Log(beremiz_log_level,"Paho MQTT Trace : %s\n", message);
+    }}
 }}
 
 #define CHANGED 1
@@ -55,13 +66,13 @@
 static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 /* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t MQTT_thread_wakeup_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 /* wakeup publish thread when PLC changed published variable */
-static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
-
-/* publish thread */
-static pthread_t publishThread;
+static pthread_cond_t MQTT_thread_wakeup = PTHREAD_COND_INITIALIZER;
+
+/* thread that handles publish and reconnection */
+static pthread_t MQTT_thread;
 
 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \
 {{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
@@ -75,37 +86,18 @@
 {topics}
 }};
 
-static int _connect_mqtt(void)
-{{
-    int rc;
-
-#ifdef USE_MQTT_5
-    MQTTProperties props = MQTTProperties_initializer;
-    MQTTProperties willProps = MQTTProperties_initializer;
-    MQTTResponse response = MQTTResponse_initializer;
-
-    response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
-    rc = response.reasonCode;
-    MQTTResponse_free(response);
-#else
-    rc = MQTTClient_connect(client, &conn_opts);
-#endif
-
-    return rc;
-}}
-
 void __cleanup_{locstr}(void)
 {{
     int rc;
 
     /* stop publish thread */
     MQTT_stop_thread = 1;
-    if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{
+    if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{
         /* unblock publish thread so that it can stop normally */
-        pthread_cond_signal(&MQTT_new_data);
-        pthread_mutex_unlock(&MQTT_publish_mutex);
-    }}
-    pthread_join(publishThread, NULL);
+        pthread_cond_signal(&MQTT_thread_wakeup);
+        pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+    }}
+    pthread_join(MQTT_thread, NULL);
 
 #ifdef USE_MQTT_5
     if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
@@ -118,6 +110,19 @@
     MQTTClient_destroy(&client);
 }}
 
+void connectionLost(void* context, char* reason)
+{{
+    int rc;
+    LogWarning("ConnectionLost, reconnecting\\n");
+    if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{
+        /* unblock publish thread so that it can reconnect */
+        pthread_cond_signal(&MQTT_thread_wakeup);
+        pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+    }}
+}}
+
+
+
 int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
 {{
     int low = 0;
@@ -172,7 +177,7 @@
     /* TODO */
 
 #define INIT_UserPassword(User, Password)                                                         \
-    LogInfo("MQTT Init UserPassword %s,%s\n", User, Password);                                   \
+    LogInfo("MQTT Init UserPassword %s,%s\n", User, Password);                                    \
     conn_opts.username = User;                                                                    \
     conn_opts.password = Password;
 
@@ -190,10 +195,10 @@
 #define INIT_SUBSCRIPTION(Topic, QoS)                                                             \
     {{                                                                                            \
         int rc;                                                                                   \
-        _SUBSCRIBE(Topic, QoS)                                                                  \
+        _SUBSCRIBE(Topic, QoS)                                                                    \
         if (rc != MQTTCLIENT_SUCCESS)                                                             \
         {{                                                                                        \
-            LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc);   \
+            LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc);    \
         }}                                                                                        \
     }}
 
@@ -201,7 +206,7 @@
 #ifdef USE_MQTT_5
 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \
         MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type),               \
-            &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL);                                  \
+            &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL);                                 \
         rc = response.reasonCode;                                                                 \
         MQTTResponse_free(response);
 #else
@@ -228,35 +233,68 @@
         _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \
         if (rc != MQTTCLIENT_SUCCESS)                                                             \
         {{                                                                                        \
-            LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc);        \
+            LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc);         \
             /* TODO update status variable accordingly */                                         \
         }} else {{                                                                                \
             MQTT_##c_loc_name##_state = UNCHANGED;                                                \
         }}                                                                                        \
     }}
 
-static void *__publish_thread(void *_unused) {{
+static int _connect_mqtt(void)
+{{
+    int rc;
+
+#ifdef USE_MQTT_5
+    MQTTProperties props = MQTTProperties_initializer;
+    MQTTProperties willProps = MQTTProperties_initializer;
+    MQTTResponse response = MQTTResponse_initializer;
+
+    response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
+    rc = response.reasonCode;
+    MQTTResponse_free(response);
+#else
+    rc = MQTTClient_connect(client, &conn_opts);
+#endif
+
+    if (rc != MQTTCLIENT_SUCCESS) {{
+        return rc;
+    }}
+
+{init_pubsub}
+
+    return MQTTCLIENT_SUCCESS;
+}}
+
+static void *__MQTT_thread_proc(void *_unused) {{
     int rc = 0;
-    while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{
-        pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex);
+
+    while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{
+        int do_publish; 
+        int is_connected;
+
+        pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex);
+        is_connected = MQTTClient_isConnected(client);
+        do_publish = MQTT_any_pub_var_changed && is_connected; 
+
+        if(do_publish)
         {{
-            int is_connected = MQTTClient_isConnected(client);
-            if(MQTT_any_pub_var_changed && is_connected){{
-
             /* publish changes, and reset variable's state to UNCHANGED */
 {publish_changes}
-                MQTT_any_pub_var_changed = 0;
-            }} else if(!is_connected){{
-                rc = _connect_mqtt();
-                if (rc != MQTTCLIENT_SUCCESS) {{
-                    LogError("MQTT Reconnect Failed, return code %d\n", rc);
-                    sleep(5);
-                }}
+            MQTT_any_pub_var_changed = 0;
+        }}
+
+        pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+
+        if(!is_connected) {{
+            rc = _connect_mqtt();
+            if (rc == MQTTCLIENT_SUCCESS) {{
+                LogInfo("MQTT Reconnected\n");
+            }} else {{
+                LogError("MQTT Reconnect Failed, return code %d\n", rc);
+                sleep(5);
             }}
         }}
 
-        pthread_mutex_unlock(&MQTT_publish_mutex);
-
         if(MQTT_stop_thread) break;
     }}
 
@@ -284,8 +322,7 @@
 #endif
 
     MQTTClient_setTraceCallback(trace_callback);
-    MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
-
+    MQTTClient_setTraceLevel(MQTT_DEBUG_LEVEL);
 
     rc = MQTTClient_createWithOptions(
         &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
@@ -295,24 +332,26 @@
         goto exit_error;
     }}
 
-    rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
+    rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
     if (rc != MQTTCLIENT_SUCCESS)
     {{
         LogError("MQTT Failed to set callbacks, return code %d\n", rc);
         goto exit_error;
     }}
 
+{init}
+
     rc = _connect_mqtt();
-    if (rc != MQTTCLIENT_SUCCESS) {{
+	if (rc == MQTTCLIENT_SUCCESS) {{
+		LogInfo("MQTT Connected\n");
+	}} else {{
         LogError("MQTT Connect Failed, return code %d\n", rc);
-        goto exit_error;
-    }}
-
-{init}
-
-    /* start publish thread */
+        // Connect error at init is fine, publish thread will retry later
+    }}
+
+    /* start MQTT thread */
     MQTT_stop_thread = 0;
-    rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+    rc = pthread_create(&MQTT_thread, NULL, &__MQTT_thread_proc, NULL);
     if (rc != 0) {{
         LogError("MQTT cannot create thread, return code %d\n", rc);
         goto exit_error;
@@ -350,15 +389,15 @@
 
 void __publish_{locstr}(void)
 {{
-    if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{
+    if (pthread_mutex_trylock(&MQTT_thread_wakeup_mutex) == 0){{
         MQTT_any_pub_var_changed = 0;
         /* copy PLC_* variables to MQTT_*, and mark those who changed */
 {publish}
         /* if any change detcted, unblock publish thread */
         if(MQTT_any_pub_var_changed){{
-            pthread_cond_signal(&MQTT_new_data);
+            pthread_cond_signal(&MQTT_thread_wakeup);
         }}
-        pthread_mutex_unlock(&MQTT_publish_mutex);
+        pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
     }} else {{
         /* TODO if couldn't lock mutex set status variable accordingly */ 
     }}