# HG changeset patch # User Edouard Tisserant # Date 1721725546 -7200 # Node ID 364e09f5faea507fe0e4aab2d4cf19ce44a35ecf # Parent 4eb23bb4bc2fd3c69e07179f9c9e37d6dda14f45 MQTT: WIP, destroys paho client during failure handling at init and at cleanup. Use mutex to protect retrieve. diff -r 4eb23bb4bc2f -r 364e09f5faea mqtt/mqtt_template.c --- a/mqtt/mqtt_template.c Mon Jul 22 16:13:27 2024 +0200 +++ b/mqtt/mqtt_template.c Tue Jul 23 11:05:46 2024 +0200 @@ -51,8 +51,11 @@ /* condition to wakeup publish thread */ static int MQTT_any_pub_var_changed = 0; -/* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ -static pthread_mutex_t MQTT_mutex; +/* mutex to keep incoming PLC data consistent */ +static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */ +static pthread_mutex_t MQTT_publish_mutex = PTHREAD_MUTEX_INITIALIZER; /* wakeup publish thread when PLC changed published variable */ static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; @@ -97,10 +100,10 @@ /* stop publish thread */ MQTT_stop_thread = 1; - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + if (pthread_mutex_lock(&MQTT_publish_mutex) == 0){{ /* unblock publish thread so that it can stop normally */ pthread_cond_signal(&MQTT_new_data); - pthread_mutex_unlock(&MQTT_mutex); + pthread_mutex_unlock(&MQTT_publish_mutex); }} pthread_join(publishThread, NULL); @@ -146,8 +149,11 @@ found: if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ - memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); - *topics[mid].mqtt_pchanged = 1; + if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{ + memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); + *topics[mid].mqtt_pchanged = 1; + pthread_mutex_unlock(&MQTT_retrieve_mutex); + }} }} else {{ LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); @@ -231,8 +237,8 @@ static void *__publish_thread(void *_unused) {{ int rc = 0; - while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ - pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); + while((rc = pthread_mutex_lock(&MQTT_publish_mutex)) == 0 && !MQTT_stop_thread){{ + pthread_cond_wait(&MQTT_new_data, &MQTT_publish_mutex); {{ int is_connected = MQTTClient_isConnected(client); if(MQTT_any_pub_var_changed && is_connected){{ @@ -249,7 +255,7 @@ }} }} - pthread_mutex_unlock(&MQTT_mutex); + pthread_mutex_unlock(&MQTT_publish_mutex); if(MQTT_stop_thread) break; }} @@ -286,29 +292,37 @@ if (rc != MQTTCLIENT_SUCCESS) {{ LogError("MQTT Failed to create client, return code %d\n", rc); - return rc; + goto exit_error; }} rc = MQTTClient_setCallbacks(client, NULL, NULL, messageArrived, NULL); if (rc != MQTTCLIENT_SUCCESS) {{ LogError("MQTT Failed to set callbacks, return code %d\n", rc); - return rc; + goto exit_error; }} rc = _connect_mqtt(); - if (rc != MQTTCLIENT_SUCCESS) {{ LogError("MQTT Connect Failed, return code %d\n", rc); - return rc; + goto exit_error; }} {init} - /* TODO start publish thread */ + /* start publish thread */ + MQTT_stop_thread = 0; rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL); + if (rc != 0) {{ + LogError("MQTT cannot create thread, return code %d\n", rc); + goto exit_error; + }} return 0; + +exit_error: + MQTTClient_destroy(&client); + return rc; }} #define READ_VALUE(c_loc_name, C_type) \ @@ -320,9 +334,9 @@ void __retrieve_{locstr}(void) {{ - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{ {retrieve} - pthread_mutex_unlock(&MQTT_mutex); + pthread_mutex_unlock(&MQTT_retrieve_mutex); }} }} @@ -336,7 +350,7 @@ void __publish_{locstr}(void) {{ - if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ + if (pthread_mutex_trylock(&MQTT_publish_mutex) == 0){{ MQTT_any_pub_var_changed = 0; /* copy PLC_* variables to MQTT_*, and mark those who changed */ {publish} @@ -344,7 +358,7 @@ if(MQTT_any_pub_var_changed){{ pthread_cond_signal(&MQTT_new_data); }} - pthread_mutex_unlock(&MQTT_mutex); + pthread_mutex_unlock(&MQTT_publish_mutex); }} else {{ /* TODO if couldn't lock mutex set status variable accordingly */ }}