--- a/mqtt/mqtt_client_gen.py Tue Jul 09 11:44:49 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Tue Jul 09 11:46:19 2024 +0200
@@ -4,11 +4,12 @@
import csv
import functools
from threading import Thread
+from collections import OrderedDict
import wx
import wx.dataview as dv
-from perfect_hash import generate_code, IntSaltHash
+# from perfect_hash import generate_code, IntSaltHash
MQTT_IEC_types = dict(
# IEC61131| C type | sz
@@ -298,7 +299,7 @@
int G[] = {$G};
char *K[] = {$K};
"""
- code = generate_code(topics, Hash=IntSaltHash, template=template)
+ code = generate_code(topics, Hash=IntSaltHash, template=template)
code += """
/* return index of key in K if key is found, -1 otherwise */
int MQTT_Topic_index(const char *key)
@@ -318,189 +319,188 @@
return -1;
}
"""
- return code
+ return code
def GenerateC(self, path, locstr, config):
template = """/* code generated by beremiz MQTT extension */
-#include "MQTTAsync.h"
+#include <stdint.h>
+#include <pthread.h>
+
+#include "MQTTClient.h"
#include "MQTTClientPersistence.h"
#define _Log(level, ...) \\
{{ \\
- char mstr[256]; \\
- snprintf(mstr, 255, __VA_ARGS__); \\
- LogMessage(level, mstr, strlen(mstr)); \\
+ /* char mstr[256]; */ \\
+ /* snprintf(mstr, 255, __VA_ARGS__); */ \\
+ /* LogMessage(level, mstr, strlen(mstr)); */ \\
+ printf(__VA_ARGS__); \\
}}
#define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
-static inline void* loadFile(const char *const path) {{
-
- FILE *fp = fopen(path, "rb");
- if(!fp) {{
- errno = 0;
- LogError("MQTT could not open %s", path);
- return NULL;
+static MQTTClient client;
+static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
+static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent
+
+void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
+{{
+ LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
+}}
+
+#define DECL_VAR(iec_type, C_type, c_loc_name) \\
+static C_type PLC_##c_loc_name##_buf = 0; \\
+static C_type MQTT_##c_loc_name##_buf = 0; \\
+C_type *c_loc_name = &PLC_##c_loc_name##_buf;
+
+{decl}
+
+#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
+{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
+
+static struct {{
+ const char *topic; //null terminated topic string
+ void *mqtt_pdata; //data from/for MQTT stack
+ __IEC_types_enum vartype;
+}} topics [] = {{
+{topics}
+}};
+
+static int _connect_mqtt(void)
+{{
+ int rc;
+ MQTTProperties props = MQTTProperties_initializer;
+ MQTTProperties willProps = MQTTProperties_initializer;
+ MQTTResponse response = MQTTResponse_initializer;
+
+ response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
+ rc = response.reasonCode;
+ MQTTResponse_free(response);
+
+ return rc;
+}}
+
+void __cleanup_{locstr}(void)
+{{
+ int rc;
+
+ /* TODO stop publish thread */
+
+ if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
+ {{
+ LogError("MQTT Failed to disconnect, return code %d\\n", rc);
}}
-
- fseek(fp, 0, SEEK_END);
- size_t length = (size_t)ftell(fp);
- void* data = malloc(length);
- if(data) {{
- fseek(fp, 0, SEEK_SET);
- size_t read = fread(data, 1, fileContents.length, fp);
- if(read != length){{
- free(data);
- LogError("MQTT could not read %s", path);
- }}
- }} else {{
- LogError("MQTT Not enough memoty to load %s", path);
+ MQTTClient_destroy(&client);
+}}
+
+void connectionLost(void* context, char* reason)
+{{
+ LogWarning("ConnectionLost, reconnecting\\n");
+ _connect_mqtt();
+ /* TODO wait if error */
+}}
+
+int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
+{{
+ /* TODO : something with message */
+ printf("Message arrived\\n");
+ printf(" topic: %s\\n", topicName);
+ printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload);
+ MQTTClient_freeMessage(&message);
+ MQTTClient_free(topicName);
+ return 1;
+}}
+
+#define INIT_NoAuth() \\
+ LogInfo("MQTT Init no auth");
+
+#define INIT_x509(PrivateKey, Certificate) \\
+ LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate);
+ /* TODO */
+
+#define INIT_UserPassword(User, Password) \\
+ LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\
+ conn_opts.username = User; \\
+ conn_opts.password = Password;
+
+#define INIT_SUBSCRIPTION(Topic, QoS) \\
+ {{ \\
+ MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\
+ rc = response.reasonCode; \\
+ MQTTResponse_free(response); \\
+ if (rc != MQTTCLIENT_SUCCESS) \\
+ {{ \\
+ LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\
+ }} \\
}}
- fclose(fp);
-
- return data;
-}}
-
-static MQTTClient client;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-
-void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
-{
- LogWarning("Paho MQTT Trace : %d, %s\n", level, message);
-}
-
-#define DECL_VAR(iec_type, C_type, c_loc_name) \\
-static C_type c_loc_name##_buf = 0; \\
-C_type *c_loc_name = &c_loc_name##_buf;
-
-{decl}
-
-#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
-{topic, &c_loc_name##_buf, iec_type##_ENUM},
-
-ststic struct {
- const char *topic; //null terminated topic string
- void *pdata; //pointer to data
- __IEC_types_enum vartype;
-} topics [] = {
-{topics}
-}
-
-void __cleanup_{locstr}(void)
-{{
- MQTT_Client_disconnect(client);
- MQTT_Client_delete(client);
-}}
-
-#define INIT_NoAuth() \\
- LogInfo("MQTT Init no auth"); \\
- MQTT_ClientConfig_setDefault(cc); \\
- retval = MQTT_Client_connect(client, uri);
-
-/* Note : Single policy is enforced here, by default open62541 client supports all policies */
-#define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate) \\
- LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate); \\
- \\
- MQTT_ByteString certificate = loadFile(Certificate); \\
- MQTT_ByteString privateKey = loadFile(PrivateKey); \\
- \\
- cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode; \\
- \\
- /* replacement for default behaviour */ \\
- /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */ \\
- do{{ \\
- retval = MQTT_ClientConfig_setDefault(cc); \\
- if(retval != MQTT_STATUSCODE_GOOD) \\
- break; \\
- \\
- MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*) \\
- MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2); \\
- if(!sp){{ \\
- retval = MQTT_STATUSCODE_BADOUTOFMEMORY; \\
- break; \\
- }} \\
- cc->securityPolicies = sp; \\
- \\
- retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize], \\
- certificate, privateKey, &cc->logger); \\
- if(retval != MQTT_STATUSCODE_GOOD) {{ \\
- MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND, \\
- "Could not add SecurityPolicy Policy with error code %s", \\
- MQTT_StatusCode_name(retval)); \\
- MQTT_free(cc->securityPolicies); \\
- cc->securityPolicies = NULL; \\
- break; \\
- }} \\
- \\
- ++cc->securityPoliciesSize; \\
- }} while(0); \\
- \\
- retval = MQTT_Client_connect(client, uri); \\
- \\
- MQTT_ByteString_clear(&certificate); \\
- MQTT_ByteString_clear(&privateKey);
-
-#define INIT_UserPassword(User, Password) \\
- LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\
- MQTT_ClientConfig_setDefault(cc); \\
- retval = MQTT_Client_connectUsername(client, uri, User, Password);
-
-#define INIT_READ_VARIANT(ua_type, c_loc_name) \\
- MQTT_Variant_init(&c_loc_name##_variant);
-
-#define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name) \\
- MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]);
int __init_{locstr}(int argc,char **argv)
{{
char *uri = "{uri}";
char *clientID = "{clientID}";
int rc;
- conn_opts = MQTTClient_connectOptions_initializer;
-
- if ((rc = MQTTClient_create(&client, uri, clientID,
- MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
- {
- printf("Failed to create client, return code %d\n", rc);
- rc = EXIT_FAILURE;
- goto exit;
- }
+
+ MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
+
+ conn_opts.MQTTVersion = MQTTVERSION_5;
+ conn_opts.cleanstart = 1;
+
+ createOpts.MQTTVersion = MQTTVERSION_5;
+
+ MQTTClient_setTraceCallback(trace_callback);
+ MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
+
+
+ rc = MQTTClient_createWithOptions(
+ &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
+ LogError("MQTT Failed to create client, return code %d\\n", rc);
+ return rc;
+ }}
+
+ rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
+ if (rc != MQTTCLIENT_SUCCESS)
+ {{
+ LogError("MQTT Failed to set callbacks %d", rc);
+ return rc;
+ }}
{init}
- if(retval != MQTT_STATUSCODE_GOOD) {{
- LogError("MQTT Init Failed %d", retval);
- MQTT_Client_delete(client);
- return EXIT_FAILURE;
+ rc = _connect_mqtt();
+
+ if (rc != MQTTCLIENT_SUCCESS) {{
+ LogError("MQTT Init Failed %d", rc);
+ return rc;
}}
+ /* TODO start publish thread */
+
return 0;
}}
-#define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
- retval = MQTT_Client_readValueAttribute( \\
- client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); \\
- if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) && \\
- c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{ \\
- c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data; \\
- MQTT_Variant_clear(&c_loc_name##_variant); /* Unalloc requiered on each read ! */ \\
- }}
+#define READ_VALUE(c_loc_name, C_type) \\
+ PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf;
void __retrieve_{locstr}(void)
{{
- MQTT_StatusCode retval;
+ /* TODO try take mutex */
{retrieve}
-}}
-
-#define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
- MQTT_Client_writeValueAttribute( \\
- client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);
+ /* TODO free mutex */
+}}
+
+#define WRITE_VALUE(c_loc_name, C_type) \\
+ MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
void __publish_{locstr}(void)
{{
+ /* TODO try take mutex */
{publish}
+ /* TODO free mutex */
+ /* TODO unblock publish thread */
}}
"""
@@ -528,11 +528,11 @@
formatdict["init"] += """
INIT_NoAuth()"""
- topics = OrderedDict()
+ topics = OrderedDict()
for direction, data in self.items():
iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
for row in data:
- Topic, QoS, Retain, iec_type, iec_number = row
+ Topic, QoS, Retain, iec_type, iec_number = row
C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
@@ -540,19 +540,18 @@
DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
formatdict["topics"] += """
-INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-#
-# if direction == "input":
-# formatdict["init"] += """
-# INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals())
-# formatdict["retrieve"] += """
-# READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
-#
-# if direction == "output":
-# formatdict["init"] += """
-# INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals())
-# formatdict["publish"] += """
-# WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
+ INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
+
+ if direction == "input":
+ formatdict["init"] += """
+ INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
+ formatdict["retrieve"] += """
+ READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
+
+ if direction == "output":
+ # formatdict["init"] += " NOTHING ! publish doesn't need init. "
+ formatdict["publish"] += """
+ WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
Ccode = template.format(**formatdict)