edouard@3995: /* code generated by beremiz MQTT extension */ edouard@3979: edouard@3984: #include edouard@3994: #include edouard@3984: #include edouard@3990: #include edouard@3994: #include edouard@4012: #include edouard@4012: edouard@4012: #include "frozen.h" edouard@3984: edouard@3984: #include "MQTTClient.h" edouard@3980: #include "MQTTClientPersistence.h" edouard@3979: edouard@4012: #include "POUS.h" edouard@4012: edouard@3995: #define _Log(level, ...) \ edouard@3995: {{ \ edouard@3998: char mstr[256]; \ edouard@3998: snprintf(mstr, 255, __VA_ARGS__); \ edouard@3998: LogMessage(level, mstr, strlen(mstr)); \ edouard@3979: }} edouard@3979: edouard@3979: #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); edouard@3979: #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); edouard@3979: #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); edouard@3979: edouard@3998: // Selected debug level for paho stack edouard@3998: // can be: edouard@3998: // MQTTCLIENT_TRACE_PROTOCOL, MQTTCLIENT_TRACE_MAXIMUM, MQTTCLIENT_TRACE_ERROR edouard@3998: #define MQTT_DEBUG_LEVEL MQTTCLIENT_TRACE_ERROR edouard@3998: edouard@3984: void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) edouard@3984: {{ edouard@3998: if(level >= MQTT_DEBUG_LEVEL) edouard@3998: {{ edouard@3998: int beremiz_log_level = (level >= MQTTCLIENT_TRACE_ERROR ) ? LOG_CRITICAL : edouard@3998: (level > MQTTCLIENT_TRACE_MINIMUM) ? LOG_WARNING : edouard@3998: LOG_INFO; edouard@3998: _Log(beremiz_log_level,"Paho MQTT Trace : %s\n", message); edouard@3998: }} edouard@3984: }} edouard@3984: edouard@3989: #define CHANGED 1 edouard@3989: #define UNCHANGED 0 edouard@3989: edouard@4023: extern INT CONFIG__MQTT_STATUS_{locstr}; edouard@4023: edouard@3995: #define DECL_VAR(iec_type, C_type, c_loc_name) \ edouard@4012: static C_type PLC_##c_loc_name##_buf; \ edouard@4012: static C_type MQTT_##c_loc_name##_buf; \ edouard@3995: static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \ edouard@3984: C_type *c_loc_name = &PLC_##c_loc_name##_buf; edouard@3984: edouard@3984: {decl} edouard@3984: edouard@4012: /* JSON topic content encoding macros matching "json_decl" in substitution*/ edouard@4012: edouard@4018: #define printf_fmt_BOOL "%B" edouard@4018: #define printf_fmt_SINT "%hhd" edouard@4018: #define printf_fmt_USINT "%uhhd" edouard@4018: #define printf_fmt_INT "%hd" edouard@4018: #define printf_fmt_UINT "%uhd" edouard@4018: #define printf_fmt_DINT "%d" edouard@4018: #define printf_fmt_UDINT "%ud" edouard@4018: #define printf_fmt_LINT "%ld" edouard@4018: #define printf_fmt_ULINT "%uld" edouard@4018: #define printf_fmt_REAL "%f" edouard@4018: #define printf_fmt_LREAL "%Lf" edouard@4018: #define printf_fmt_STRING "%.*Q" edouard@4018: edouard@4018: #define printf_fmt_separator ", " edouard@4018: edouard@4019: #define printf_fmt_SIMPLE(C_type, C_name, name, _A) #name " : " printf_fmt_##C_type edouard@4019: #define printf_fmt_OBJECT(C_type, C_name, name, _A) #name " : {{ " TYPE_##C_type(printf_fmt, _A) " }}" edouard@4020: #define printf_fmt_ARRAY(C_type, C_name, name, _A) #name " : [ " TYPE_##C_type(printf_fmt, _A) " ]" edouard@4020: #define printf_fmt_ARRAY_SIMPLE(C_type, index, _A) printf_fmt_##C_type edouard@4020: #define printf_fmt_ARRAY_OBJECT(C_type, index, _A) "{{ " TYPE_##C_type(printf_fmt, _A) " }}" edouard@4018: edouard@4018: #define scanf_fmt_BOOL "%B" edouard@4018: #define scanf_fmt_SINT "%hhd" edouard@4018: #define scanf_fmt_USINT "%uhhd" edouard@4018: #define scanf_fmt_INT "%hd" edouard@4018: #define scanf_fmt_UINT "%uhd" edouard@4018: #define scanf_fmt_DINT "%d" edouard@4018: #define scanf_fmt_UDINT "%ud" edouard@4018: #define scanf_fmt_LINT "%ld" edouard@4018: #define scanf_fmt_ULINT "%uld" edouard@4018: #define scanf_fmt_REAL "%f" edouard@4018: #define scanf_fmt_LREAL "%Lf" edouard@4018: #define scanf_fmt_STRING "%M" edouard@4018: edouard@4018: #define scanf_fmt_separator ", " edouard@4018: edouard@4019: #define scanf_fmt_SIMPLE(C_type, C_name, name, _A) #name " : " scanf_fmt_##C_type edouard@4019: #define scanf_fmt_OBJECT(C_type, C_name, name, _A) #name " : {{ " TYPE_##C_type(scanf_fmt, _A) " }}" edouard@4020: #define scanf_fmt_ARRAY(C_type, C_name, name, _A) #name " : [ " TYPE_##C_type(scanf_fmt, _A) " ]" edouard@4020: #define scanf_fmt_ARRAY_SIMPLE(C_type, index, _A) scanf_fmt_##C_type edouard@4020: #define scanf_fmt_ARRAY_OBJECT(C_type, index, _A) "{{ " TYPE_##C_type(scanf_fmt, _A) " }}" edouard@4020: edouard@4020: #define scanf_arg_BOOL(arg) arg edouard@4020: #define scanf_arg_SINT(arg) arg edouard@4020: #define scanf_arg_USINT(arg) arg edouard@4020: #define scanf_arg_INT(arg) arg edouard@4020: #define scanf_arg_UINT(arg) arg edouard@4020: #define scanf_arg_DINT(arg) arg edouard@4020: #define scanf_arg_UDINT(arg) arg edouard@4020: #define scanf_arg_LINT(arg) arg edouard@4020: #define scanf_arg_ULINT(arg) arg edouard@4020: #define scanf_arg_REAL(arg) arg edouard@4020: #define scanf_arg_LREAL(arg) arg edouard@4020: #define scanf_arg_STRING(arg) scan_string, arg edouard@4012: edouard@4016: #define scanf_args_separator , edouard@4016: edouard@4020: #define scanf_args_SIMPLE(C_type, C_name, name, data_ptr) scanf_arg_##C_type(&data_ptr->C_name) edouard@4019: #define scanf_args_OBJECT(C_type, C_name, name, data_ptr) TYPE_##C_type(scanf_args, (&data_ptr->C_name)) edouard@4020: #define scanf_args_ARRAY(C_type, C_name, name, data_ptr) TYPE_##C_type(scanf_args, data_ptr->C_name.table) edouard@4020: #define scanf_args_ARRAY_SIMPLE(C_type, index, data_ptr) scanf_arg_##C_type(&data_ptr[index]) edouard@4020: #define scanf_args_ARRAY_OBJECT(C_type, index, data_ptr) TYPE_##C_type(scanf_args, (&data_ptr[index])) edouard@4020: edouard@4020: #define printf_arg_BOOL(arg) arg edouard@4020: #define printf_arg_SINT(arg) arg edouard@4020: #define printf_arg_USINT(arg) arg edouard@4020: #define printf_arg_INT(arg) arg edouard@4020: #define printf_arg_UINT(arg) arg edouard@4020: #define printf_arg_DINT(arg) arg edouard@4020: #define printf_arg_UDINT(arg) arg edouard@4020: #define printf_arg_LINT(arg) arg edouard@4020: #define printf_arg_ULINT(arg) arg edouard@4020: #define printf_arg_REAL(arg) arg edouard@4020: #define printf_arg_LREAL(arg) arg edouard@4020: #define printf_arg_STRING(arg) arg.len, arg.body edouard@4018: edouard@4016: #define printf_args_separator , edouard@4016: edouard@4020: #define printf_args_SIMPLE(C_type, C_name, name, data_ptr) printf_arg_##C_type(data_ptr->C_name) edouard@4019: #define printf_args_OBJECT(C_type, C_name, name, data_ptr) TYPE_##C_type(printf_args, (&data_ptr->C_name)) edouard@4020: #define printf_args_ARRAY(C_type, C_name, name, data_ptr) TYPE_##C_type(printf_args, (&data_ptr->C_name.table)) edouard@4020: #define printf_args_ARRAY_SIMPLE(C_type, index, data_ptr) printf_arg_##C_type(data_ptr[index]) edouard@4020: #define printf_args_ARRAY_OBJECT(C_type, index, data_ptr) TYPE_##C_type(printf_args, (data_ptr[index])) edouard@4012: edouard@4018: static void scan_string(const char *str, int len, void *user_data) {{ edouard@4018: IEC_STRING *iecstr = (IEC_STRING*)user_data; edouard@4018: __strlen_t ieclen = len > STR_MAX_LEN ? STR_MAX_LEN : len; edouard@4018: memcpy(iecstr->body, str, ieclen); edouard@4018: iecstr->len = ieclen; edouard@4018: }} edouard@4018: edouard@4012: #define DECL_JSON_INPUT(C_type, c_loc_name) \ edouard@4012: int json_parse_##c_loc_name(char *json, const int len, void *void_ptr) {{ \ edouard@4012: C_type *struct_ptr = (C_type *)void_ptr; \ edouard@4018: return json_scanf(json, len, "{{" TYPE_##C_type(scanf_fmt,) "}}", TYPE_##C_type(scanf_args, struct_ptr)); \ edouard@4012: }} edouard@4012: edouard@4012: /* Pre-allocated json output buffer for json_printf */ edouard@4012: #define json_out_size 1<<12 // 4K edouard@4012: static char json_out_buf[json_out_size] = {{0,}}; edouard@4012: static int json_out_len = 0; edouard@4012: edouard@4012: #define DECL_JSON_OUTPUT(C_type, c_loc_name) \ edouard@4012: int json_gen_##c_loc_name(C_type *struct_ptr) {{ \ edouard@4012: struct json_out out = JSON_OUT_BUF(json_out_buf, json_out_size); \ edouard@4018: json_out_len = json_printf(&out, "{{" TYPE_##C_type(printf_fmt,) "}}", TYPE_##C_type(printf_args, struct_ptr)); \ edouard@4012: if(json_out_len > json_out_size){{ \ edouard@4012: json_out_len = 0; \ edouard@4012: return -EOVERFLOW; \ edouard@4012: }} \ edouard@4012: return 0; \ edouard@4012: }} edouard@4012: edouard@4012: {json_decl} edouard@4012: edouard@3986: static MQTTClient client; edouard@3986: #ifdef USE_MQTT_5 edouard@3986: static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; edouard@3986: #else edouard@3986: static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; edouard@3986: #endif edouard@3989: edouard@4005: MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; edouard@4005: edouard@3989: /* condition to quit publish thread */ edouard@3989: static int MQTT_stop_thread = 0; edouard@3989: edouard@3989: /* condition to wakeup publish thread */ edouard@3989: static int MQTT_any_pub_var_changed = 0; edouard@3989: edouard@4002: /* Keep track of connection state */ edouard@4002: static volatile int MQTT_is_disconnected = 1; edouard@4002: edouard@3997: /* mutex to keep incoming PLC data consistent */ edouard@3997: static pthread_mutex_t MQTT_retrieve_mutex = PTHREAD_MUTEX_INITIALIZER; edouard@3997: edouard@3997: /* mutex to keep outgoing PLC data consistent, and protect MQTT_any_pub_var_changed */ edouard@3998: static pthread_mutex_t MQTT_thread_wakeup_mutex = PTHREAD_MUTEX_INITIALIZER; edouard@3989: edouard@3989: /* wakeup publish thread when PLC changed published variable */ edouard@3998: static pthread_cond_t MQTT_thread_wakeup = PTHREAD_COND_INITIALIZER; edouard@3998: edouard@3998: /* thread that handles publish and reconnection */ edouard@3998: static pthread_t MQTT_thread; edouard@3986: edouard@4012: #define INIT_TOPIC(topic, iec_type, c_loc_name) \ edouard@4015: {{#topic, sizeof(#topic)-1, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, .is_json_type=0, .vartype = iec_type##_ENUM}}, edouard@4012: edouard@4012: #define INIT_JSON_TOPIC(topic, iec_type, c_loc_name) \ edouard@4015: {{#topic, sizeof(#topic)-1, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, .is_json_type=1, .json_parse_func=json_parse_##c_loc_name}}, edouard@4012: edouard@4012: typedef int (*json_parse_func_t)(char *json, int len, void *void_ptr); edouard@3984: edouard@3984: static struct {{ edouard@3984: const char *topic; //null terminated topic string edouard@4015: const unsigned int topic_len; edouard@3990: void *mqtt_pdata; // pointer to data from/for MQTT stack edouard@4015: int *mqtt_pstate; // pointer to changed flag edouard@4012: int is_json_type; edouard@4012: union {{ edouard@4012: __IEC_types_enum vartype; edouard@4012: json_parse_func_t json_parse_func; edouard@4012: }}; edouard@3984: }} topics [] = {{ edouard@3984: {topics} edouard@3984: }}; edouard@3984: edouard@3984: void __cleanup_{locstr}(void) edouard@3984: {{ edouard@3984: int rc; edouard@3984: edouard@3993: /* stop publish thread */ edouard@3993: MQTT_stop_thread = 1; edouard@3998: if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3993: /* unblock publish thread so that it can stop normally */ edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: }} edouard@3998: pthread_join(MQTT_thread, NULL); edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3984: if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) edouard@3986: #else edouard@3986: if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) edouard@3986: #endif edouard@3984: {{ edouard@3995: LogError("MQTT Failed to disconnect, return code %d\n", rc); edouard@3979: }} edouard@3984: MQTTClient_destroy(&client); edouard@3984: }} edouard@3984: edouard@3998: void connectionLost(void* context, char* reason) edouard@3998: {{ edouard@3998: int rc; edouard@3998: LogWarning("ConnectionLost, reconnecting\\n"); edouard@3998: if (pthread_mutex_lock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3998: /* unblock publish thread so that it can reconnect */ edouard@4002: MQTT_is_disconnected = 1; edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: }} edouard@3998: }} edouard@3998: edouard@3998: edouard@3998: edouard@3984: int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) edouard@3984: {{ edouard@3990: int low = 0; edouard@3990: int size = sizeof(topics) / sizeof(topics[0]); edouard@3990: int high = size - 1; edouard@3990: int mid; edouard@4012: int is_json_type; edouard@3990: edouard@3990: // bisect topic among subscribed topics edouard@3990: while (low <= high) {{ edouard@4015: int res, len, delta_len; edouard@3990: mid = low + (high - low) / 2; edouard@4015: if(topicLen == 0) {{ edouard@4015: // zero topic len means null-terminated edouard@4015: topicLen = strlen(topicName); edouard@4015: }} edouard@4015: edouard@4015: len = topics[mid].topic_len; edouard@4015: edouard@4015: // keep track of length difference edouard@4015: delta_len = len - topicLen; edouard@4015: edouard@4015: // len = min(len, topicLen) edouard@4015: len = topicLen > len ? len : topicLen; edouard@4015: edouard@4015: // compare strings as far as possible edouard@4015: res = strncmp(topics[mid].topic, topicName, len); edouard@4015: edouard@4015: // if partial comparison matches, longest string is the greatest edouard@4015: if (res == 0) edouard@4015: // update res to continue bisection edouard@4015: res = delta_len; edouard@3990: edouard@3990: // Check if key is present at mid edouard@3990: if (res == 0) edouard@3990: goto found; edouard@3990: edouard@3990: // If key greater, ignore left half edouard@3990: if (res < 0) edouard@3990: low = mid + 1; edouard@3990: edouard@3990: // If key is smaller, ignore right half edouard@3990: else edouard@3990: high = mid - 1; edouard@3990: }} edouard@3990: // If we reach here, then the element was not present edouard@4015: LogWarning("MQTT unknown topic: %s\n", topicName); edouard@3990: goto exit; edouard@3990: edouard@3990: found: edouard@4012: edouard@4012: is_json_type = topics[mid].is_json_type; edouard@4015: if(is_json_type || (__get_type_enum_size(topics[mid].vartype) == message->payloadlen)){{ edouard@3997: if (pthread_mutex_lock(&MQTT_retrieve_mutex) == 0){{ edouard@4012: if(is_json_type){{ edouard@4012: (topics[mid].json_parse_func)((char*)message->payload, message->payloadlen, topics[mid].mqtt_pdata); edouard@4012: }} else {{ edouard@4012: memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); edouard@4012: }} edouard@4015: *topics[mid].mqtt_pstate = CHANGED; edouard@3997: pthread_mutex_unlock(&MQTT_retrieve_mutex); edouard@3997: }} edouard@3990: }} else {{ edouard@3990: LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", edouard@3990: topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); edouard@3990: }} edouard@3990: exit: edouard@3984: MQTTClient_freeMessage(&message); edouard@3984: MQTTClient_free(topicName); edouard@3984: return 1; edouard@3984: }} edouard@3984: edouard@3995: #define INIT_NoAuth() \ edouard@3995: LogInfo("MQTT Init no auth\n"); edouard@3995: edouard@4005: #define INIT_x509(Verify, KeyStore, TrustStore) \ edouard@4012: LogInfo("MQTT Init x509 with %s,%s\n", KeyStore?KeyStore:"NULL", TrustStore?TrustStore:"NULL")\ edouard@4005: ssl_opts.verify = Verify; \ edouard@4005: ssl_opts.keyStore = KeyStore; \ edouard@4005: ssl_opts.trustStore = TrustStore; \ edouard@4005: conn_opts.ssl = &ssl_opts; edouard@4005: edouard@4005: #define INIT_PSK(Secret, ID) \ edouard@4005: LogError("MQTT PSK NOT IMPLEMENTED\n") \ edouard@4005: /* LogInfo("MQTT Init PSK for ID %s\n", ID) */ \ edouard@4005: /* ssl_opts.ssl_psk_cb = TODO; */ \ edouard@4005: /* ssl_opts.ssl_psk_context = TODO; */ \ edouard@4005: conn_opts.ssl = &ssl_opts; edouard@3984: edouard@3995: #define INIT_UserPassword(User, Password) \ edouard@3998: LogInfo("MQTT Init UserPassword %s,%s\n", User, Password); \ edouard@3995: conn_opts.username = User; \ edouard@3990: conn_opts.password = Password; edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3995: #define _SUBSCRIBE(Topic, QoS) \ edouard@3995: MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \ edouard@3995: /* when using MQTT5 responce code is 1 for some reason even if no error */ \ edouard@3995: rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \ edouard@3986: MQTTResponse_free(response); edouard@3986: #else edouard@3995: #define _SUBSCRIBE(Topic, QoS) \ edouard@3986: rc = MQTTClient_subscribe(client, #Topic, QoS); edouard@3986: #endif edouard@3986: edouard@3995: #define INIT_SUBSCRIPTION(Topic, QoS) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@3998: _SUBSCRIBE(Topic, QoS) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3998: LogError("MQTT client failed to subscribe to '%s', return code %d\n", #Topic, rc); \ edouard@3995: }} \ edouard@3995: }} edouard@3995: edouard@3995: edouard@3995: #ifdef USE_MQTT_5 edouard@4012: #define _PUBLISH(Topic, QoS, cstring_size, cstring_ptr, Retained) \ edouard@4012: MQTTResponse response = MQTTClient_publish5(client, #Topic, cstring_size, \ edouard@4012: cstring_ptr, QoS, Retained, NULL, NULL); \ edouard@3995: rc = response.reasonCode; \ edouard@3987: MQTTResponse_free(response); edouard@3987: #else edouard@4012: #define _PUBLISH(Topic, QoS, cstring_size, cstring_ptr, Retained) \ edouard@4012: rc = MQTTClient_publish(client, #Topic, cstring_size, \ edouard@4012: cstring_ptr, QoS, Retained, NULL); edouard@3987: #endif edouard@3987: edouard@4012: #define PUBLISH_SIMPLE(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@4012: _PUBLISH(Topic, QoS, sizeof(C_type), &MQTT_##c_loc_name##_buf, Retained) edouard@4012: edouard@4012: #define PUBLISH_JSON(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@4012: int res = json_gen_##c_loc_name(&MQTT_##c_loc_name##_buf); \ edouard@4012: if(res == 0) {{ \ edouard@4012: _PUBLISH(Topic, QoS, json_out_len, json_out_buf, Retained) \ edouard@4012: }} edouard@4012: edouard@4012: #define INIT_PUBLICATION(encoding, Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@4012: PUBLISH_##encoding(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3995: LogError("MQTT client failed to init publication of '%s', return code %d\n", #Topic, rc);\ edouard@3995: /* TODO update status variable accordingly */ \ edouard@3995: }} \ edouard@3995: }} edouard@3995: edouard@4012: #define PUBLISH_CHANGE(encoding, Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if(MQTT_##c_loc_name##_state == CHANGED) \ edouard@3995: {{ \ edouard@3995: int rc; \ edouard@4012: PUBLISH_##encoding(Topic, QoS, C_type, c_loc_name, Retained) \ edouard@3995: if (rc != MQTTCLIENT_SUCCESS) \ edouard@3995: {{ \ edouard@3998: LogError("MQTT client failed to publish '%s', return code %d\n", #Topic, rc); \ edouard@3995: /* TODO update status variable accordingly */ \ edouard@3995: }} else {{ \ edouard@3995: MQTT_##c_loc_name##_state = UNCHANGED; \ edouard@3995: }} \ edouard@3989: }} edouard@3989: edouard@3998: static int _connect_mqtt(void) edouard@3998: {{ edouard@3998: int rc; edouard@3998: edouard@3998: #ifdef USE_MQTT_5 edouard@3998: MQTTProperties props = MQTTProperties_initializer; edouard@3998: MQTTProperties willProps = MQTTProperties_initializer; edouard@3998: MQTTResponse response = MQTTResponse_initializer; edouard@3998: edouard@3998: response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); edouard@3998: rc = response.reasonCode; edouard@3998: MQTTResponse_free(response); edouard@3998: #else edouard@3998: rc = MQTTClient_connect(client, &conn_opts); edouard@3998: #endif edouard@3998: edouard@3998: if (rc != MQTTCLIENT_SUCCESS) {{ edouard@4002: MQTT_is_disconnected = 1; edouard@3998: return rc; edouard@4002: }}else{{ edouard@4002: MQTT_is_disconnected = 0; edouard@3998: }} edouard@3998: edouard@3998: {init_pubsub} edouard@3998: edouard@3998: return MQTTCLIENT_SUCCESS; edouard@3998: }} edouard@3998: edouard@3998: static void *__MQTT_thread_proc(void *_unused) {{ edouard@3989: int rc = 0; edouard@3998: edouard@3998: while((rc = pthread_mutex_lock(&MQTT_thread_wakeup_mutex)) == 0 && !MQTT_stop_thread){{ edouard@3998: int do_publish; edouard@3998: edouard@3998: pthread_cond_wait(&MQTT_thread_wakeup, &MQTT_thread_wakeup_mutex); edouard@4002: edouard@4002: if(MQTT_is_disconnected) edouard@4002: {{ edouard@4002: /* TODO growing retry delay */ edouard@4002: /* TODO max retry delay as config parameter */ edouard@4002: sleep(5); edouard@4002: rc = _connect_mqtt(); edouard@4002: if (rc == MQTTCLIENT_SUCCESS) {{ edouard@4002: LogInfo("MQTT Reconnected\n"); edouard@4002: }} else {{ edouard@4002: LogError("MQTT Reconnect Failed, return code %d\n", rc); edouard@4002: }} edouard@4002: }} edouard@4002: if(!MQTT_is_disconnected && MQTT_any_pub_var_changed) edouard@3996: {{ edouard@3989: /* publish changes, and reset variable's state to UNCHANGED */ edouard@3989: {publish_changes} edouard@3998: MQTT_any_pub_var_changed = 0; edouard@3998: }} edouard@3998: edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3998: edouard@3993: if(MQTT_stop_thread) break; edouard@3989: }} edouard@3989: edouard@3989: if(!MQTT_stop_thread){{ edouard@3989: /* if thread exits outside of normal shutdown, report error*/ edouard@3995: LogError("MQTT client thread exited unexpectedly, return code %d\n", rc); edouard@3989: }} edouard@3989: }} edouard@3989: edouard@3979: int __init_{locstr}(int argc,char **argv) edouard@3979: {{ edouard@3979: char *uri = "{uri}"; edouard@3980: char *clientID = "{clientID}"; edouard@3980: int rc; edouard@3984: edouard@3990: MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; edouard@3984: edouard@3986: #ifdef USE_MQTT_5 edouard@3990: conn_opts.MQTTVersion = MQTTVERSION_5; edouard@3984: conn_opts.cleanstart = 1; edouard@3984: edouard@3984: createOpts.MQTTVersion = MQTTVERSION_5; edouard@3986: #else edouard@3990: conn_opts.cleansession = 1; edouard@3986: #endif edouard@3984: edouard@3984: MQTTClient_setTraceCallback(trace_callback); edouard@3998: MQTTClient_setTraceLevel(MQTT_DEBUG_LEVEL); edouard@3984: edouard@3990: rc = MQTTClient_createWithOptions( edouard@3984: &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); edouard@3990: if (rc != MQTTCLIENT_SUCCESS) edouard@3990: {{ edouard@3995: LogError("MQTT Failed to create client, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3984: }} edouard@3984: edouard@3998: rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); edouard@3990: if (rc != MQTTCLIENT_SUCCESS) edouard@3990: {{ edouard@3995: LogError("MQTT Failed to set callbacks, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3990: }} edouard@3990: edouard@3998: {init} edouard@3998: edouard@3990: rc = _connect_mqtt(); edouard@3998: if (rc == MQTTCLIENT_SUCCESS) {{ edouard@3998: LogInfo("MQTT Connected\n"); edouard@3998: }} else {{ edouard@3995: LogError("MQTT Connect Failed, return code %d\n", rc); edouard@3998: // Connect error at init is fine, publish thread will retry later edouard@3998: }} edouard@3998: edouard@3998: /* start MQTT thread */ edouard@3997: MQTT_stop_thread = 0; edouard@3998: rc = pthread_create(&MQTT_thread, NULL, &__MQTT_thread_proc, NULL); edouard@3997: if (rc != 0) {{ edouard@3997: LogError("MQTT cannot create thread, return code %d\n", rc); edouard@3997: goto exit_error; edouard@3997: }} edouard@3984: edouard@3979: return 0; edouard@3997: edouard@3997: exit_error: edouard@3997: MQTTClient_destroy(&client); edouard@3997: return rc; edouard@3979: }} edouard@3979: edouard@3995: #define READ_VALUE(c_loc_name, C_type) \ edouard@3995: if(MQTT_##c_loc_name##_state == CHANGED){{ \ edouard@3995: /* TODO care about endianess */ \ edouard@3995: PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \ edouard@3995: MQTT_##c_loc_name##_state = UNCHANGED; \ edouard@3990: }} edouard@3979: edouard@3979: void __retrieve_{locstr}(void) edouard@3979: {{ edouard@3997: if (pthread_mutex_trylock(&MQTT_retrieve_mutex) == 0){{ edouard@4023: CONFIG__MQTT_STATUS_{locstr} = MQTT_is_disconnected ? 0 : 1; edouard@3979: {retrieve} edouard@3997: pthread_mutex_unlock(&MQTT_retrieve_mutex); edouard@3990: }} edouard@3984: }} edouard@3984: edouard@3995: #define WRITE_VALUE(c_loc_name, C_type) \ edouard@3995: /* TODO care about endianess */ \ edouard@4012: if(memcmp(&MQTT_##c_loc_name##_buf, &PLC_##c_loc_name##_buf, sizeof(C_type))){{ \ edouard@3995: MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \ edouard@3995: MQTT_##c_loc_name##_state = CHANGED; \ edouard@3995: MQTT_any_pub_var_changed = 1; \ edouard@3989: }} edouard@3979: edouard@3979: void __publish_{locstr}(void) edouard@3979: {{ edouard@3998: if (pthread_mutex_trylock(&MQTT_thread_wakeup_mutex) == 0){{ edouard@3989: MQTT_any_pub_var_changed = 0; edouard@3989: /* copy PLC_* variables to MQTT_*, and mark those who changed */ edouard@3979: {publish} edouard@3989: /* if any change detcted, unblock publish thread */ edouard@4002: if(MQTT_any_pub_var_changed || MQTT_is_disconnected){{ edouard@3998: pthread_cond_signal(&MQTT_thread_wakeup); edouard@3989: }} edouard@3998: pthread_mutex_unlock(&MQTT_thread_wakeup_mutex); edouard@3989: }} else {{ edouard@3989: /* TODO if couldn't lock mutex set status variable accordingly */ edouard@3989: }} edouard@3979: }} edouard@3979: