--- a/mqtt/mqtt_client_gen.py Tue Jul 23 11:05:46 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Tue Jul 23 15:30:04 2024 +0200
@@ -314,6 +314,7 @@
topics = "",
cleanup = "",
init = "",
+ init_pubsub = "",
retrieve = "",
publish = "",
publish_changes = ""
@@ -344,7 +345,7 @@
formatdict["decl"] += """
DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
- formatdict["init"] += """
+ formatdict["init_pubsub"] += """
INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
formatdict["publish"] += """
WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
@@ -360,7 +361,7 @@
DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
formatdict["topics"] += """
INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
- formatdict["init"] += """
+ formatdict["init_pubsub"] += """
INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
formatdict["retrieve"] += """
READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
--- a/mqtt/mqtt_template.c Tue Jul 23 11:05:46 2024 +0200
+++ b/mqtt/mqtt_template.c Tue Jul 23 15:30:04 2024 +0200
@@ -11,9 +11,9 @@
#define _Log(level, ...) \
{{ \
- /* char mstr[256]; */ \
- /* snprintf(mstr, 255, __VA_ARGS__); */ \
- /* LogMessage(level, mstr, strlen(mstr)); */ \
+ char mstr[256]; \
+ snprintf(mstr, 255, __VA_ARGS__); \
+ LogMessage(level, mstr, strlen(mstr)); \
printf(__VA_ARGS__); \
fflush(stdout); \
}}
@@ -22,9 +22,20 @@
#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
+// Selected debug level for paho stack
+// can be:
+// MQTTCLIENT_TRACE_PROTOCOL, MQTTCLIENT_TRACE_MAXIMUM, MQTTCLIENT_TRACE_ERROR
+#define MQTT_DEBUG_LEVEL MQTTCLIENT_TRACE_ERROR
+
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{{
- LogInfo("Paho MQTT Trace : %d, %s\n", level, message);
+ if(level >= MQTT_DEBUG_LEVEL)
+ {{
+ int beremiz_log_level = (level >= MQTTCLIENT_TRACE_ERROR ) ? LOG_CRITICAL :
+ (level > MQTTCLIENT_TRACE_MINIMUM) ? LOG_WARNING :
+ LOG_INFO;
+ _Log(beremiz_log_level,"Paho MQTT Trace : %s\n", message);
+ }}
}}
#define CHANGED 1
@@ -55,13 +66,13 @@
static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER;
/* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */
-static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t MQTT_thread_wakeup_mutex = PTHREAD_MUTEX_INITIALIZER;
/* wakeup publish thread when PLC changed published variable */
-static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
-
-/* publish thread */
-static pthread_t publishThread;
+static pthread_cond_t MQTT_thread_wakeup = PTHREAD_COND_INITIALIZER;
+
+/* thread that handles publish and reconnection */
+static pthread_t MQTT_thread;
#define INIT_TOPIC(topic, iec_type, c_loc_name) \
{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
@@ -75,37 +86,18 @@
{topics}
}};
-static int _connect_mqtt(void)
-{{
- int rc;
-
-#ifdef USE_MQTT_5
- MQTTProperties props = MQTTProperties_initializer;
- MQTTProperties willProps = MQTTProperties_initializer;
- MQTTResponse response = MQTTResponse_initializer;
-
- response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
- rc = response.reasonCode;
- MQTTResponse_free(response);
-#else
- rc = MQTTClient_connect(client, &conn_opts);
-#endif
-
- return rc;
-}}
-
void __cleanup_{locstr}(void)
{{
int rc;
/* stop publish thread */
MQTT_stop_thread = 1;
- if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{
+ if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{
/* unblock publish thread so that it can stop normally */
- pthread_cond_signal(&MQTT_new_data);
- pthread_mutex_unlock(&MQTT_publish_mutex);
- }}
- pthread_join(publishThread, NULL);
+ pthread_cond_signal(&MQTT_thread_wakeup);
+ pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+ }}
+ pthread_join(MQTT_thread, NULL);
#ifdef USE_MQTT_5
if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
@@ -118,6 +110,19 @@
MQTTClient_destroy(&client);
}}
+void connectionLost(void* context, char* reason)
+{{
+ int rc;
+ LogWarning("ConnectionLost, reconnecting\\n");
+ if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{
+ /* unblock publish thread so that it can reconnect */
+ pthread_cond_signal(&MQTT_thread_wakeup);
+ pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+ }}
+}}
+
+
+
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{{
int low = 0;
@@ -172,7 +177,7 @@
/* TODO */
#define INIT_UserPassword(User, Password) \
- LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \
+ LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \
conn_opts.username = User; \
conn_opts.password = Password;
@@ -190,10 +195,10 @@
#define INIT_SUBSCRIPTION(Topic, QoS) \
{{ \
int rc; \
- _SUBSCRIBE(Topic, QoS) \
+ _SUBSCRIBE(Topic, QoS) \
if (rc != MQTTCLIENT_SUCCESS) \
{{ \
- LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \
+ LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \
}} \
}}
@@ -201,7 +206,7 @@
#ifdef USE_MQTT_5
#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \
- &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \
+ &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \
rc = response.reasonCode; \
MQTTResponse_free(response);
#else
@@ -228,35 +233,68 @@
_PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \
if (rc != MQTTCLIENT_SUCCESS) \
{{ \
- LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \
+ LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \
/* TODO update status variable accordingly */ \
}} else {{ \
MQTT_##c_loc_name##_state = UNCHANGED; \
}} \
}}
-static void *__publish_thread(void *_unused) {{
+static int _connect_mqtt(void)
+{{
+ int rc;
+
+#ifdef USE_MQTT_5
+ MQTTProperties props = MQTTProperties_initializer;
+ MQTTProperties willProps = MQTTProperties_initializer;
+ MQTTResponse response = MQTTResponse_initializer;
+
+ response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
+ rc = response.reasonCode;
+ MQTTResponse_free(response);
+#else
+ rc = MQTTClient_connect(client, &conn_opts);
+#endif
+
+ if (rc != MQTTCLIENT_SUCCESS) {{
+ return rc;
+ }}
+
+{init_pubsub}
+
+ return MQTTCLIENT_SUCCESS;
+}}
+
+static void *__MQTT_thread_proc(void *_unused) {{
int rc = 0;
- while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{
- pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex);
+
+ while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{
+ int do_publish;
+ int is_connected;
+
+ pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex);
+ is_connected = MQTTClient_isConnected(client);
+ do_publish = MQTT_any_pub_var_changed && is_connected;
+
+ if(do_publish)
{{
- int is_connected = MQTTClient_isConnected(client);
- if(MQTT_any_pub_var_changed && is_connected){{
-
/* publish changes, and reset variable's state to UNCHANGED */
{publish_changes}
- MQTT_any_pub_var_changed = 0;
- }} else if(!is_connected){{
- rc = _connect_mqtt();
- if (rc != MQTTCLIENT_SUCCESS) {{
- LogError("MQTT Reconnect Failed, return code %d\n", rc);
- sleep(5);
- }}
+ MQTT_any_pub_var_changed = 0;
+ }}
+
+ pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
+
+ if(!is_connected) {{
+ rc = _connect_mqtt();
+ if (rc == MQTTCLIENT_SUCCESS) {{
+ LogInfo("MQTT Reconnected\n");
+ }} else {{
+ LogError("MQTT Reconnect Failed, return code %d\n", rc);
+ sleep(5);
}}
}}
- pthread_mutex_unlock(&MQTT_publish_mutex);
-
if(MQTT_stop_thread) break;
}}
@@ -284,8 +322,7 @@
#endif
MQTTClient_setTraceCallback(trace_callback);
- MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
-
+ MQTTClient_setTraceLevel(MQTT_DEBUG_LEVEL);
rc = MQTTClient_createWithOptions(
&client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
@@ -295,24 +332,26 @@
goto exit_error;
}}
- rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL);
+ rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{{
LogError("MQTT Failed to set callbacks, return code %d\n", rc);
goto exit_error;
}}
+{init}
+
rc = _connect_mqtt();
- if (rc != MQTTCLIENT_SUCCESS) {{
+ if (rc == MQTTCLIENT_SUCCESS) {{
+ LogInfo("MQTT Connected\n");
+ }} else {{
LogError("MQTT Connect Failed, return code %d\n", rc);
- goto exit_error;
- }}
-
-{init}
-
- /* start publish thread */
+ // Connect error at init is fine, publish thread will retry later
+ }}
+
+ /* start MQTT thread */
MQTT_stop_thread = 0;
- rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
+ rc = pthread_create(&MQTT_thread, NULL, &__MQTT_thread_proc, NULL);
if (rc != 0) {{
LogError("MQTT cannot create thread, return code %d\n", rc);
goto exit_error;
@@ -350,15 +389,15 @@
void __publish_{locstr}(void)
{{
- if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{
+ if (pthread_mutex_trylock(&MQTT_thread_wakeup_mutex) == 0){{
MQTT_any_pub_var_changed = 0;
/* copy PLC_* variables to MQTT_*, and mark those who changed */
{publish}
/* if any change detcted, unblock publish thread */
if(MQTT_any_pub_var_changed){{
- pthread_cond_signal(&MQTT_new_data);
+ pthread_cond_signal(&MQTT_thread_wakeup);
}}
- pthread_mutex_unlock(&MQTT_publish_mutex);
+ pthread_mutex_unlock(&MQTT_thread_wakeup_mutex);
}} else {{
/* TODO if couldn't lock mutex set status variable accordingly */
}}