# HG changeset patch # User Edouard Tisserant # Date 1720776246 -7200 # Node ID 98bd0bb33ce4ec96464e1b70674d1cb3049334a0 # Parent d0c5d77a33af19ac2e49a613d72f7a5e35337a25 MQTT: WIP, now connects to broker. Added MQTT 3 support and protocol version selection in IDE. diff -r d0c5d77a33af -r 98bd0bb33ce4 mqtt/client.py --- a/mqtt/client.py Wed Jul 10 11:10:05 2024 +0200 +++ b/mqtt/client.py Fri Jul 12 11:24:06 2024 +0200 @@ -55,6 +55,7 @@ + @@ -86,7 +87,11 @@ return attr["value"] AuthType = cfg("AuthType") - res = dict(URI=cfg("Broker_URI"), AuthType=AuthType, clientID=cfg("Client_ID")) + res = dict( + URI=cfg("Broker_URI"), + AuthType=AuthType, + clientID=cfg("Client_ID"), + UseMQTT5=cfg("Use_MQTT_5")) paramList = authParams.get(AuthType, None) if paramList: diff -r d0c5d77a33af -r 98bd0bb33ce4 mqtt/mqtt_client_gen.py --- a/mqtt/mqtt_client_gen.py Wed Jul 10 11:10:05 2024 +0200 +++ b/mqtt/mqtt_client_gen.py Fri Jul 12 11:24:06 2024 +0200 @@ -330,22 +330,18 @@ #include "MQTTClient.h" #include "MQTTClientPersistence.h" -#define _Log(level, ...) \\ - {{ \\ +#define _Log(level, ...) \\ + {{ \\ /* char mstr[256]; */ \\ /* snprintf(mstr, 255, __VA_ARGS__); */ \\ /* LogMessage(level, mstr, strlen(mstr)); */ \\ - printf(__VA_ARGS__); \\ + printf(__VA_ARGS__); \\ }} #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__); #define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__); #define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__); -static MQTTClient client; -static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; -static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent - void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) {{ LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); @@ -358,7 +354,15 @@ {decl} -#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ +static MQTTClient client; +#ifdef USE_MQTT_5 +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; +#else +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; +#endif +static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent + +#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ {{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, static struct {{ @@ -372,6 +376,8 @@ static int _connect_mqtt(void) {{ int rc; + +#ifdef USE_MQTT_5 MQTTProperties props = MQTTProperties_initializer; MQTTProperties willProps = MQTTProperties_initializer; MQTTResponse response = MQTTResponse_initializer; @@ -379,6 +385,9 @@ response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); rc = response.reasonCode; MQTTResponse_free(response); +#else + rc = MQTTClient_connect(client, &conn_opts); +#endif return rc; }} @@ -389,7 +398,11 @@ /* TODO stop publish thread */ +#ifdef USE_MQTT_5 if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) +#else + if (rc = MQTTClient_disconnect(client, 5000) != MQTTCLIENT_SUCCESS) +#endif {{ LogError("MQTT Failed to disconnect, return code %d\\n", rc); }} @@ -415,25 +428,33 @@ }} #define INIT_NoAuth() \\ - LogInfo("MQTT Init no auth"); + LogInfo("MQTT Init no auth\\n"); #define INIT_x509(PrivateKey, Certificate) \\ - LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate); + LogInfo("MQTT Init x509 %s,%s\\n", PrivateKey, Certificate); /* TODO */ #define INIT_UserPassword(User, Password) \\ - LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ + LogInfo("MQTT Init UserPassword %s,%s\\n", User, Password); \\ conn_opts.username = User; \\ conn_opts.password = Password; +#ifdef USE_MQTT_5 +#define MY_SUBSCRIBE(Topic, QoS) \\ + MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ + rc = response.reasonCode; \\ + MQTTResponse_free(response); +#else +#define MY_SUBSCRIBE(Topic, QoS) \\ + rc = MQTTClient_subscribe(client, #Topic, QoS); +#endif + #define INIT_SUBSCRIPTION(Topic, QoS) \\ {{ \\ - MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ - rc = response.reasonCode; \\ - MQTTResponse_free(response); \\ + MY_SUBSCRIBE(Topic, QoS) \\ if (rc != MQTTCLIENT_SUCCESS) \\ {{ \\ - LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\ + LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc); \\ }} \\ }} @@ -445,10 +466,14 @@ MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; +#ifdef USE_MQTT_5 conn_opts.MQTTVersion = MQTTVERSION_5; conn_opts.cleanstart = 1; createOpts.MQTTVersion = MQTTVERSION_5; +#else + conn_opts.cleansession = 1; +#endif MQTTClient_setTraceCallback(trace_callback); MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); @@ -465,18 +490,19 @@ rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); if (rc != MQTTCLIENT_SUCCESS) {{ - LogError("MQTT Failed to set callbacks %d", rc); + LogError("MQTT Failed to set callbacks, return code %d\\n", rc); return rc; }} -{init} - rc = _connect_mqtt(); if (rc != MQTTCLIENT_SUCCESS) {{ - LogError("MQTT Init Failed %d", rc); + LogError("MQTT Connect Failed, return code %d\\n", rc); return rc; }} + +{init} + /* TODO start publish thread */ return 0; @@ -504,7 +530,7 @@ }} """ - + formatdict = dict( locstr = locstr, uri = config["URI"], @@ -514,9 +540,15 @@ cleanup = "", init = "", retrieve = "", - publish = "" + publish = "" ) + + # Use Config's "MQTTVersion" to switch between protocol version at build time + if config["UseMQTT5"]: + formatdict["decl"] += """ +#define USE_MQTT_5""".format(**config) + AuthType = config["AuthType"] if AuthType == "x509": formatdict["init"] += """ @@ -549,12 +581,13 @@ READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) if direction == "output": + # TODO: publish at init # formatdict["init"] += " NOTHING ! publish doesn't need init. " formatdict["publish"] += """ WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) Ccode = template.format(**formatdict) - + return Ccode if __name__ == "__main__": @@ -572,6 +605,7 @@ config["URI"] = sys.argv[1] if argc>1 else "tcp://localhost:1883" config["clientID"] = sys.argv[2] if argc>2 else "" config["AuthType"] = None + config["UseMQTT5"] = True if argc > 3: AuthType = sys.argv[3] @@ -595,7 +629,7 @@ def OnGenerate(evt): dlg = wx.FileDialog( frame, message="Generate file as ...", defaultDir=os.getcwd(), - defaultFile="", + defaultFile="", wildcard="C (*.c)|*.c", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT ) @@ -620,9 +654,9 @@ int main(int argc, char *argv[]) { __init_test(arc,argv); - + __retrieve_test(); - + __publish_test(); __cleanup_test(); @@ -655,7 +689,7 @@ def OnSave(evt): dlg = wx.FileDialog( frame, message="Save file as ...", defaultDir=os.getcwd(), - defaultFile="", + defaultFile="", wildcard="CSV (*.csv)|*.csv", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT )