mqtt/mqtt_client_gen.py
changeset 3989 987c69b1582f
parent 3988 150599d9073f
child 3990 24656e0e8732
equal deleted inserted replaced
3988:150599d9073f 3989:987c69b1582f
   365 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
   365 void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
   366 {{
   366 {{
   367     LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
   367     LogWarning("Paho MQTT Trace : %d, %s\\n", level, message);
   368 }}
   368 }}
   369 
   369 
       
   370 #define CHANGED 1
       
   371 #define UNCHANGED 0
       
   372 
   370 #define DECL_VAR(iec_type, C_type, c_loc_name)                                                     \\
   373 #define DECL_VAR(iec_type, C_type, c_loc_name)                                                     \\
   371 static C_type PLC_##c_loc_name##_buf = 0;                                                          \\
   374 static C_type PLC_##c_loc_name##_buf = 0;                                                          \\
   372 static C_type MQTT_##c_loc_name##_buf = 0;                                                         \\
   375 static C_type MQTT_##c_loc_name##_buf = 0;                                                         \\
       
   376 static int MQTT_##c_loc_name##_state = UNCHANGED;  /* systematically published at init */          \\
   373 C_type *c_loc_name = &PLC_##c_loc_name##_buf;
   377 C_type *c_loc_name = &PLC_##c_loc_name##_buf;
   374 
   378 
   375 {decl}
   379 {decl}
   376 
   380 
   377 static MQTTClient client;
   381 static MQTTClient client;
   378 #ifdef USE_MQTT_5
   382 #ifdef USE_MQTT_5
   379 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
   383 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
   380 #else
   384 #else
   381 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
   385 static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
   382 #endif
   386 #endif
   383 static pthread_mutex_t clientMutex;  // mutex to keep PLC data consistent
   387 
       
   388 /* condition to quit publish thread */
       
   389 static int MQTT_stop_thread = 0;
       
   390 
       
   391 /* condition to wakeup publish thread */
       
   392 static int MQTT_any_pub_var_changed = 0;
       
   393 
       
   394 /* mutex to keep PLC data consistent, and protect MQTT_any_pub_var_changed */
       
   395 static pthread_mutex_t MQTT_mutex;
       
   396 
       
   397 /* wakeup publish thread when PLC changed published variable */
       
   398 static pthread_cond_t MQTT_new_data = PTHREAD_COND_INITIALIZER;
       
   399 
       
   400 /* publish thread */
       
   401 static pthread_t publishThread;
   384 
   402 
   385 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \\
   403 #define INIT_TOPIC(topic, iec_type, c_loc_name)                                                    \\
   386 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
   404 {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}},
   387 
   405 
   388 static struct {{
   406 static struct {{
   476         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
   494         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
   477         {{                                                                                        \\
   495         {{                                                                                        \\
   478             LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);   \\
   496             LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);   \\
   479         }}                                                                                        \\
   497         }}                                                                                        \\
   480     }}
   498     }}
   481 
       
   482 
       
   483 
       
   484 
   499 
   485 
   500 
   486 #ifdef USE_MQTT_5
   501 #ifdef USE_MQTT_5
   487 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
   502 #define _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
   488         MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type),               \\
   503         MQTTResponse response = MQTTClient_publish5(client, #Topic, sizeof(C_type),               \\
   499     {{                                                                                            \\
   514     {{                                                                                            \\
   500         int rc;                                                                                   \\
   515         int rc;                                                                                   \\
   501         _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
   516         _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
   502         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
   517         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
   503         {{                                                                                        \\
   518         {{                                                                                        \\
   504             LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);   \\
   519             LogError("MQTT client failed to init publication of '%s', return code %d\\n", #Topic, rc);\\
       
   520             /* TODO update status variable accordingly */                                         \\
   505         }}                                                                                        \\
   521         }}                                                                                        \\
   506     }}
   522     }}
   507 
   523 
   508 
   524 #define PUBLISH_CHANGE(Topic, QoS, C_type, c_loc_name, Retained)                                  \\
       
   525     if(MQTT_##c_loc_name##_state == CHANGED)                                                      \\
       
   526     {{                                                                                            \\
       
   527         int rc;                                                                                   \\
       
   528         _PUBLISH(Topic, QoS, C_type, c_loc_name, Retained)                                        \\
       
   529         if (rc != MQTTCLIENT_SUCCESS)                                                             \\
       
   530         {{                                                                                        \\
       
   531             LogError("MQTT client failed to publish '%s', return code %d\\n", #Topic, rc);        \\
       
   532             /* TODO update status variable accordingly */                                         \\
       
   533         }} else {{                                                                                \\
       
   534             MQTT_##c_loc_name##_state = UNCHANGED;                                                \\
       
   535         }}                                                                                        \\
       
   536     }}
       
   537 
       
   538 static void *__publish_thread(void *_unused) {{
       
   539     int rc = 0;
       
   540     while((rc = pthread_mutex_lock(&MQTT_mutex)) == 0 && !MQTT_stop_thread){{
       
   541         pthread_cond_wait(&MQTT_new_data, &MQTT_mutex);
       
   542         if(MQTT_any_pub_var_changed){{
       
   543 
       
   544             /* publish changes, and reset variable's state to UNCHANGED */
       
   545 {publish_changes}
       
   546             MQTT_any_pub_var_changed = 0;
       
   547         }}
       
   548 
       
   549         pthread_mutex_unlock(&MQTT_mutex);
       
   550     }}
       
   551 
       
   552     if(!MQTT_stop_thread){{
       
   553         /* if thread exits outside of normal shutdown, report an error */
       
   554         LogError("MQTT client thread exited unexpectedly, return code %d\\n", rc);
       
   555     }}
       
   556 }}
       
   557     
   509 int __init_{locstr}(int argc,char **argv)
   558 int __init_{locstr}(int argc,char **argv)
   510 {{
   559 {{
   511     char *uri = "{uri}";
   560     char *uri = "{uri}";
   512     char *clientID = "{clientID}";
   561     char *clientID = "{clientID}";
   513     int rc;
   562     int rc;
   550     }}
   599     }}
   551 
   600 
   552 {init}
   601 {init}
   553 
   602 
   554     /* TODO start publish thread */
   603     /* TODO start publish thread */
       
   604     rc = pthread_create(&publishThread, NULL, &__publish_thread, NULL);
   555 
   605 
   556     return 0;
   606     return 0;
   557 }}
   607 }}
   558 
   608 
   559 #define READ_VALUE(c_loc_name, C_type) \\
   609 #define READ_VALUE(c_loc_name, C_type) \\
   565 {retrieve}
   615 {retrieve}
   566     /* TODO free mutex */
   616     /* TODO free mutex */
   567 }}
   617 }}
   568 
   618 
   569 #define WRITE_VALUE(c_loc_name, C_type) \\
   619 #define WRITE_VALUE(c_loc_name, C_type) \\
   570     MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf;
   620     if(MQTT_##c_loc_name##_buf != PLC_##c_loc_name##_buf){{ \\
       
   621         MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; \\
       
   622         MQTT_##c_loc_name##_state = CHANGED; \\
       
   623         MQTT_any_pub_var_changed = 1; \\
       
   624     }}
   571 
   625 
   572 void __publish_{locstr}(void)
   626 void __publish_{locstr}(void)
   573 {{
   627 {{
   574     /* TODO try take mutex */
   628     if (pthread_mutex_trylock(&MQTT_mutex) == 0){{
       
   629         MQTT_any_pub_var_changed = 0;
       
   630         /* copy PLC_* variables to MQTT_*, and mark those that changed */
   575 {publish}
   631 {publish}
   576     /* TODO free mutex */
   632         /* if any change detected, unblock publish thread */
   577     /* TODO unblock publish thread */
   633         if(MQTT_any_pub_var_changed){{
       
   634             pthread_cond_signal(&MQTT_new_data);
       
   635         }}
       
   636         pthread_mutex_unlock(&MQTT_mutex);
       
   637     }} else {{
       
   638         /* TODO if couldn't lock mutex set status variable accordingly */ 
       
   639     }}
   578 }}
   640 }}
   579 
   641 
   580 """
   642 """
   581 
   643 
   582         formatdict = dict(
   644         formatdict = dict(
   583             locstr   = locstr,
   645             locstr          = locstr,
   584             uri      = config["URI"],
   646             uri             = config["URI"],
   585             clientID = config["clientID"],
   647             clientID        = config["clientID"],
   586             decl     = "",
   648             decl            = "",
   587             topics   = "",
   649             topics          = "",
   588             cleanup  = "",
   650             cleanup         = "",
   589             init     = "",
   651             init            = "",
   590             retrieve = "",
   652             retrieve        = "",
   591             publish  = ""
   653             publish         = "",
       
   654             publish_changes = ""
   592         )
   655         )
   593 
   656 
   594 
   657 
   595         # Use Config's "MQTTVersion" to switch between protocol version at build time
   658         # Use Config's "MQTTVersion" to switch between protocol version at build time
   596         if config["UseMQTT5"]:
   659         if config["UseMQTT5"]:
   635                 else:
   698                 else:
   636                     formatdict["init"] += """
   699                     formatdict["init"] += """
   637     INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
   700     INIT_PUBLICATION({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
   638                     formatdict["publish"] += """
   701                     formatdict["publish"] += """
   639     WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
   702     WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals())
       
   703                     formatdict["publish_changes"] += """
       
   704     PUBLISH_CHANGE({Topic}, {QoS}, {C_type}, {c_loc_name}, {Retained})""".format(**locals())
   640 
   705 
   641         Ccode = template.format(**formatdict)
   706         Ccode = template.format(**formatdict)
   642 
   707 
   643         return Ccode
   708         return Ccode
   644 
   709