mqtt/mqtt_client_gen.py
changeset 3984 883a85b9ebcc
parent 3981 74035ea6792c
child 3986 98bd0bb33ce4
equal deleted inserted replaced
3983:466be4f52cb9 3984:883a85b9ebcc
     2 from __future__ import absolute_import
     2 from __future__ import absolute_import
     3 
     3 
     4 import csv
     4 import csv
     5 import functools
     5 import functools
     6 from threading import Thread
     6 from threading import Thread
       
     7 from collections import OrderedDict
     7 
     8 
     8 import wx
     9 import wx
     9 import wx.dataview as dv
    10 import wx.dataview as dv
    10 
    11 
    11 from perfect_hash import generate_code, IntSaltHash
    12 # from perfect_hash import generate_code, IntSaltHash
    12 
    13 
    13 MQTT_IEC_types = dict(
    14 MQTT_IEC_types = dict(
    14 # IEC61131|  C  type   | sz
    15 # IEC61131|  C  type   | sz
    15     BOOL  = ("uint8_t" , "X"),
    16     BOOL  = ("uint8_t" , "X"),
    16     SINT  = ("int8_t"  , "B"),
    17     SINT  = ("int8_t"  , "B"),
   296 int S1[] = {$S1};
   297 int S1[] = {$S1};
   297 int S2[] = {$S2};
   298 int S2[] = {$S2};
   298 int G[] = {$G};
   299 int G[] = {$G};
   299 char *K[] = {$K};
   300 char *K[] = {$K};
   300 """
   301 """
   301 		code = generate_code(topics, Hash=IntSaltHash, template=template)
   302         code = generate_code(topics, Hash=IntSaltHash, template=template)
   302         code += """
   303         code += """
   303 /* return index of key in K if key is found, -1 otherwise */
   304 /* return index of key in K if key is found, -1 otherwise */
   304 int MQTT_Topic_index(const char *key)
   305 int MQTT_Topic_index(const char *key)
   305 {
   306 {
   306     int i, f1 = 0, f2 = 0;
   307     int i, f1 = 0, f2 = 0;
   316         return i;
   317         return i;
   317 
   318 
   318     return -1;
   319     return -1;
   319 }
   320 }
   320 """ 
   321 """ 
   321 		return code
   322         return code
   322 
   323 
   323     def GenerateC(self, path, locstr, config):
   324     def GenerateC(self, path, locstr, config):
   324         template = """/* code generated by beremiz MQTT extension */
   325         template = """/* code generated by beremiz MQTT extension */
   325 
   326 
   326 #include "MQTTAsync.h"
   327 #include <stdint.h>
       
   328 #include <pthread.h>
       
   329 
       
   330 #include "MQTTClient.h"
   327 #include "MQTTClientPersistence.h"
   331 #include "MQTTClientPersistence.h"
   328 
   332 
   329 #define _Log(level, ...)                                                                           \\
   333 #define _Log(level, ...)                                                                           \\
   330     {{                                                                                             \\
   334     {{                                                                                             \\
   331         char mstr[256];                                                                            \\
   335         /* char mstr[256];                          */                                            \\
   332         snprintf(mstr, 255, __VA_ARGS__);                                                          \\
   336         /* snprintf(mstr, 255, __VA_ARGS__);        */                                            \\
   333         LogMessage(level, mstr, strlen(mstr));                                                     \\
   337         /* LogMessage(level, mstr, strlen(mstr));   */                                            \\
       
   338         printf(__VA_ARGS__);                                                          \\
   334     }}
   339     }}
   335 
   340 
   336 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
   341 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
   337 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
   342 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
   338 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
   343 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
   339 
   344 
   340 static inline void* loadFile(const char *const path) {{
   345 static MQTTClient client;
   341 
   346 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
   342     FILE *fp = fopen(path, "rb");
   347 static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
   343     if(!fp) {{
   348 
   344         errno = 0;
   349 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
   345         LogError("MQTT could not open %s", path);
   350 {{
   346         return NULL;
   351     LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
       
   352 }}
       
   353 
       
   354 #define DECL_VAR(iec_type, C_type, c_loc_name)                                                     \\
       
   355 static C_type PLC_##c_loc_name##_buf = 0;                                                          \\
       
   356 static C_type MQTT_##c_loc_name##_buf = 0;                                                         \\
       
   357 C_type *c_loc_name = &PLC_##c_loc_name##_buf;
       
   358 
       
   359 {decl}
       
   360 
       
   361 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                            \\
       
   362 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
       
   363 
       
   364 static struct {{
       
   365     const char *topic; //null terminated topic string
       
   366     void *mqtt_pdata; //data from/for MQTT stack
       
   367     __IEC_types_enum vartype;
       
   368 }} topics [] = {{
       
   369 {topics}
       
   370 }};
       
   371 
       
   372 static int _connect_mqtt(void)
       
   373 {{
       
   374     int rc;
       
   375     MQTTProperties props = MQTTProperties_initializer;
       
   376     MQTTProperties willProps = MQTTProperties_initializer;
       
   377     MQTTResponse response = MQTTResponse_initializer;
       
   378 
       
   379     response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
       
   380     rc = response.reasonCode;
       
   381     MQTTResponse_free(response);
       
   382 
       
   383 	return rc;
       
   384 }}
       
   385 
       
   386 void __cleanup_{locstr}(void)
       
   387 {{
       
   388     int rc;
       
   389 
       
   390     /* TODO stop publish thread */
       
   391 
       
   392     if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS)
       
   393     {{
       
   394         LogError("MQTT Failed to disconnect, return code %d\\n", rc);
   347     }}
   395     }}
   348 
   396     MQTTClient_destroy(&client);
   349     fseek(fp, 0, SEEK_END);
   397 }}
   350     size_t length = (size_t)ftell(fp);
   398 
   351     void* data = malloc(length);
   399 void connectionLost(void* context, char* reason)
   352     if(data) {{
   400 {{
   353         fseek(fp, 0, SEEK_SET);
   401     LogWarning("ConnectionLost, reconnecting\\n");
   354         size_t read = fread(data, 1, fileContents.length, fp);
   402 	_connect_mqtt();
   355         if(read != length){{
   403     /* TODO wait if error */
   356             free(data);
   404 }}
   357             LogError("MQTT could not read %s", path);
   405 
   358         }}
   406 int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
   359     }} else {{
   407 {{
   360         LogError("MQTT Not enough memoty to load %s", path);
   408     /* TODO : something with message */
       
   409     printf("Message arrived\\n");
       
   410     printf("     topic: %s\\n", topicName);
       
   411     printf("   message: %.*s\\n", message->payloadlen, (char*)message->payload);
       
   412     MQTTClient_freeMessage(&message);
       
   413     MQTTClient_free(topicName);
       
   414     return 1;
       
   415 }}
       
   416 
       
   417 #define INIT_NoAuth()                                                                             \\
       
   418     LogInfo("MQTT Init no auth");
       
   419 
       
   420 #define INIT_x509(PrivateKey, Certificate)                                                        \\
       
   421     LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate);
       
   422     /* TODO */
       
   423 
       
   424 #define INIT_UserPassword(User, Password)                                                         \\
       
   425     LogInfo("MQTT Init UserPassword %s,%s", User, Password);                                      \\
       
   426 	conn_opts.username = User;                                                                    \\
       
   427 	conn_opts.password = Password;
       
   428 
       
   429 #define INIT_SUBSCRIPTION(Topic, QoS)                                                             \\
       
   430     {{                                                                                            \\
       
   431         MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL);            \\
       
   432         rc = response.reasonCode;                                                                 \\
       
   433         MQTTResponse_free(response);                                                              \\
       
   434         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
       
   435         {{                                                                                        \\
       
   436             LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\
       
   437         }}                                                                                        \\
   361     }}
   438     }}
   362     fclose(fp);
       
   363 
       
   364     return data;
       
   365 }}
       
   366 
       
   367 static MQTTClient client;
       
   368 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
       
   369 
       
   370 void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
       
   371 {
       
   372 	LogWarning("Paho MQTT Trace : %d, %s\n", level, message);
       
   373 }
       
   374 
       
   375 #define DECL_VAR(iec_type, C_type, c_loc_name)                                                       \\
       
   376 static C_type c_loc_name##_buf = 0;                                                                 \\
       
   377 C_type *c_loc_name = &c_loc_name##_buf;
       
   378 
       
   379 {decl}
       
   380 
       
   381 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                  \\
       
   382 {topic, &c_loc_name##_buf, iec_type##_ENUM},
       
   383 
       
   384 ststic struct {
       
   385 	const char *topic; //null terminated topic string
       
   386 	void *pdata; //pointer to data
       
   387 	__IEC_types_enum vartype;
       
   388 } topics [] = {
       
   389 {topics}
       
   390 }
       
   391 
       
   392 void __cleanup_{locstr}(void)
       
   393 {{
       
   394     MQTT_Client_disconnect(client);
       
   395     MQTT_Client_delete(client);
       
   396 }}
       
   397 
       
   398 #define INIT_NoAuth()                                                                              \\
       
   399     LogInfo("MQTT Init no auth");                                                                \\
       
   400     MQTT_ClientConfig_setDefault(cc);                                                                \\
       
   401     retval = MQTT_Client_connect(client, uri);
       
   402 
       
   403 /* Note : Single policy is enforced here, by default open62541 client supports all policies */
       
   404 #define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate)                                  \\
       
   405     LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate);     \\
       
   406                                                                                                    \\
       
   407     MQTT_ByteString certificate = loadFile(Certificate);                                             \\
       
   408     MQTT_ByteString privateKey  = loadFile(PrivateKey);                                              \\
       
   409                                                                                                    \\
       
   410     cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode;                                     \\
       
   411                                                                                                    \\
       
   412     /* replacement for default behaviour */                                                        \\
       
   413     /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */     \\
       
   414     do{{                                                                                           \\
       
   415         retval = MQTT_ClientConfig_setDefault(cc);                                                   \\
       
   416         if(retval != MQTT_STATUSCODE_GOOD)                                                           \\
       
   417             break;                                                                                 \\
       
   418                                                                                                    \\
       
   419         MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*)                                               \\
       
   420             MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2);                       \\
       
   421         if(!sp){{                                                                                  \\
       
   422             retval = MQTT_STATUSCODE_BADOUTOFMEMORY;                                                 \\
       
   423             break;                                                                                 \\
       
   424         }}                                                                                         \\
       
   425         cc->securityPolicies = sp;                                                                 \\
       
   426                                                                                                    \\
       
   427         retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize],       \\
       
   428                                                  certificate, privateKey, &cc->logger);            \\
       
   429         if(retval != MQTT_STATUSCODE_GOOD) {{                                                        \\
       
   430             MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND,                                   \\
       
   431                            "Could not add SecurityPolicy Policy with error code %s",               \\
       
   432                            MQTT_StatusCode_name(retval));                                            \\
       
   433             MQTT_free(cc->securityPolicies);                                                         \\
       
   434             cc->securityPolicies = NULL;                                                           \\
       
   435             break;                                                                                 \\
       
   436         }}                                                                                         \\
       
   437                                                                                                    \\
       
   438         ++cc->securityPoliciesSize;                                                                \\
       
   439     }} while(0);                                                                                   \\
       
   440                                                                                                    \\
       
   441     retval = MQTT_Client_connect(client, uri);                                                       \\
       
   442                                                                                                    \\
       
   443     MQTT_ByteString_clear(&certificate);                                                             \\
       
   444     MQTT_ByteString_clear(&privateKey);
       
   445 
       
   446 #define INIT_UserPassword(User, Password)                                                          \\
       
   447     LogInfo("MQTT Init UserPassword %s,%s", User, Password);                                     \\
       
   448     MQTT_ClientConfig_setDefault(cc);                                                                \\
       
   449     retval = MQTT_Client_connectUsername(client, uri, User, Password);
       
   450 
       
   451 #define INIT_READ_VARIANT(ua_type, c_loc_name)                                                     \\
       
   452     MQTT_Variant_init(&c_loc_name##_variant);
       
   453 
       
   454 #define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name)                                      \\
       
   455     MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]);
       
   456 
   439 
   457 int __init_{locstr}(int argc,char **argv)
   440 int __init_{locstr}(int argc,char **argv)
   458 {{
   441 {{
   459     char *uri = "{uri}";
   442     char *uri = "{uri}";
   460     char *clientID = "{clientID}";
   443     char *clientID = "{clientID}";
   461     int rc;
   444     int rc;
   462     conn_opts = MQTTClient_connectOptions_initializer;
   445 
   463 
   446 	MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
   464     if ((rc = MQTTClient_create(&client, uri, clientID,
   447 
   465         MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
   448 	conn_opts.MQTTVersion = MQTTVERSION_5;
   466     {
   449     conn_opts.cleanstart = 1;
   467         printf("Failed to create client, return code %d\n", rc);
   450 
   468         rc = EXIT_FAILURE;
   451     createOpts.MQTTVersion = MQTTVERSION_5;
   469         goto exit;
   452 
   470     }
   453     MQTTClient_setTraceCallback(trace_callback);
       
   454     MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR);
       
   455 
       
   456 
       
   457 	rc = MQTTClient_createWithOptions(
       
   458         &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts);
       
   459 	if (rc != MQTTCLIENT_SUCCESS)
       
   460 	{{
       
   461         LogError("MQTT Failed to create client, return code %d\\n", rc);
       
   462         return rc;
       
   463     }}
       
   464 
       
   465 	rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL);
       
   466 	if (rc != MQTTCLIENT_SUCCESS)
       
   467 	{{
       
   468         LogError("MQTT Failed to set callbacks %d", rc);
       
   469         return rc;
       
   470 	}}
   471 
   471 
   472 {init}
   472 {init}
   473 
   473 
   474     if(retval != MQTT_STATUSCODE_GOOD) {{
   474 	rc = _connect_mqtt();
   475         LogError("MQTT Init Failed %d", retval);
   475 
   476         MQTT_Client_delete(client);
   476 	if (rc != MQTTCLIENT_SUCCESS) {{
   477         return EXIT_FAILURE;
   477         LogError("MQTT Init Failed %d", rc);
       
   478         return rc;
   478     }}
   479     }}
       
   480     /* TODO start publish thread */
       
   481 
   479     return 0;
   482     return 0;
   480 }}
   483 }}
   481 
   484 
   482 #define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id)        \\
   485 #define READ_VALUE(c_loc_name, C_type) \\
   483     retval = MQTT_Client_readValueAttribute(                                                         \\
   486     PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf;
   484         client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);                      \\
       
   485     if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) &&               \\
       
   486        c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{                                    \\
       
   487             c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data;                               \\
       
   488             MQTT_Variant_clear(&c_loc_name##_variant);  /* Unalloc requiered on each read ! */       \\
       
   489     }}
       
   490 
   487 
   491 void __retrieve_{locstr}(void)
   488 void __retrieve_{locstr}(void)
   492 {{
   489 {{
   493     MQTT_StatusCode retval;
   490     /* TODO try take mutex */
   494 {retrieve}
   491 {retrieve}
   495 }}
   492     /* TODO free mutex */
   496 
   493 }}
   497 #define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id)                     \\
   494 
   498     MQTT_Client_writeValueAttribute(                                                                 \\
   495 #define WRITE_VALUE(c_loc_name, C_type) \\
   499         client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);
   496     MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
   500 
   497 
   501 void __publish_{locstr}(void)
   498 void __publish_{locstr}(void)
   502 {{
   499 {{
       
   500     /* TODO try take mutex */
   503 {publish}
   501 {publish}
       
   502     /* TODO free mutex */
       
   503     /* TODO unblock publish thread */
   504 }}
   504 }}
   505 
   505 
   506 """
   506 """
   507         
   507         
   508         formatdict = dict(
   508         formatdict = dict(
   526     INIT_UserPassword("{User}", "{Password}")""".format(**config)
   526     INIT_UserPassword("{User}", "{Password}")""".format(**config)
   527         else:
   527         else:
   528             formatdict["init"] += """
   528             formatdict["init"] += """
   529     INIT_NoAuth()"""
   529     INIT_NoAuth()"""
   530 
   530 
   531 		topics = OrderedDict()
   531         topics = OrderedDict()
   532         for direction, data in self.items():
   532         for direction, data in self.items():
   533             iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
   533             iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
   534             for row in data:
   534             for row in data:
   535 				Topic, QoS, Retain, iec_type, iec_number = row
   535                 Topic, QoS, Retain, iec_type, iec_number = row
   536                 C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
   536                 C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
   537                 c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
   537                 c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
   538 
   538 
   539                 formatdict["decl"] += """
   539                 formatdict["decl"] += """
   540 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
   540 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
   541 
   541 
   542                 formatdict["topics"] += """
   542                 formatdict["topics"] += """
   543 INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
   543     INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
   544 #
   544 
   545 #                if direction == "input":
   545                 if direction == "input":
   546 #                    formatdict["init"] += """
   546                     formatdict["init"] += """
   547 #    INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals())
   547     INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals())
   548 #                    formatdict["retrieve"] += """
   548                     formatdict["retrieve"] += """
   549 #    READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
   549     READ_VALUE({c_loc_name}, {C_type})""".format(**locals())
   550 #
   550 
   551 #                if direction == "output":
   551                 if direction == "output":
   552 #                    formatdict["init"] += """
   552                     # formatdict["init"] += " NOTHING ! publish doesn't need init. "
   553 #    INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals())
   553                     formatdict["publish"] += """
   554 #                    formatdict["publish"] += """
   554     WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
   555 #    WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
       
   556 
   555 
   557         Ccode = template.format(**formatdict)
   556         Ccode = template.format(**formatdict)
   558         
   557         
   559         return Ccode
   558         return Ccode
   560 
   559