edouard@3995: /* code generated by beremiz MQTT extension */ edouard@3979: edouard@3984: #include edouard@3994: #include edouard@3984: #include edouard@3990: #include edouard@3994: #include edouard@3984: edouard@3984: #include "MQTTClient.h" edouard@3980: #include "MQTTClientPersistence.h" edouard@3979: edouard@3995: #define _Log(level, ...) \ edouard@3995: {{ \ edouard@3998: char mstr[256]; \ edouard@3998: snprintf(mstr, 255, __VA_ARGS__); \ edouard@3998: LogMessage(level, mstr, strlen(mstr)); \ edouard@3995: printf(__VA_ARGS__); \ edouard@3995: fflush(stdout); \ edouard@3979: }} edouard@3979: edouard@3979: #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); edouard@3979: #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); edouard@3979: #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); edouard@3979: edouard@3998: // Selected debug level for paho stack edouard@3998: // can be: edouard@3998: // MQTTCLIENT_TRACE_PROTOCOL, MQTTCLIENT_TRACE_MAXIMUM, MQTTCLIENT_TRACE_ERROR edouard@3998: #define MQTT_DEBUG_LEVEL MQTTCLIENT_TRACE_ERROR edouard@3998: edouard@3984: void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) edouard@3984: {{ edouard@3998: if(level >= MQTT_DEBUG_LEVEL) edouard@3998: {{ edouard@3998: int beremiz_log_level = (level >= MQTTCLIENT_TRACE_ERROR ) ? LOG_CRITICAL : edouard@3998: (level > MQTTCLIENT_TRACE_MINIMUM) ? LOG_WARNING : edouard@3998: LOG_INFO; edouard@3998: _Log(beremiz_log_level,"Paho MQTT Trace : %s\n", message); edouard@3998: }} edouard@3984: }} edouard@3984: edouard@3989: #define CHANGED 1 edouard@3989: #define UNCHANGED 0 edouard@3989: edouard@3995: #define DECL_VAR(iec_type, C_type, c_loc_name) \ edouard@3995: static C_type PLC_##c_loc_name##_buf = 0; \ edouard@3995: static C_type MQTT_##c_loc_name##_buf = 0; \ edouard@3995: static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \ edouard@3984: C_type *c_loc_name = &PLC_##c_loc_name##_buf; edouard@3984: edouard@3984: {decl} edouard@3984: edouard@3986: static MQTTClient client; edouard@3986: #ifdef USE_MQTT_5 edouard@3986: static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; edouard@3986: #else edouard@3986: static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; edouard@3986: #endif edouard@3989: edouard@4005: MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; edouard@4005: edouard@3989: /* condition to quit publish thread */ edouard@3989: static int MQTT_stop_thread = 0; edouard@3989: edouard@3989: /* condition to wakeup publish thread */ edouard@3989: static int MQTT_any_pub_var_changed = 0; edouard@3989: edouard@4002: /* Keep track of connection state */ edouard@4002: static volatile int MQTT_is_disconnected = 1; edouard@4002: edouard@3997: /* mutex to keep incoming PLC data consistent */ edouard@3997: static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER; edouard@3997: edouard@3997: /* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */ edouard@3998: static pthread_mutex_t MQTT_thread_wakeup_mutex = PTHREAD_MUTEX_INITIALIZER; edouard@3989: edouard@3989: /* wakeup publish thread when PLC changed published variable */ edouard@3998: static pthread_cond_t MQTT_thread_wakeup = PTHREAD_COND_INITIALIZER; edouard@3998: edouard@3998: /* thread that handles publish and reconnection */ edouard@3998: static pthread_t MQTT_thread; edouard@3986: edouard@3995: #define INIT_TOPIC(topic, iec_type, c_loc_name) \ edouard@3990: {{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, edouard@3984: edouard@3984: static struct {{ edouard@3984: const char *topic; //null terminated topic string edouard@3990: void *mqtt_pdata; // pointer to data from/for MQTT stack edouard@3990: int *mqtt_pchanged; // pointer to changed flag edouard@3984: __IEC_types_enum vartype; edouard@3984: }} topics [] = {{ edouard@3984: {topics} edouard@3984: }}; edouard@3984: edouard@3984: void __cleanup_{locstr}(void) edouard@3984: {{ edouard@3984: int rc; edouard@3984: edouard@3993: /* stop publish thread */ edouard@3993: MQTT_stop_thread = 1; edouard@3998: if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3993: /* unblock publish thread so that it can stop normally */ edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: }} edouard@3998: pthread_join(MQTT_thread, NULL); edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3984: if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) edouard@3986: #else edouard@3986: if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) edouard@3986: #endif edouard@3984: {{ edouard@3995: LogError("MQTT Failed to disconnect, return code %d\n", rc); edouard@3979: }} edouard@3984: MQTTClient_destroy(&client); edouard@3984: }} edouard@3984: edouard@3998: void connectionLost(void* context, char* reason) edouard@3998: {{ edouard@3998: int rc; edouard@3998: LogWarning("ConnectionLost, reconnecting\\n"); edouard@3998: if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3998: /* unblock publish thread so that it can reconnect */ edouard@4002: MQTT_is_disconnected = 1; edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: }} edouard@3998: }} edouard@3998: edouard@3998: edouard@3998: edouard@3984: int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) edouard@3984: {{ edouard@3990: int low = 0; edouard@3990: int size = sizeof(topics) / sizeof(topics[0]); edouard@3990: int high = size - 1; edouard@3990: int mid; edouard@3990: edouard@3990: // bisect topic among subscribed topics edouard@3990: while (low <= high) {{ edouard@3990: int res; edouard@3990: mid = low + (high - low) / 2; edouard@3990: res = strncmp(topics[mid].topic, topicName, topicLen); edouard@3990: edouard@3990: // Check if key is present at mid edouard@3990: if (res == 0) edouard@3990: goto found; edouard@3990: edouard@3990: // If key greater, ignore left half edouard@3990: if (res < 0) edouard@3990: low = mid + 1; edouard@3990: edouard@3990: // If key is smaller, ignore right half edouard@3990: else edouard@3990: high = mid - 1; edouard@3990: }} edouard@3990: // If we reach here, then the element was not present edouard@3990: LogWarning("MQTT unknown topic: %s", topicName); edouard@3990: goto exit; edouard@3990: edouard@3990: found: edouard@3990: if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ edouard@3997: if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{ edouard@3997: memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); edouard@3997: *topics[mid].mqtt_pchanged = 1; edouard@3997: pthread_mutex_unlock(&MQTT_retrieve_mutex); edouard@3997: }} edouard@3990: }} else {{ edouard@3990: LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", edouard@3990: topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); edouard@3990: }} edouard@3990: exit: edouard@3984: MQTTClient_freeMessage(&message); edouard@3984: MQTTClient_free(topicName); edouard@3984: return 1; edouard@3984: }} edouard@3984: edouard@3995: #define INIT_NoAuth() \ edouard@3995: LogInfo("MQTT Init no auth\n"); edouard@3995: edouard@4005: #define INIT_x509(Verify, KeyStore, TrustStore) \ edouard@4005: LogInfo("MQTT Init x509 with %s,%s\n", KeyStore, TrustStore) \ edouard@4005: ssl_opts.verify = Verify; \ edouard@4005: ssl_opts.keyStore = KeyStore; \ edouard@4005: ssl_opts.trustStore = TrustStore; \ edouard@4005: conn_opts.ssl = &ssl_opts; edouard@4005: edouard@4005: #define INIT_PSK(Secret, ID) \ edouard@4005: LogError("MQTT PSK NOT IMPLEMENTED\n") \ edouard@4005: /* LogInfo("MQTT Init PSK for ID %s\n", ID) */ \ edouard@4005: /* ssl_opts.ssl_psk_cb = TODO; */ \ edouard@4005: /* ssl_opts.ssl_psk_context = TODO; */ \ edouard@4005: conn_opts.ssl = &ssl_opts; edouard@3984: edouard@3995: #define INIT_UserPassword(User, Password) \ edouard@3998: LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ edouard@3995: conn_opts.username = User; \ edouard@3990: conn_opts.password = Password; edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3995: #define _SUBSCRIBE(Topic, QoS) \ edouard@3995: MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \ edouard@3995: /* when using MQTT5 responce code is 1 for some reason even if no error */ \ edouard@3995: rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \ edouard@3986: MQTTResponse_free(response); edouard@3986: #else edouard@3995: #define _SUBSCRIBE(Topic, QoS) \ edouard@3986: rc = MQTTClient_subscribe(client, #Topic, QoS); edouard@3986: #endif edouard@3986: edouard@3995: #define INIT_SUBSCRIPTION(Topic, QoS) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@3998: _SUBSCRIBE(Topic, QoS) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3998: LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ edouard@3995: }} \ edouard@3995: }} edouard@3995: edouard@3995: edouard@3995: #ifdef USE_MQTT_5 edouard@3995: #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \ edouard@3998: &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \ edouard@3995: rc = response.reasonCode; \ edouard@3987: MQTTResponse_free(response); edouard@3987: #else edouard@3995: #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: rc = MQTTClient_publish(client, #Topic, sizeof(C_type), \ edouard@3987: &PLC_##c_loc_name##_buf, QoS, Retained, NULL); edouard@3987: #endif edouard@3987: edouard@3995: #define INIT_PUBLICATION(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@3995: _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3995: LogError("MQTT client failed to init publication of '%s', return code %d\n", #Topic, rc);\ edouard@3995: /* TODO update status variable accordingly */ \ edouard@3995: }} \ edouard@3995: }} edouard@3995: edouard@3995: #define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if(MQTT_##c_loc_name##_state == CHANGED) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@3995: _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3998: LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ edouard@3995: /* TODO update status variable accordingly */ \ edouard@3995: }} else {{ \ edouard@3995: MQTT_##c_loc_name##_state = UNCHANGED; \ edouard@3995: }} \ edouard@3989: }} edouard@3989: edouard@3998: static int _connect_mqtt(void) edouard@3998: {{ edouard@3998: int rc; edouard@3998: edouard@3998: #ifdef USE_MQTT_5 edouard@3998: MQTTProperties props = MQTTProperties_initializer; edouard@3998: MQTTProperties willProps = MQTTProperties_initializer; edouard@3998: MQTTResponse response = MQTTResponse_initializer; edouard@3998: edouard@3998: response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); edouard@3998: rc = response.reasonCode; edouard@3998: MQTTResponse_free(response); edouard@3998: #else edouard@3998: rc = MQTTClient_connect(client, &conn_opts); edouard@3998: #endif edouard@3998: edouard@3998: if (rc != MQTTCLIENT_SUCCESS) {{ edouard@4002: MQTT_is_disconnected = 1; edouard@3998: return rc; edouard@4002: }}else{{ edouard@4002: MQTT_is_disconnected = 0; edouard@3998: }} edouard@3998: edouard@3998: {init_pubsub} edouard@3998: edouard@3998: return MQTTCLIENT_SUCCESS; edouard@3998: }} edouard@3998: edouard@3998: static void *__MQTT_thread_proc(void *_unused) {{ edouard@3989: int rc = 0; edouard@3998: edouard@3998: while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{ edouard@3998: int do_publish; edouard@3998: edouard@3998: pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex); edouard@4002: edouard@4002: if(MQTT_is_disconnected) edouard@4002: {{ edouard@4002: /* TODO growing retry delay */ edouard@4002: /* TODO max retry delay as config parameter */ edouard@4002: sleep(5); edouard@4002: rc = _connect_mqtt(); edouard@4002: if (rc == MQTTCLIENT_SUCCESS) {{ edouard@4002: LogInfo("MQTT Reconnected\n"); edouard@4002: }} else {{ edouard@4002: LogError("MQTT Reconnect Failed, return code %d\n", rc); edouard@4002: }} edouard@4002: }} edouard@4002: if(!MQTT_is_disconnected && MQTT_any_pub_var_changed) edouard@3996: {{ edouard@3989: /* publish changes, and reset variable's state to UNCHANGED */ edouard@3989: {publish_changes} edouard@3998: MQTT_any_pub_var_changed = 0; edouard@3998: }} edouard@3998: edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: edouard@3993: if(MQTT_stop_thread) break; edouard@3989: }} edouard@3989: edouard@3989: if(!MQTT_stop_thread){{ edouard@3989: /* if thread exits outside of normal shutdown, report error*/ edouard@3995: LogError("MQTT client thread exited unexpectedly, return code %d\n", rc); edouard@3989: }} edouard@3989: }} edouard@3989: edouard@3979: int __init_{locstr}(int argc,char **argv) edouard@3979: {{ edouard@3979: char *uri = "{uri}"; edouard@3980: char *clientID = "{clientID}"; edouard@3980: int rc; edouard@3984: edouard@3990: MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3990: conn_opts.MQTTVersion = MQTTVERSION_5; edouard@3984: conn_opts.cleanstart = 1; edouard@3984: edouard@3984: createOpts.MQTTVersion = MQTTVERSION_5; edouard@3986: #else edouard@3990: conn_opts.cleansession = 1; edouard@3986: #endif edouard@3984: edouard@3984: MQTTClient_setTraceCallback(trace_callback); edouard@3998: MQTTClient_setTraceLevel(MQTT_DEBUG_LEVEL); edouard@3984: edouard@3990: rc = MQTTClient_createWithOptions( edouard@3984: &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); edouard@3990: if (rc != MQTTCLIENT_SUCCESS) edouard@3990: {{ edouard@3995: LogError("MQTT Failed to create client, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3984: }} edouard@3984: edouard@3998: rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); edouard@3990: if (rc != MQTTCLIENT_SUCCESS) edouard@3990: {{ edouard@3995: LogError("MQTT Failed to set callbacks, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3990: }} edouard@3990: edouard@3998: {init} edouard@3998: edouard@3990: rc = _connect_mqtt(); edouard@3998: if (rc == MQTTCLIENT_SUCCESS) {{ edouard@3998: LogInfo("MQTT Connected\n"); edouard@3998: }} else {{ edouard@3995: LogError("MQTT Connect Failed, return code %d\n", rc); edouard@3998: // Connect error at init is fine, publish thread will retry later edouard@3998: }} edouard@3998: edouard@3998: /* start MQTT thread */ edouard@3997: MQTT_stop_thread = 0; edouard@3998: rc = pthread_create(&MQTT_thread, NULL, &__MQTT_thread_proc, NULL); edouard@3997: if (rc != 0) {{ edouard@3997: LogError("MQTT cannot create thread, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3997: }} edouard@3984: edouard@3979: return 0; edouard@3997: edouard@3997: exit_error: edouard@3997: MQTTClient_destroy(&client); edouard@3997: return rc; edouard@3979: }} edouard@3979: edouard@3995: #define READ_VALUE(c_loc_name, C_type) \ edouard@3995: if(MQTT_##c_loc_name##_state == CHANGED){{ \ edouard@3995: /* TODO care about endianess */ \ edouard@3995: PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \ edouard@3995: MQTT_##c_loc_name##_state = UNCHANGED; \ edouard@3990: }} edouard@3979: edouard@3979: void __retrieve_{locstr}(void) edouard@3979: {{ edouard@3997: if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{ edouard@3979: {retrieve} edouard@3997: pthread_mutex_unlock(&MQTT_retrieve_mutex); edouard@3990: }} edouard@3984: }} edouard@3984: edouard@3995: #define WRITE_VALUE(c_loc_name, C_type) \ edouard@3995: /* TODO care about endianess */ \ edouard@3995: if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \ edouard@3995: MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \ edouard@3995: MQTT_##c_loc_name##_state = CHANGED; \ edouard@3995: MQTT_any_pub_var_changed = 1; \ edouard@3989: }} edouard@3979: edouard@3979: void __publish_{locstr}(void) edouard@3979: {{ edouard@3998: if (pthread_mutex_trylock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3989: MQTT_any_pub_var_changed = 0; edouard@3989: /* copy PLC_* variables to MQTT_*, and mark those who changed */ edouard@3979: {publish} edouard@3989: /* if any change detcted, unblock publish thread */ edouard@4002: if(MQTT_any_pub_var_changed || MQTT_is_disconnected){{ edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3989: }} edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3989: }} else {{ edouard@3989: /* TODO if couldn't lock mutex set status variable accordingly */ edouard@3989: }} edouard@3979: }} edouard@3979: