# HG changeset patch # User Edouard Tisserant # Date 1721228552 -7200 # Node ID 24656e0e873280d7cb3e319a565fc89b7f973526 # Parent 987c69b1582fbd3565ab33a3cfe753f2a9597f2b MQTT: WIP. Added handling of received messages (subscriptions) Dropped perfect hash in favor of bisection Also fixed indentation (tabs) diff -r 987c69b1582f -r 24656e0e8732 exemples/first_steps/plc.xml --- a/exemples/first_steps/plc.xml Tue Jul 16 09:41:45 2024 +0200 +++ b/exemples/first_steps/plc.xml Wed Jul 17 17:02:32 2024 +0200 @@ -1,7 +1,7 @@ - + diff -r 987c69b1582f -r 24656e0e8732 mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Tue Jul 16 09:41:45 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Wed Jul 17 17:02:32 2024 +0200 @@ -308,53 +308,21 @@ for row in data: writer.writerow([direction] + row) - def _TopicsPerfectHash(self, topics): - template = """ -#define NK $NK /* number of keys */ -#define NG $NG /* number of vertices */ -#define NS $NS /* length of array T1 and T2 */ - -int S1[] = {$S1}; -int S2[] = {$S2}; -int G[] = {$G}; -char *K[] = {$K}; -""" - code = generate_code(topics, Hash=IntSaltHash, template=template) - code += """ -/* return index of key in K if key is found, -1 otherwise */ -int MQTT_Topic_index(const char *key) -{ - int i, f1 = 0, f2 = 0; - - for (i = 0; key[i] != '\\0' && i < NS; i++) { - f1 += S1[i] * key[i]; - f2 += S2[i] * key[i]; - f1 %= NG; - f2 %= NG; - } - i = (G[f1] + G[f2]) % NG; - if (i < NK && strcmp(key, K[i]) == 0) - return i; - - return -1; -} -""" - return code - def GenerateC(self, path, locstr, config): template = """/* code generated by beremiz MQTT extension */ #include #include +#include #include "MQTTClient.h" #include "MQTTClientPersistence.h" #define _Log(level, ...) \\ {{ \\ - /* char mstr[256]; */ \\ - /* snprintf(mstr, 255, __VA_ARGS__); */ \\ - /* LogMessage(level, mstr, strlen(mstr)); */ \\ + char mstr[256]; \\ + snprintf(mstr, 255, __VA_ARGS__); \\ + LogMessage(level, mstr, strlen(mstr)); \\ printf(__VA_ARGS__); \\ }} @@ -364,7 +332,7 @@ void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) {{ - LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); + LogInfo("Paho MQTT Trace : %d, %s\\n", level, message); }} #define CHANGED 1 @@ -401,11 +369,12 @@ static pthread_t publishThread; #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ -{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, +{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}}, static struct {{ const char *topic; //null terminated topic string - void *mqtt_pdata; //data from/for MQTT stack + void *mqtt_pdata; // pointer to data from/for MQTT stack + int *mqtt_pchanged; // pointer to changed flag __IEC_types_enum vartype; }} topics [] = {{ {topics} @@ -427,7 +396,7 @@ rc = MQTTClient_connect(client, &conn_opts); #endif - return rc; + return rc; }} void __cleanup_{locstr}(void) @@ -450,16 +419,48 @@ void connectionLost(void* context, char* reason) {{ LogWarning("ConnectionLost, reconnecting\\n"); - _connect_mqtt(); + _connect_mqtt(); /* TODO wait if error */ }} int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {{ - /* TODO : something with message */ - printf("Message arrived\\n"); - printf(" topic: %s\\n", topicName); - printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload); + int low = 0; + int size = sizeof(topics) / sizeof(topics[0]); + int high = size - 1; + int mid; + + // bisect topic among subscribed topics + while (low <= high) {{ + int res; + mid = low + (high - low) / 2; + res = strncmp(topics[mid].topic, topicName, topicLen); + + // Check if key is present at mid + if (res == 0) + goto found; + + // If key greater, ignore left half + if (res < 0) + low = mid + 1; + + // If key is smaller, ignore right half + else + high = mid - 1; + }} + // If we reach here, then the element was not present + LogWarning("MQTT unknown topic: %s", topicName); + goto exit; + +found: + if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{ + memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen); + *topics[mid].mqtt_pchanged = 1; + }} else {{ + LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.", + topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen); + }} +exit: MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; @@ -474,16 +475,17 @@ #define INIT_UserPassword(User, Password) \\ LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\ - conn_opts.username = User; \\ - conn_opts.password = Password; + conn_opts.username = User; \\ + conn_opts.password = Password; #ifdef USE_MQTT_5 -#define _SUBSCRIBE(Topic, QoS) \\ +#define _SUBSCRIBE(Topic, QoS) \\ MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ - rc = response.reasonCode; \\ + /* when using MQTT5 responce code is 1 for some reason even if no error */ \\ + rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \\ MQTTResponse_free(response); #else -#define _SUBSCRIBE(Topic, QoS) \\ +#define _SUBSCRIBE(Topic, QoS) \\ rc = MQTTClient_subscribe(client, #Topic, QoS); #endif @@ -501,7 +503,7 @@ #ifdef USE_MQTT_5 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\ - &PLC_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\ + &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\ rc = response.reasonCode; \\ MQTTResponse_free(response); #else @@ -561,39 +563,39 @@ char *clientID = "{clientID}"; int rc; - MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; + MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; #ifdef USE_MQTT_5 - conn_opts.MQTTVersion = MQTTVERSION_5; + conn_opts.MQTTVersion = MQTTVERSION_5; conn_opts.cleanstart = 1; createOpts.MQTTVersion = MQTTVERSION_5; #else - conn_opts.cleansession = 1; + conn_opts.cleansession = 1; #endif MQTTClient_setTraceCallback(trace_callback); MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); - rc = MQTTClient_createWithOptions( + rc = MQTTClient_createWithOptions( &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); - if (rc != MQTTCLIENT_SUCCESS) - {{ + if (rc != MQTTCLIENT_SUCCESS) + {{ LogError("MQTT Failed to create client, return code %d\\n", rc); return rc; }} - rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); - if (rc != MQTTCLIENT_SUCCESS) - {{ + rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); + if (rc != MQTTCLIENT_SUCCESS) + {{ LogError("MQTT Failed to set callbacks, return code %d\\n", rc); return rc; - }} - - rc = _connect_mqtt(); - - if (rc != MQTTCLIENT_SUCCESS) {{ + }} + + rc = _connect_mqtt(); + + if (rc != MQTTCLIENT_SUCCESS) {{ LogError("MQTT Connect Failed, return code %d\\n", rc); return rc; }} @@ -607,16 +609,22 @@ }} #define READ_VALUE(c_loc_name, C_type) \\ - PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; + if(MQTT_##c_loc_name##_state == CHANGED){{ \\ + /* TODO care about endianess */ \\ + PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\ + MQTT_##c_loc_name##_state = UNCHANGED; \\ + }} void __retrieve_{locstr}(void) {{ - /* TODO try take mutex */ + if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ {retrieve} - /* TODO free mutex */ + pthread_mutex_unlock(&MQTT_mutex); + }} }} #define WRITE_VALUE(c_loc_name, C_type) \\ + /* TODO care about endianess */ \\ if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\ MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\ MQTT_##c_loc_name##_state = CHANGED; \\ @@ -671,37 +679,34 @@ formatdict["init"] += """ INIT_NoAuth()""" - topics = OrderedDict() - for direction, data in self.items(): - iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction] - for row in data: - if direction == "input": - Topic, QoS, iec_type, iec_number = row - else: - Topic, QoS, _Retained, iec_type, iec_number = row - Retained = 1 if _Retained=="True" else 0 - C_type, iec_size_prefix = MQTT_IEC_types[iec_type] - c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number) - - formatdict["decl"] += """ + for row in self["output"]: + Topic, QoS, _Retained, iec_type, iec_number = row + Retained = 1 if _Retained=="True" else 0 + C_type, iec_size_prefix = MQTT_IEC_types[iec_type] + c_loc_name = "__Q" + iec_size_prefix + locstr + "_" + str(iec_number) + + formatdict["decl"] += """ DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) - - formatdict["topics"] += """ + formatdict["init"] += """ + INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) + formatdict["publish"] += """ + WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) + formatdict["publish_changes"] += """ + PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) + + # inputs need to be sorted for bisection search + for row in sorted(self["input"]): + Topic, QoS, iec_type, iec_number = row + C_type, iec_size_prefix = MQTT_IEC_types[iec_type] + c_loc_name = "__I" + iec_size_prefix + locstr + "_" + str(iec_number) + formatdict["decl"] += """ +DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) + formatdict["topics"] += """ INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) - - if direction == "input": - formatdict["init"] += """ + formatdict["init"] += """ INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals()) - formatdict["retrieve"] += """ - READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) - - else: - formatdict["init"] += """ - INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) - formatdict["publish"] += """ - WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) - formatdict["publish_changes"] += """ - PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) + formatdict["retrieve"] += """ + READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) Ccode = template.format(**formatdict)