--- a/mqtt/mqtt_client_gen.py Tue Jul 16 09:41:45 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Wed Jul 17 17:02:32 2024 +0200
@@ -308,53 +308,21 @@
for row in data:
writer.writerow([direction] + row)
- def _TopicsPerfectHash(self, topics):
- template = """
-#define NK $NK /* number of keys */
-#define NG $NG /* number of vertices */
-#define NS $NS /* length of array T1 and T2 */
-
-int S1[] = {$S1};
-int S2[] = {$S2};
-int G[] = {$G};
-char *K[] = {$K};
-"""
- code = generate_code(topics, Hash=IntSaltHash, template=template)
- code += """
-/* return index of key in K if key is found, -1 otherwise */
-int MQTT_Topic_index(const char *key)
-{
- int i, f1 = 0, f2 = 0;
-
- for (i = 0; key[i] != '\\0' && i < NS; i++) {
- f1 += S1[i] * key[i];
- f2 += S2[i] * key[i];
- f1 %= NG;
- f2 %= NG;
- }
- i = (G[f1] + G[f2]) % NG;
- if (i < NK && strcmp(key, K[i]) == 0)
- return i;
-
- return -1;
-}
-"""
- return code
-
def GenerateC(self, path, locstr, config):
template = """/* code generated by beremiz MQTT extension */
#include <stdint.h>
#include <pthread.h>
+#include <string.h>
#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#define _Log(level, ...) \\
{{ \\
- /* char mstr[256]; */ \\
- /* snprintf(mstr, 255, __VA_ARGS__); */ \\
- /* LogMessage(level, mstr, strlen(mstr)); */ \\
+ char mstr[256]; \\
+ snprintf(mstr, 255, __VA_ARGS__); \\
+ LogMessage(level, mstr, strlen(mstr)); \\
printf(__VA_ARGS__); \\
}}
@@ -364,7 +332,7 @@
void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
{{
- LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
+ LogInfo("Paho MQTT Trace : %d, %s\\n", level, message);
}}
#define CHANGED 1
@@ -401,11 +369,12 @@
static pthread_t publishThread;
#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
-{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
+{{#topic, &MQTT_##c_loc_name##_buf, &MQTT_##c_loc_name##_state, iec_type##_ENUM}},
static struct {{
const char *topic; //null terminated topic string
- void *mqtt_pdata; //data from/for MQTT stack
+ void *mqtt_pdata; // pointer to data from/for MQTT stack
+ int *mqtt_pchanged; // pointer to changed flag
__IEC_types_enum vartype;
}} topics [] = {{
{topics}
@@ -427,7 +396,7 @@
rc = MQTTClient_connect(client, &conn_opts);
#endif
- return rc;
+ return rc;
}}
void __cleanup_{locstr}(void)
@@ -450,16 +419,48 @@
void connectionLost(void* context, char* reason)
{{
LogWarning("ConnectionLost, reconnecting\\n");
- _connect_mqtt();
+ _connect_mqtt();
/* TODO wait if error */
}}
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{{
- /* TODO : something with message */
- printf("Message arrived\\n");
- printf(" topic: %s\\n", topicName);
- printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload);
+ int low = 0;
+ int size = sizeof(topics) / sizeof(topics[0]);
+ int high = size - 1;
+ int mid;
+
+ // bisect topic among subscribed topics
+ while (low <= high) {{
+ int res;
+ mid = low + (high - low) / 2;
+ res = strncmp(topics[mid].topic, topicName, topicLen);
+
+ // Check if key is present at mid
+ if (res == 0)
+ goto found;
+
+ // If key greater, ignore left half
+ if (res < 0)
+ low = mid + 1;
+
+ // If key is smaller, ignore right half
+ else
+ high = mid - 1;
+ }}
+ // If we reach here, then the element was not present
+ LogWarning("MQTT unknown topic: %s", topicName);
+ goto exit;
+
+found:
+ if(__get_type_enum_size(topics[mid].vartype) == message->payloadlen){{
+ memcpy(topics[mid].mqtt_pdata, (char*)message->payload, message->payloadlen);
+ *topics[mid].mqtt_pchanged = 1;
+ }} else {{
+ LogWarning("MQTT wrong payload size for topic: %s. Should be %d, but got %d.",
+ topicName, __get_type_enum_size(topics[mid].vartype), message->payloadlen);
+ }}
+exit:
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
@@ -474,16 +475,17 @@
#define INIT_UserPassword(User, Password) \\
LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\
- conn_opts.username = User; \\
- conn_opts.password = Password;
+ conn_opts.username = User; \\
+ conn_opts.password = Password;
#ifdef USE_MQTT_5
-#define _SUBSCRIBE(Topic, QoS) \\
+#define _SUBSCRIBE(Topic, QoS) \\
MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\
- rc = response.reasonCode; \\
+ /* when using MQTT5 responce code is 1 for some reason even if no error */ \\
+ rc = response.reasonCode == 1 ? MQTTCLIENT_SUCCESS : response.reasonCode; \\
MQTTResponse_free(response);
#else
-#define _SUBSCRIBE(Topic, QoS) \\
+#define _SUBSCRIBE(Topic, QoS) \\
rc = MQTTClient_subscribe(client, #Topic, QoS);
#endif
@@ -501,7 +503,7 @@
#ifdef USE_MQTT_5
#define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\
MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\
- &PLC_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\
+ &MQTT_##c_loc_name##_buf, QoS, Retained, NULL, NULL); \\
rc = response.reasonCode; \\
MQTTResponse_free(response);
#else
@@ -561,39 +563,39 @@
char *clientID = "{clientID}";
int rc;
- MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
+ MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
#ifdef USE_MQTT_5
- conn_opts.MQTTVersion = MQTTVERSION_5;
+ conn_opts.MQTTVersion = MQTTVERSION_5;
conn_opts.cleanstart = 1;
createOpts.MQTTVersion = MQTTVERSION_5;
#else
- conn_opts.cleansession = 1;
+ conn_opts.cleansession = 1;
#endif
MQTTClient_setTraceCallback(trace_callback);
MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
- rc = MQTTClient_createWithOptions(
+ rc = MQTTClient_createWithOptions(
&client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
- if (rc != MQTTCLIENT_SUCCESS)
- {{
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
LogError("MQTT Failed to create client, return code %d\\n", rc);
return rc;
}}
- rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
- if (rc != MQTTCLIENT_SUCCESS)
- {{
+ rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
LogError("MQTT Failed to set callbacks, return code %d\\n", rc);
return rc;
- }}
-
- rc = _connect_mqtt();
-
- if (rc != MQTTCLIENT_SUCCESS) {{
+ }}
+
+ rc = _connect_mqtt();
+
+ if (rc != MQTTCLIENT_SUCCESS) {{
LogError("MQTT Connect Failed, return code %d\\n", rc);
return rc;
}}
@@ -607,16 +609,22 @@
}}
#define READ_VALUE(c_loc_name, C_type) \\
- PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf;
+ if(MQTT_##c_loc_name##_state == CHANGED){{ \\
+ /* TODO care about endianess */ \\
+ PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; \\
+ MQTT_##c_loc_name##_state = UNCHANGED; \\
+ }}
void __retrieve_{locstr}(void)
{{
- /* TODO try take mutex */
+ if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
{retrieve}
- /* TODO free mutex */
+ pthread_mutex_unlock(&MQTT_mutex);
+ }}
}}
#define WRITE_VALUE(c_loc_name, C_type) \\
+ /* TODO care about endianess */ \\
if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
MQTT_##c_loc_name##_state = CHANGED; \\
@@ -671,37 +679,34 @@
formatdict["init"] += """
INIT_NoAuth()"""
- topics = OrderedDict()
- for direction, data in self.items():
- iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
- for row in data:
- if direction == "input":
- Topic, QoS, iec_type, iec_number = row
- else:
- Topic, QoS, _Retained, iec_type, iec_number = row
- Retained = 1 if _Retained=="True" else 0
- C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
- c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
-
- formatdict["decl"] += """
+ for row in self["output"]:
+ Topic, QoS, _Retained, iec_type, iec_number = row
+ Retained = 1 if _Retained=="True" else 0
+ C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
+ c_loc_name = "__Q" + iec_size_prefix + locstr + "_" + str(iec_number)
+
+ formatdict["decl"] += """
DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
-
- formatdict["topics"] += """
+ formatdict["init"] += """
+ INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+ formatdict["publish"] += """
+ WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
+ formatdict["publish_changes"] += """
+ PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+
+ # inputs need to be sorted for bisection search
+ for row in sorted(self["input"]):
+ Topic, QoS, iec_type, iec_number = row
+ C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
+ c_loc_name = "__I" + iec_size_prefix + locstr + "_" + str(iec_number)
+ formatdict["decl"] += """
+DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
+ formatdict["topics"] += """
INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-
- if direction == "input":
- formatdict["init"] += """
+ formatdict["init"] += """
INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
- formatdict["retrieve"] += """
- READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
-
- else:
- formatdict["init"] += """
- INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
- formatdict["publish"] += """
- WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
- formatdict["publish_changes"] += """
- PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
+ formatdict["retrieve"] += """
+ READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
Ccode = template.format(**formatdict)