316 return i; |
317 return i; |
317 |
318 |
318 return -1; |
319 return -1; |
319 } |
320 } |
320 """ |
321 """ |
321 return code |
322 return code |
322 |
323 |
323 def GenerateC(self, path, locstr, config): |
324 def GenerateC(self, path, locstr, config): |
324 template = """/* code generated by beremiz MQTT extension */ |
325 template = """/* code generated by beremiz MQTT extension */ |
325 |
326 |
326 #include "MQTTAsync.h" |
327 #include <stdint.h> |
|
328 #include <pthread.h> |
|
329 |
|
330 #include "MQTTClient.h" |
327 #include "MQTTClientPersistence.h" |
331 #include "MQTTClientPersistence.h" |
328 |
332 |
329 #define _Log(level, ...) \\ |
333 #define _Log(level, ...) \\ |
330 {{ \\ |
334 {{ \\ |
331 char mstr[256]; \\ |
335 /* char mstr[256]; */ \\ |
332 snprintf(mstr, 255, __VA_ARGS__); \\ |
336 /* snprintf(mstr, 255, __VA_ARGS__); */ \\ |
333 LogMessage(level, mstr, strlen(mstr)); \\ |
337 /* LogMessage(level, mstr, strlen(mstr)); */ \\ |
|
338 printf(__VA_ARGS__); \\ |
334 }} |
339 }} |
335 |
340 |
336 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); |
341 #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); |
337 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); |
342 #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); |
338 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); |
343 #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); |
339 |
344 |
340 static inline void* loadFile(const char *const path) {{ |
345 static MQTTClient client; |
341 |
346 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; |
342 FILE *fp = fopen(path, "rb"); |
347 static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent |
343 if(!fp) {{ |
348 |
344 errno = 0; |
349 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) |
345 LogError("MQTT could not open %s", path); |
350 {{ |
346 return NULL; |
351 LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); |
|
352 }} |
|
353 |
|
354 #define DECL_VAR(iec_type, C_type, c_loc_name) \\ |
|
355 static C_type PLC_##c_loc_name##_buf = 0; \\ |
|
356 static C_type MQTT_##c_loc_name##_buf = 0; \\ |
|
357 C_type *c_loc_name = &PLC_##c_loc_name##_buf; |
|
358 |
|
359 {decl} |
|
360 |
|
361 #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ |
|
362 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, |
|
363 |
|
364 static struct {{ |
|
365 const char *topic; //null terminated topic string |
|
366 void *mqtt_pdata; //data from/for MQTT stack |
|
367 __IEC_types_enum vartype; |
|
368 }} topics [] = {{ |
|
369 {topics} |
|
370 }}; |
|
371 |
|
372 static int _connect_mqtt(void) |
|
373 {{ |
|
374 int rc; |
|
375 MQTTProperties props = MQTTProperties_initializer; |
|
376 MQTTProperties willProps = MQTTProperties_initializer; |
|
377 MQTTResponse response = MQTTResponse_initializer; |
|
378 |
|
379 response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); |
|
380 rc = response.reasonCode; |
|
381 MQTTResponse_free(response); |
|
382 |
|
383 return rc; |
|
384 }} |
|
385 |
|
386 void __cleanup_{locstr}(void) |
|
387 {{ |
|
388 int rc; |
|
389 |
|
390 /* TODO stop publish thread */ |
|
391 |
|
392 if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) |
|
393 {{ |
|
394 LogError("MQTT Failed to disconnect, return code %d\\n", rc); |
347 }} |
395 }} |
348 |
396 MQTTClient_destroy(&client); |
349 fseek(fp, 0, SEEK_END); |
397 }} |
350 size_t length = (size_t)ftell(fp); |
398 |
351 void* data = malloc(length); |
399 void connectionLost(void* context, char* reason) |
352 if(data) {{ |
400 {{ |
353 fseek(fp, 0, SEEK_SET); |
401 LogWarning("ConnectionLost, reconnecting\\n"); |
354 size_t read = fread(data, 1, fileContents.length, fp); |
402 _connect_mqtt(); |
355 if(read != length){{ |
403 /* TODO wait if error */ |
356 free(data); |
404 }} |
357 LogError("MQTT could not read %s", path); |
405 |
358 }} |
406 int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) |
359 }} else {{ |
407 {{ |
360 LogError("MQTT Not enough memoty to load %s", path); |
408 /* TODO : something with message */ |
|
409 printf("Message arrived\\n"); |
|
410 printf(" topic: %s\\n", topicName); |
|
411 printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload); |
|
412 MQTTClient_freeMessage(&message); |
|
413 MQTTClient_free(topicName); |
|
414 return 1; |
|
415 }} |
|
416 |
|
417 #define INIT_NoAuth() \\ |
|
418 LogInfo("MQTT Init no auth"); |
|
419 |
|
420 #define INIT_x509(PrivateKey, Certificate) \\ |
|
421 LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate); |
|
422 /* TODO */ |
|
423 |
|
424 #define INIT_UserPassword(User, Password) \\ |
|
425 LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ |
|
426 conn_opts.username = User; \\ |
|
427 conn_opts.password = Password; |
|
428 |
|
429 #define INIT_SUBSCRIPTION(Topic, QoS) \\ |
|
430 {{ \\ |
|
431 MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ |
|
432 rc = response.reasonCode; \\ |
|
433 MQTTResponse_free(response); \\ |
|
434 if (rc != MQTTCLIENT_SUCCESS) \\ |
|
435 {{ \\ |
|
436 LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\ |
|
437 }} \\ |
361 }} |
438 }} |
362 fclose(fp); |
|
363 |
|
364 return data; |
|
365 }} |
|
366 |
|
367 static MQTTClient client; |
|
368 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; |
|
369 |
|
370 void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) |
|
371 { |
|
372 LogWarning("Paho MQTT Trace : %d, %s\n", level, message); |
|
373 } |
|
374 |
|
375 #define DECL_VAR(iec_type, C_type, c_loc_name) \\ |
|
376 static C_type c_loc_name##_buf = 0; \\ |
|
377 C_type *c_loc_name = &c_loc_name##_buf; |
|
378 |
|
379 {decl} |
|
380 |
|
381 #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ |
|
382 {topic, &c_loc_name##_buf, iec_type##_ENUM}, |
|
383 |
|
384 ststic struct { |
|
385 const char *topic; //null terminated topic string |
|
386 void *pdata; //pointer to data |
|
387 __IEC_types_enum vartype; |
|
388 } topics [] = { |
|
389 {topics} |
|
390 } |
|
391 |
|
392 void __cleanup_{locstr}(void) |
|
393 {{ |
|
394 MQTT_Client_disconnect(client); |
|
395 MQTT_Client_delete(client); |
|
396 }} |
|
397 |
|
398 #define INIT_NoAuth() \\ |
|
399 LogInfo("MQTT Init no auth"); \\ |
|
400 MQTT_ClientConfig_setDefault(cc); \\ |
|
401 retval = MQTT_Client_connect(client, uri); |
|
402 |
|
403 /* Note : Single policy is enforced here, by default open62541 client supports all policies */ |
|
404 #define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate) \\ |
|
405 LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate); \\ |
|
406 \\ |
|
407 MQTT_ByteString certificate = loadFile(Certificate); \\ |
|
408 MQTT_ByteString privateKey = loadFile(PrivateKey); \\ |
|
409 \\ |
|
410 cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode; \\ |
|
411 \\ |
|
412 /* replacement for default behaviour */ \\ |
|
413 /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */ \\ |
|
414 do{{ \\ |
|
415 retval = MQTT_ClientConfig_setDefault(cc); \\ |
|
416 if(retval != MQTT_STATUSCODE_GOOD) \\ |
|
417 break; \\ |
|
418 \\ |
|
419 MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*) \\ |
|
420 MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2); \\ |
|
421 if(!sp){{ \\ |
|
422 retval = MQTT_STATUSCODE_BADOUTOFMEMORY; \\ |
|
423 break; \\ |
|
424 }} \\ |
|
425 cc->securityPolicies = sp; \\ |
|
426 \\ |
|
427 retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize], \\ |
|
428 certificate, privateKey, &cc->logger); \\ |
|
429 if(retval != MQTT_STATUSCODE_GOOD) {{ \\ |
|
430 MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND, \\ |
|
431 "Could not add SecurityPolicy Policy with error code %s", \\ |
|
432 MQTT_StatusCode_name(retval)); \\ |
|
433 MQTT_free(cc->securityPolicies); \\ |
|
434 cc->securityPolicies = NULL; \\ |
|
435 break; \\ |
|
436 }} \\ |
|
437 \\ |
|
438 ++cc->securityPoliciesSize; \\ |
|
439 }} while(0); \\ |
|
440 \\ |
|
441 retval = MQTT_Client_connect(client, uri); \\ |
|
442 \\ |
|
443 MQTT_ByteString_clear(&certificate); \\ |
|
444 MQTT_ByteString_clear(&privateKey); |
|
445 |
|
446 #define INIT_UserPassword(User, Password) \\ |
|
447 LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ |
|
448 MQTT_ClientConfig_setDefault(cc); \\ |
|
449 retval = MQTT_Client_connectUsername(client, uri, User, Password); |
|
450 |
|
451 #define INIT_READ_VARIANT(ua_type, c_loc_name) \\ |
|
452 MQTT_Variant_init(&c_loc_name##_variant); |
|
453 |
|
454 #define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name) \\ |
|
455 MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]); |
|
456 |
439 |
457 int __init_{locstr}(int argc,char **argv) |
440 int __init_{locstr}(int argc,char **argv) |
458 {{ |
441 {{ |
459 char *uri = "{uri}"; |
442 char *uri = "{uri}"; |
460 char *clientID = "{clientID}"; |
443 char *clientID = "{clientID}"; |
461 int rc; |
444 int rc; |
462 conn_opts = MQTTClient_connectOptions_initializer; |
445 |
463 |
446 MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; |
464 if ((rc = MQTTClient_create(&client, uri, clientID, |
447 |
465 MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) |
448 conn_opts.MQTTVersion = MQTTVERSION_5; |
466 { |
449 conn_opts.cleanstart = 1; |
467 printf("Failed to create client, return code %d\n", rc); |
450 |
468 rc = EXIT_FAILURE; |
451 createOpts.MQTTVersion = MQTTVERSION_5; |
469 goto exit; |
452 |
470 } |
453 MQTTClient_setTraceCallback(trace_callback); |
|
454 MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); |
|
455 |
|
456 |
|
457 rc = MQTTClient_createWithOptions( |
|
458 &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); |
|
459 if (rc != MQTTCLIENT_SUCCESS) |
|
460 {{ |
|
461 LogError("MQTT Failed to create client, return code %d\\n", rc); |
|
462 return rc; |
|
463 }} |
|
464 |
|
465 rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); |
|
466 if (rc != MQTTCLIENT_SUCCESS) |
|
467 {{ |
|
468 LogError("MQTT Failed to set callbacks %d", rc); |
|
469 return rc; |
|
470 }} |
471 |
471 |
472 {init} |
472 {init} |
473 |
473 |
474 if(retval != MQTT_STATUSCODE_GOOD) {{ |
474 rc = _connect_mqtt(); |
475 LogError("MQTT Init Failed %d", retval); |
475 |
476 MQTT_Client_delete(client); |
476 if (rc != MQTTCLIENT_SUCCESS) {{ |
477 return EXIT_FAILURE; |
477 LogError("MQTT Init Failed %d", rc); |
|
478 return rc; |
478 }} |
479 }} |
|
480 /* TODO start publish thread */ |
|
481 |
479 return 0; |
482 return 0; |
480 }} |
483 }} |
481 |
484 |
482 #define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\ |
485 #define READ_VALUE(c_loc_name, C_type) \\ |
483 retval = MQTT_Client_readValueAttribute( \\ |
486 PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; |
484 client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); \\ |
|
485 if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) && \\ |
|
486 c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{ \\ |
|
487 c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data; \\ |
|
488 MQTT_Variant_clear(&c_loc_name##_variant); /* Unalloc requiered on each read ! */ \\ |
|
489 }} |
|
490 |
487 |
491 void __retrieve_{locstr}(void) |
488 void __retrieve_{locstr}(void) |
492 {{ |
489 {{ |
493 MQTT_StatusCode retval; |
490 /* TODO try take mutex */ |
494 {retrieve} |
491 {retrieve} |
495 }} |
492 /* TODO free mutex */ |
496 |
493 }} |
497 #define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\ |
494 |
498 MQTT_Client_writeValueAttribute( \\ |
495 #define WRITE_VALUE(c_loc_name, C_type) \\ |
499 client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); |
496 MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; |
500 |
497 |
501 void __publish_{locstr}(void) |
498 void __publish_{locstr}(void) |
502 {{ |
499 {{ |
|
500 /* TODO try take mutex */ |
503 {publish} |
501 {publish} |
|
502 /* TODO free mutex */ |
|
503 /* TODO unblock publish thread */ |
504 }} |
504 }} |
505 |
505 |
506 """ |
506 """ |
507 |
507 |
508 formatdict = dict( |
508 formatdict = dict( |
526 INIT_UserPassword("{User}", "{Password}")""".format(**config) |
526 INIT_UserPassword("{User}", "{Password}")""".format(**config) |
527 else: |
527 else: |
528 formatdict["init"] += """ |
528 formatdict["init"] += """ |
529 INIT_NoAuth()""" |
529 INIT_NoAuth()""" |
530 |
530 |
531 topics = OrderedDict() |
531 topics = OrderedDict() |
532 for direction, data in self.items(): |
532 for direction, data in self.items(): |
533 iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction] |
533 iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction] |
534 for row in data: |
534 for row in data: |
535 Topic, QoS, Retain, iec_type, iec_number = row |
535 Topic, QoS, Retain, iec_type, iec_number = row |
536 C_type, iec_size_prefix = MQTT_IEC_types[iec_type] |
536 C_type, iec_size_prefix = MQTT_IEC_types[iec_type] |
537 c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number) |
537 c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number) |
538 |
538 |
539 formatdict["decl"] += """ |
539 formatdict["decl"] += """ |
540 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) |
540 DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals()) |
541 |
541 |
542 formatdict["topics"] += """ |
542 formatdict["topics"] += """ |
543 INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) |
543 INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) |
544 # |
544 |
545 # if direction == "input": |
545 if direction == "input": |
546 # formatdict["init"] += """ |
546 formatdict["init"] += """ |
547 # INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals()) |
547 INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals()) |
548 # formatdict["retrieve"] += """ |
548 formatdict["retrieve"] += """ |
549 # READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals()) |
549 READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) |
550 # |
550 |
551 # if direction == "output": |
551 if direction == "output": |
552 # formatdict["init"] += """ |
552 # formatdict["init"] += " NOTHING ! publish doesn't need init. " |
553 # INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals()) |
553 formatdict["publish"] += """ |
554 # formatdict["publish"] += """ |
554 WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) |
555 # WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals()) |
|
556 |
555 |
557 Ccode = template.format(**formatdict) |
556 Ccode = template.format(**formatdict) |
558 |
557 |
559 return Ccode |
558 return Ccode |
560 |
559 |