365 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) |
365 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) |
366 {{ |
366 {{ |
367 LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); |
367 LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); |
368 }} |
368 }} |
369 |
369 |
|
370 #define CHANGED 1 |
|
371 #define UNCHANGED 0 |
|
372 |
370 #define DECL_VAR(iec_type, C_type, c_loc_name) \\ |
373 #define DECL_VAR(iec_type, C_type, c_loc_name) \\ |
371 static C_type PLC_##c_loc_name##_buf = 0; \\ |
374 static C_type PLC_##c_loc_name##_buf = 0; \\ |
372 static C_type MQTT_##c_loc_name##_buf = 0; \\ |
375 static C_type MQTT_##c_loc_name##_buf = 0; \\ |
|
376 static int MQTT_##c_loc_name##_state = UNCHANGED; /* systematically published at init */ \\ |
373 C_type *c_loc_name = &PLC_##c_loc_name##_buf; |
377 C_type *c_loc_name = &PLC_##c_loc_name##_buf; |
374 |
378 |
375 {decl} |
379 {decl} |
376 |
380 |
377 static MQTTClient client; |
381 static MQTTClient client; |
378 #ifdef USE_MQTT_5 |
382 #ifdef USE_MQTT_5 |
379 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; |
383 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; |
380 #else |
384 #else |
381 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; |
385 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; |
382 #endif |
386 #endif |
383 static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent |
387 |
|
388 /* condition to quit publish thread */ |
|
389 static int MQTT_stop_thread = 0; |
|
390 |
|
391 /* condition to wakeup publish thread */ |
|
392 static int MQTT_any_pub_var_changed = 0; |
|
393 |
|
394 /* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */ |
|
395 static pthread_mutex_t MQTT_mutex; |
|
396 |
|
397 /* wake up the publish thread when the PLC changes a published variable */ |
|
398 static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER; |
|
399 |
|
400 /* publish thread */ |
|
401 static pthread_t publishThread; |
384 |
402 |
385 #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ |
403 #define INIT_TOPIC(topic, iec_type, c_loc_name) \\ |
386 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, |
404 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, |
387 |
405 |
388 static struct {{ |
406 static struct {{ |
476 if (rc != MQTTCLIENT_SUCCESS) \\ |
494 if (rc != MQTTCLIENT_SUCCESS) \\ |
477 {{ \\ |
495 {{ \\ |
478 LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ |
496 LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ |
479 }} \\ |
497 }} \\ |
480 }} |
498 }} |
481 |
|
482 |
|
483 |
|
484 |
499 |
485 |
500 |
486 #ifdef USE_MQTT_5 |
501 #ifdef USE_MQTT_5 |
487 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ |
502 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ |
488 MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\ |
503 MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type), \\ |
499 {{ \\ |
514 {{ \\ |
500 int rc; \\ |
515 int rc; \\ |
501 _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ |
516 _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ |
502 if (rc != MQTTCLIENT_SUCCESS) \\ |
517 if (rc != MQTTCLIENT_SUCCESS) \\ |
503 {{ \\ |
518 {{ \\ |
504 LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ |
519 LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\ |
|
520 /* TODO update status variable accordingly */ \\ |
505 }} \\ |
521 }} \\ |
506 }} |
522 }} |
507 |
523 |
508 |
524 #define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained) \\ |
|
525 if(MQTT_##c_loc_name##_state == CHANGED) \\ |
|
526 {{ \\ |
|
527 int rc; \\ |
|
528 _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained) \\ |
|
529 if (rc != MQTTCLIENT_SUCCESS) \\ |
|
530 {{ \\ |
|
531 LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc); \\ |
|
532 /* TODO update status variable accordingly */ \\ |
|
533 }} else {{ \\ |
|
534 MQTT_##c_loc_name##_state = UNCHANGED; \\ |
|
535 }} \\ |
|
536 }} |
|
537 |
|
538 static void *__publish_thread(void *_unused) {{ |
|
539 int rc = 0; |
|
540 while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{ |
|
541 pthread_cond_wait(&MQTT_new_data, &MQTT_mutex); |
|
542 if(MQTT_any_pub_var_changed){{ |
|
543 |
|
544 /* publish changes, and reset variable's state to UNCHANGED */ |
|
545 {publish_changes} |
|
546 MQTT_any_pub_var_changed = 0; |
|
547 }} |
|
548 |
|
549 pthread_mutex_unlock(&MQTT_mutex); |
|
550 }} |
|
551 |
|
552 if(!MQTT_stop_thread){{ |
|
553 /* if the thread exits outside of a normal shutdown, report an error */ |
|
554 LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc); |
|
555 }} |
|
556 }} |
|
557 |
509 int __init_{locstr}(int argc,char **argv) |
558 int __init_{locstr}(int argc,char **argv) |
510 {{ |
559 {{ |
511 char *uri = "{uri}"; |
560 char *uri = "{uri}"; |
512 char *clientID = "{clientID}"; |
561 char *clientID = "{clientID}"; |
513 int rc; |
562 int rc; |
565 {retrieve} |
615 {retrieve} |
566 /* TODO free mutex */ |
616 /* TODO free mutex */ |
567 }} |
617 }} |
568 |
618 |
569 #define WRITE_VALUE(c_loc_name, C_type) \\ |
619 #define WRITE_VALUE(c_loc_name, C_type) \\ |
570 MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; |
620 if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\ |
|
621 MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\ |
|
622 MQTT_##c_loc_name##_state = CHANGED; \\ |
|
623 MQTT_any_pub_var_changed = 1; \\ |
|
624 }} |
571 |
625 |
572 void __publish_{locstr}(void) |
626 void __publish_{locstr}(void) |
573 {{ |
627 {{ |
574 /* TODO try take mutex */ |
628 if (pthread_mutex_trylock(&MQTT_mutex) == 0){{ |
|
629 MQTT_any_pub_var_changed = 0; |
|
630 /* copy PLC_* variables to MQTT_*, and mark those that changed */ |
575 {publish} |
631 {publish} |
576 /* TODO free mutex */ |
632 /* if any change detected, unblock publish thread */ |
577 /* TODO unblock publish thread */ |
633 if(MQTT_any_pub_var_changed){{ |
|
634 pthread_cond_signal(&MQTT_new_data); |
|
635 }} |
|
636 pthread_mutex_unlock(&MQTT_mutex); |
|
637 }} else {{ |
|
638 /* TODO if couldn't lock mutex set status variable accordingly */ |
|
639 }} |
578 }} |
640 }} |
579 |
641 |
580 """ |
642 """ |
581 |
643 |
582 formatdict = dict( |
644 formatdict = dict( |
583 locstr = locstr, |
645 locstr = locstr, |
584 uri = config["URI"], |
646 uri = config["URI"], |
585 clientID = config["clientID"], |
647 clientID = config["clientID"], |
586 decl = "", |
648 decl = "", |
587 topics = "", |
649 topics = "", |
588 cleanup = "", |
650 cleanup = "", |
589 init = "", |
651 init = "", |
590 retrieve = "", |
652 retrieve = "", |
591 publish = "" |
653 publish = "", |
|
654 publish_changes = "" |
592 ) |
655 ) |
593 |
656 |
594 |
657 |
595 # Use Config's "MQTTVersion" to switch between protocol version at build time |
658 # Use Config's "MQTTVersion" to switch between protocol version at build time |
596 if config["UseMQTT5"]: |
659 if config["UseMQTT5"]: |
635 else: |
698 else: |
636 formatdict["init"] += """ |
699 formatdict["init"] += """ |
637 INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) |
700 INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) |
638 formatdict["publish"] += """ |
701 formatdict["publish"] += """ |
639 WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) |
702 WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) |
|
703 formatdict["publish_changes"] += """ |
|
704 PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals()) |
640 |
705 |
641 Ccode = template.format(**formatdict) |
706 Ccode = template.format(**formatdict) |
642 |
707 |
643 return Ccode |
708 return Ccode |
644 |
709 |