# HG changeset patch # User Edouard Tisserant # Date 1721741404 -7200 # Node ID 0145c60b956076e6e79eb5fd592f8e083910ef0a # Parent 364e09f5faea507fe0e4aab2d4cf19ce44a35ecf MQTT: WIP, PLC starts even if MQTT client can't connect broker, and publish+subscribe again when reconnecting. diff -r 364e09f5faea -r 0145c60b9560 mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Tue Jul 23 11:05:46 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Tue Jul 23 15:30:04 2024 +0200 @@ -314,6 +314,7 @@ topics = "", cleanup = "", init = "", + init_pubsub = "", retrieve = "", publish = "", publish_changes = "" @@ -344,7 +345,7 @@ formatdict["decl"] += """ DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) - formatdict["init"] += """ + formatdict["init_pubsub"] += """ INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) formatdict["publish"] += """ WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) @@ -360,7 +361,7 @@ DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) formatdict["topics"] += """ INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) - formatdict["init"] += """ + formatdict["init_pubsub"] += """ INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals()) formatdict["retrieve"] += """ READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) diff -r 364e09f5faea -r 0145c60b9560 mqtt/mqtt_template.c --- a/mqtt/mqtt_template.c Tue Jul 23 11:05:46 2024 +0200 +++ b/mqtt/mqtt_template.c Tue Jul 23 15:30:04 2024 +0200 @@ -11,9 +11,9 @@ #define _Log(level, ...) \ {{ \ - /* char mstr[256]; */ \ - /* snprintf(mstr, 255, __VA_ARGS__); */ \ - /* LogMessage(level, mstr, strlen(mstr)); */ \ + char mstr[256]; \ + snprintf(mstr, 255, __VA_ARGS__); \ + LogMessage(level, mstr, strlen(mstr)); \ printf(__VA_ARGS__); \ fflush(stdout); \ }} @@ -22,9 +22,20 @@ #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); +// Selected debug level for paho stack +// can be: +// MQTTCLIENT_TRACE_PROTOCOL, MQTTCLIENT_TRACE_MAXIMUM, MQTTCLIENT_TRACE_ERROR +#define MQTT_DEBUG_LEVEL MQTTCLIENT_TRACE_ERROR + void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) {{ - LogInfo("Paho MQTT Trace : %d, %s\n", level, message); + if(level >= MQTT_DEBUG_LEVEL) + {{ + int beremiz_log_level = (level >= MQTTCLIENT_TRACE_ERROR ) ? LOG_CRITICAL : + (level > MQTTCLIENT_TRACE_MINIMUM) ? LOG_WARNING : + LOG_INFO; + _Log(beremiz_log_level,"Paho MQTT Trace : %s\n", message); + }} }} #define CHANGED 1 @@ -55,13 +66,13 @@ static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER; /* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */ -static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t MQTT_thread_wakeup_mutex = PTHREAD_MUTEX_INITIALIZER; /* wakeup publish thread when PLC changed published variable */ -static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; - -/* publish thread */ -static pthread_t publishThread; +static pthread_cond_t MQTT_thread_wakeup = PTHREAD_COND_INITIALIZER; + +/* thread that handles publish and reconnection */ +static pthread_t MQTT_thread; #define INIT_TOPIC(topic, iec_type, c_loc_name) \ {{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, @@ -75,37 +86,18 @@ {topics} }}; -static int _connect_mqtt(void) -{{ - int rc; - -#ifdef USE_MQTT_5 - MQTTProperties props = MQTTProperties_initializer; - MQTTProperties willProps = MQTTProperties_initializer; - MQTTResponse response = MQTTResponse_initializer; - - response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); - rc = response.reasonCode; - MQTTResponse_free(response); -#else - rc = MQTTClient_connect(client, &conn_opts); -#endif - - return rc; -}} - void __cleanup_{locstr}(void) {{ int rc; /* stop publish thread */ MQTT_stop_thread = 1; - if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{ + if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ /* unblock publish thread so that it can stop normally */ - pthread_cond_signal(&MQTT_new_data); - pthread_mutex_unlock(&MQTT_publish_mutex); - }} - pthread_join(publishThread, NULL); + pthread_cond_signal(&MQTT_thread_wakeup); + pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); + }} + pthread_join(MQTT_thread, NULL); #ifdef USE_MQTT_5 if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) @@ -118,6 +110,19 @@ MQTTClient_destroy(&client); }} +void connectionLost(void* context, char* reason) +{{ + int rc; + LogWarning("ConnectionLost, reconnecting\\n"); + if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ + /* unblock publish thread so that it can reconnect */ + pthread_cond_signal(&MQTT_thread_wakeup); + pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); + }} +}} + + + int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {{ int low = 0; @@ -172,7 +177,7 @@ /* TODO */ #define INIT_UserPassword(User, Password) \ - LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ + LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ conn_opts.username = User; \ conn_opts.password = Password; @@ -190,10 +195,10 @@ #define INIT_SUBSCRIPTION(Topic, QoS) \ {{ \ int rc; \ - _SUBSCRIBE(Topic, QoS) \ + _SUBSCRIBE(Topic, QoS) \ if (rc != MQTTCLIENT_SUCCESS) \ {{ \ - LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ + LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ }} \ }} @@ -201,7 +206,7 @@ #ifdef USE_MQTT_5 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \ - &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \ + &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \ rc = response.reasonCode; \ MQTTResponse_free(response); #else @@ -228,35 +233,68 @@ _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ if (rc != MQTTCLIENT_SUCCESS) \ {{ \ - LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ + LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ /* TODO update status variable accordingly */ \ }} else {{ \ MQTT_##c_loc_name##_state = UNCHANGED; \ }} \ }} -static void *__publish_thread(void *_unused) {{ +static int _connect_mqtt(void) +{{ + int rc; + +#ifdef USE_MQTT_5 + MQTTProperties props = MQTTProperties_initializer; + MQTTProperties willProps = MQTTProperties_initializer; + MQTTResponse response = MQTTResponse_initializer; + + response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); + rc = response.reasonCode; + MQTTResponse_free(response); +#else + rc = MQTTClient_connect(client, &conn_opts); +#endif + + if (rc != MQTTCLIENT_SUCCESS) {{ + return rc; + }} + +{init_pubsub} + + return MQTTCLIENT_SUCCESS; +}} + +static void *__MQTT_thread_proc(void *_unused) {{ int rc = 0; - while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{ - pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex); + + while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{ + int do_publish; + int is_connected; + + pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex); + is_connected = MQTTClient_isConnected(client); + do_publish = MQTT_any_pub_var_changed && is_connected; + + if(do_publish) {{ - int is_connected = MQTTClient_isConnected(client); - if(MQTT_any_pub_var_changed && is_connected){{ - /* publish changes, and reset variable's state to UNCHANGED */ {publish_changes} - MQTT_any_pub_var_changed = 0; - }} else if(!is_connected){{ - rc = _connect_mqtt(); - if (rc != MQTTCLIENT_SUCCESS) {{ - LogError("MQTT Reconnect Failed, return code %d\n", rc); - sleep(5); - }} + MQTT_any_pub_var_changed = 0; + }} + + pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); + + if(!is_connected) {{ + rc = _connect_mqtt(); + if (rc == MQTTCLIENT_SUCCESS) {{ + LogInfo("MQTT Reconnected\n"); + }} else {{ + LogError("MQTT Reconnect Failed, return code %d\n", rc); + sleep(5); }} }} - pthread_mutex_unlock(&MQTT_publish_mutex); - if(MQTT_stop_thread) break; }} @@ -284,8 +322,7 @@ #endif MQTTClient_setTraceCallback(trace_callback); - MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); - + MQTTClient_setTraceLevel(MQTT_DEBUG_LEVEL); rc = MQTTClient_createWithOptions( &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); @@ -295,24 +332,26 @@ goto exit_error; }} - rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL); + rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); if (rc != MQTTCLIENT_SUCCESS) {{ LogError("MQTT Failed to set callbacks, return code %d\n", rc); goto exit_error; }} +{init} + rc = _connect_mqtt(); - if (rc != MQTTCLIENT_SUCCESS) {{ + if (rc == MQTTCLIENT_SUCCESS) {{ + LogInfo("MQTT Connected\n"); + }} else {{ LogError("MQTT Connect Failed, return code %d\n", rc); - goto exit_error; - }} - -{init} - - /* start publish thread */ + // Connect error at init is fine, publish thread will retry later + }} + + /* start MQTT thread */ MQTT_stop_thread = 0; - rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); + rc = pthread_create(&MQTT_thread, NULL, &__MQTT_thread_proc, NULL); if (rc != 0) {{ LogError("MQTT cannot create thread, return code %d\n", rc); goto exit_error; @@ -350,15 +389,15 @@ void __publish_{locstr}(void) {{ - if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{ + if (pthread_mutex_trylock(&MQTT_thread_wakeup_mutex) == 0){{ MQTT_any_pub_var_changed = 0; /* copy PLC_* variables to MQTT_*, and mark those who changed */ {publish} /* if any change detcted, unblock publish thread */ if(MQTT_any_pub_var_changed){{ - pthread_cond_signal(&MQTT_new_data); + pthread_cond_signal(&MQTT_thread_wakeup); }} - pthread_mutex_unlock(&MQTT_publish_mutex); + pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); }} else {{ /* TODO if couldn't lock mutex set status variable accordingly */ }}