# HG changeset patch # User Edouard Tisserant # Date 1720518379 -7200 # Node ID 883a85b9ebcca3d3e5867b935d285f404d24d8ec # Parent 466be4f52cb9de8d509c866e6441d323bdfbf861 MQTT: still WIP, generated C code builds and link. diff -r 466be4f52cb9 -r 883a85b9ebcc mqtt/client.py --- a/mqtt/client.py Tue Jul 09 11:44:49 2024 +0200 +++ b/mqtt/client.py Tue Jul 09 11:46:19 2024 +0200 @@ -10,9 +10,15 @@ import util.paths as paths -PahoMqttCPath = paths.ThirdPartyPath("MQTT") -PahoMqttCLibraryPath = PahoMqttCPath -PahoMqttCIncludePaths = [PahoMqttCPath] + +# assumes that "build" directory was created in paho.mqtt.c source directory, +# and cmake build was invoked from this directory +PahoMqttCLibraryPath = paths.ThirdPartyPath("paho.mqtt.c", "build", "src") + +PahoMqttCIncludePaths = [ + paths.ThirdPartyPath("paho.mqtt.c", "build"), # VersionInfo.h + paths.ThirdPartyPath("paho.mqtt.c", "src") +] class MQTTClientEditor(ConfTreeNodeEditor): CONFNODEEDITOR_TABS = [ @@ -107,7 +113,10 @@ locstr = "_".join(map(str, current_location)) c_path = os.path.join(buildpath, "mqtt_client__%s.c" % locstr) - c_code = '#include "beremiz.h"\n' + c_code = """ +#include "iec_types_all.h" +#include "beremiz.h" +""" c_code += self.modeldata.GenerateC(c_path, locstr, self.GetConfig()) with open(c_path, 'w') as c_file: diff -r 466be4f52cb9 -r 883a85b9ebcc mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Tue Jul 09 11:44:49 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Tue Jul 09 11:46:19 2024 +0200 @@ -4,11 +4,12 @@ import csv import functools from threading import Thread +from collections import OrderedDict import wx import wx.dataview as dv -from perfect_hash import generate_code, IntSaltHash +# from perfect_hash import generate_code, IntSaltHash MQTT_IEC_types = dict( # IEC61131| C type | sz @@ -298,7 +299,7 @@ int G[] = {$G}; char *K[] = {$K}; """ - code = generate_code(topics, Hash=IntSaltHash, template=template) + code = generate_code(topics, Hash=IntSaltHash, template=template) code += """ /* return index of key in K if key is found, -1 otherwise */ int MQTT_Topic_index(const char *key) @@ -318,189 +319,188 @@ return -1; } """ - return code + return code def GenerateC(self, path, locstr, config): template = """/* code generated by beremiz MQTT extension */ -#include "MQTTAsync.h" +#include +#include + +#include "MQTTClient.h" #include "MQTTClientPersistence.h" #define _Log(level, ...) \\ {{ \\ - char mstr[256]; \\ - snprintf(mstr, 255, __VA_ARGS__); \\ - LogMessage(level, mstr, strlen(mstr)); \\ + /* char mstr[256]; */ \\ + /* snprintf(mstr, 255, __VA_ARGS__); */ \\ + /* LogMessage(level, mstr, strlen(mstr)); */ \\ + printf(__VA_ARGS__); \\ }} #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); -static inline void* loadFile(const char *const path) {{ - - FILE *fp = fopen(path, "rb"); - if(!fp) {{ - errno = 0; - LogError("MQTT could not open %s", path); - return NULL; +static MQTTClient client; +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; +static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent + +void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) +{{ + LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); +}} + +#define DECL_VAR(iec_type, C_type, c_loc_name) \\ +static C_type PLC_##c_loc_name##_buf = 0; \\ +static C_type MQTT_##c_loc_name##_buf = 0; \\ +C_type *c_loc_name = &PLC_##c_loc_name##_buf; + +{decl} + +#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ +{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, + +static struct {{ + const char *topic; //null terminated topic string + void *mqtt_pdata; //data from/for MQTT stack + __IEC_types_enum vartype; +}} topics [] = {{ +{topics} +}}; + +static int _connect_mqtt(void) +{{ + int rc; + MQTTProperties props = MQTTProperties_initializer; + MQTTProperties willProps = MQTTProperties_initializer; + MQTTResponse response = MQTTResponse_initializer; + + response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); + rc = response.reasonCode; + MQTTResponse_free(response); + + return rc; +}} + +void __cleanup_{locstr}(void) +{{ + int rc; + + /* TODO stop publish thread */ + + if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) + {{ + LogError("MQTT Failed to disconnect, return code %d\\n", rc); }} - - fseek(fp, 0, SEEK_END); - size_t length = (size_t)ftell(fp); - void* data = malloc(length); - if(data) {{ - fseek(fp, 0, SEEK_SET); - size_t read = fread(data, 1, fileContents.length, fp); - if(read != length){{ - free(data); - LogError("MQTT could not read %s", path); - }} - }} else {{ - LogError("MQTT Not enough memoty to load %s", path); + MQTTClient_destroy(&client); +}} + +void connectionLost(void* context, char* reason) +{{ + LogWarning("ConnectionLost, reconnecting\\n"); + _connect_mqtt(); + /* TODO wait if error */ +}} + +int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) +{{ + /* TODO : something with message */ + printf("Message arrived\\n"); + printf(" topic: %s\\n", topicName); + printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +}} + +#define INIT_NoAuth() \\ + LogInfo("MQTT Init no auth"); + +#define INIT_x509(PrivateKey, Certificate) \\ + LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate); + /* TODO */ + +#define INIT_UserPassword(User, Password) \\ + LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ + conn_opts.username = User; \\ + conn_opts.password = Password; + +#define INIT_SUBSCRIPTION(Topic, QoS) \\ + {{ \\ + MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ + rc = response.reasonCode; \\ + MQTTResponse_free(response); \\ + if (rc != MQTTCLIENT_SUCCESS) \\ + {{ \\ + LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\ + }} \\ }} - fclose(fp); - - return data; -}} - -static MQTTClient client; -static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; - -void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) -{ - LogWarning("Paho MQTT Trace : %d, %s\n", level, message); -} - -#define DECL_VAR(iec_type, C_type, c_loc_name) \\ -static C_type c_loc_name##_buf = 0; \\ -C_type *c_loc_name = &c_loc_name##_buf; - -{decl} - -#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ -{topic, &c_loc_name##_buf, iec_type##_ENUM}, - -ststic struct { - const char *topic; //null terminated topic string - void *pdata; //pointer to data - __IEC_types_enum vartype; -} topics [] = { -{topics} -} - -void __cleanup_{locstr}(void) -{{ - MQTT_Client_disconnect(client); - MQTT_Client_delete(client); -}} - -#define INIT_NoAuth() \\ - LogInfo("MQTT Init no auth"); \\ - MQTT_ClientConfig_setDefault(cc); \\ - retval = MQTT_Client_connect(client, uri); - -/* Note : Single policy is enforced here, by default open62541 client supports all policies */ -#define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate) \\ - LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate); \\ - \\ - MQTT_ByteString certificate = loadFile(Certificate); \\ - MQTT_ByteString privateKey = loadFile(PrivateKey); \\ - \\ - cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode; \\ - \\ - /* replacement for default behaviour */ \\ - /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */ \\ - do{{ \\ - retval = MQTT_ClientConfig_setDefault(cc); \\ - if(retval != MQTT_STATUSCODE_GOOD) \\ - break; \\ - \\ - MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*) \\ - MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2); \\ - if(!sp){{ \\ - retval = MQTT_STATUSCODE_BADOUTOFMEMORY; \\ - break; \\ - }} \\ - cc->securityPolicies = sp; \\ - \\ - retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize], \\ - certificate, privateKey, &cc->logger); \\ - if(retval != MQTT_STATUSCODE_GOOD) {{ \\ - MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND, \\ - "Could not add SecurityPolicy Policy with error code %s", \\ - MQTT_StatusCode_name(retval)); \\ - MQTT_free(cc->securityPolicies); \\ - cc->securityPolicies = NULL; \\ - break; \\ - }} \\ - \\ - ++cc->securityPoliciesSize; \\ - }} while(0); \\ - \\ - retval = MQTT_Client_connect(client, uri); \\ - \\ - MQTT_ByteString_clear(&certificate); \\ - MQTT_ByteString_clear(&privateKey); - -#define INIT_UserPassword(User, Password) \\ - LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ - MQTT_ClientConfig_setDefault(cc); \\ - retval = MQTT_Client_connectUsername(client, uri, User, Password); - -#define INIT_READ_VARIANT(ua_type, c_loc_name) \\ - MQTT_Variant_init(&c_loc_name##_variant); - -#define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name) \\ - MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]); int __init_{locstr}(int argc,char **argv) {{ char *uri = "{uri}"; char *clientID = "{clientID}"; int rc; - conn_opts = MQTTClient_connectOptions_initializer; - - if ((rc = MQTTClient_create(&client, uri, clientID, - MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) - { - printf("Failed to create client, return code %d\n", rc); - rc = EXIT_FAILURE; - goto exit; - } + + MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; + + conn_opts.MQTTVersion = MQTTVERSION_5; + conn_opts.cleanstart = 1; + + createOpts.MQTTVersion = MQTTVERSION_5; + + MQTTClient_setTraceCallback(trace_callback); + MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); + + + rc = MQTTClient_createWithOptions( + &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); + if (rc != MQTTCLIENT_SUCCESS) + {{ + LogError("MQTT Failed to create client, return code %d\\n", rc); + return rc; + }} + + rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); + if (rc != MQTTCLIENT_SUCCESS) + {{ + LogError("MQTT Failed to set callbacks %d", rc); + return rc; + }} {init} - if(retval != MQTT_STATUSCODE_GOOD) {{ - LogError("MQTT Init Failed %d", retval); - MQTT_Client_delete(client); - return EXIT_FAILURE; + rc = _connect_mqtt(); + + if (rc != MQTTCLIENT_SUCCESS) {{ + LogError("MQTT Init Failed %d", rc); + return rc; }} + /* TODO start publish thread */ + return 0; }} -#define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\ - retval = MQTT_Client_readValueAttribute( \\ - client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); \\ - if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) && \\ - c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{ \\ - c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data; \\ - MQTT_Variant_clear(&c_loc_name##_variant); /* Unalloc requiered on each read ! */ \\ - }} +#define READ_VALUE(c_loc_name, C_type) \\ + PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; void __retrieve_{locstr}(void) {{ - MQTT_StatusCode retval; + /* TODO try take mutex */ {retrieve} -}} - -#define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\ - MQTT_Client_writeValueAttribute( \\ - client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); + /* TODO free mutex */ +}} + +#define WRITE_VALUE(c_loc_name, C_type) \\ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; void __publish_{locstr}(void) {{ + /* TODO try take mutex */ {publish} + /* TODO free mutex */ + /* TODO unblock publish thread */ }} """ @@ -528,11 +528,11 @@ formatdict["init"] += """ INIT_NoAuth()""" - topics = OrderedDict() + topics = OrderedDict() for direction, data in self.items(): iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction] for row in data: - Topic, QoS, Retain, iec_type, iec_number = row + Topic, QoS, Retain, iec_type, iec_number = row C_type, iec_size_prefix = MQTT_IEC_types[iec_type] c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number) @@ -540,19 +540,18 @@ DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) formatdict["topics"] += """ -INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) -# -# if direction == "input": -# formatdict["init"] += """ -# INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals()) -# formatdict["retrieve"] += """ -# READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals()) -# -# if direction == "output": -# formatdict["init"] += """ -# INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals()) -# formatdict["publish"] += """ -# WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals()) + INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) + + if direction == "input": + formatdict["init"] += """ + INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals()) + formatdict["retrieve"] += """ + READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) + + if direction == "output": + # formatdict["init"] += " NOTHING ! publish doesn't need init. " + formatdict["publish"] += """ + WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) Ccode = template.format(**formatdict)