# HG changeset patch # User Edouard Tisserant <edouard@beremiz.fr> # Date 1737473323 -3600 # Node ID 97e93962be0c9db07fef2769a3925d8f8d5e3950 # Parent 8c12c0df026a69d45527eefe48899cf4b0eaf428 MQTT: fix non reconnecting python topics. WIP diff -r 8c12c0df026a -r 97e93962be0c mqtt/library.py --- a/mqtt/library.py Tue Jan 21 09:29:59 2025 +0100 +++ b/mqtt/library.py Tue Jan 21 16:28:43 2025 +0100 @@ -22,11 +22,14 @@ res = c_function(topic, payload, len(payload), QoS, Retained) return res -# C per client CallBack type for __mqtt_python_subscribe_{name} -c_cb_type = ctypes.CFUNCTYPE(ctypes.c_int, # return - ctypes.c_char_p, # topic - ctypes.POINTER(ctypes.c_char), # data - ctypes.c_uint32) # data length +# C per client CallBack type for __mqtt_python_onmsg_{name} +mqtt_c_cb_onmsg_type = ctypes.CFUNCTYPE(ctypes.c_int, # return + ctypes.c_char_p, # topic + ctypes.POINTER(ctypes.c_char), # data + ctypes.c_uint32) # data length + +# C per client CallBack type for __mqtt_python_resub_{name} +mqtt_c_cb_resub_type = ctypes.CFUNCTYPE(ctypes.c_int) # return # CallBacks management # - each call to MQTT_subscribe registers a callback @@ -35,18 +38,21 @@ # - one callback registered to C side per client MQTT_client_cbs = {} -def per_client_cb_factory(client): - def per_client_cb(topic, dataptr, datalen): +def mqtt_per_client_cb_factory(client): + def per_client_onmsg_cb(topic, dataptr, datalen): payload = ctypes.string_at(dataptr, datalen) - subscriber = MQTT_subscribers_cbs[client].get(topic, None) + subscriber,_Qos = MQTT_subscribers_cbs[client].get(topic, None) if subscriber: subscriber(topic, payload) return 0 return 1 - return per_client_cb + def per_client_resub_cb(): + for topic,(_cb,Qos) in MQTT_subscribers_cbs[client].items(): + _MQTT_subscribe(clientname, topic, QoS) + return 1 + return per_client_onmsg_cb,per_client_resub_cb -def MQTT_subscribe(clientname, topic, cb, QoS = 1): - global MQTT_client_cbs, MQTT_subscribers_cbs +def _MQTT_subscribe(clientname, topic, QoS): c_function_name = "__mqtt_python_subscribe_" + clientname c_function = getattr(PLCBinary, c_function_name) c_function.restype = ctypes.c_int # error or 0 @@ -54,17 +60,24 @@ ctypes.c_char_p, # topic ctypes.c_uint8] # QoS - MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = cb + return c_function(topic, QoS) - c_cb = MQTT_client_cbs.get(clientname, None) - if c_cb is None: - c_cb = c_cb_type(per_client_cb_factory(clientname)) - MQTT_client_cbs[clientname] = c_cb +def MQTT_subscribe(clientname, topic, cb, QoS = 1): + global MQTT_client_cbs, MQTT_subscribers_cbs + + MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = (cb, QoS) + res = _MQTT_subscribe(clientname, topic, QoS) + + c_cbs = MQTT_client_cbs.get(clientname, None) + if c_cbs is None: + cb_onmsg, cb_resub = mqtt_per_client_cb_factory(clientname) + c_cbs = (mqtt_c_cb_onmsg_type(cb_onmsg), + mqtt_c_cb_resub_type(cb_resub)) + MQTT_client_cbs[clientname] = c_cbs register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname ) - register_c_function.argtypes = [c_cb_type] - register_c_function(c_cb) + register_c_function.argtypes = [mqtt_c_cb_onmsg_type, mqtt_c_cb_resub_type] + register_c_function(*c_cbs) - res = c_function(topic, QoS) return res """ diff -r 8c12c0df026a -r 97e93962be0c mqtt/mqtt_template.c --- a/mqtt/mqtt_template.c Tue Jan 21 09:29:59 2025 +0100 +++ b/mqtt/mqtt_template.c Tue Jan 21 16:28:43 2025 +0100 @@ -260,8 +260,11 @@ }} -typedef int(*callback_fptr_t)(char* topic, char* data, uint32_t datalen); -static callback_fptr_t __mqtt_python_callback_fptr_{name} = NULL; +typedef int(*cb_onmsg_fptr_t)(char* topic, char* data, uint32_t datalen); +static cb_onmsg_fptr_t __mqtt_python_cb_onmsg_fptr_{name} = NULL; + +typedef int(*cb_resub_fptr_t)(void); +static cb_resub_fptr_t __mqtt_python_cb_resub_fptr_{name} = NULL; static int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {{ @@ -309,8 +312,8 @@ high = mid - 1; }} // If we reach here, then the element was not present - if(__mqtt_python_callback_fptr_{name} && - (*__mqtt_python_callback_fptr_{name})(topicName, + if(__mqtt_python_cb_onmsg_fptr_{name} && + (*__mqtt_python_cb_onmsg_fptr_{name})(topicName, (char*)message->payload, message->payloadlen) == 0){{ // Topic was handled in python @@ -458,6 +461,10 @@ {init_pubsub} + if(__mqtt_python_cb_resub_fptr_{name}){{ + (*__mqtt_python_cb_resub_fptr_{name})(); + }} + return MQTTCLIENT_SUCCESS; }} @@ -630,8 +637,9 @@ return 0; }} -int __mqtt_python_callback_setter_{name}(callback_fptr_t cb) -{{ - __mqtt_python_callback_fptr_{name} = cb; +int __mqtt_python_callback_setter_{name}(cb_onmsg_fptr_t cb_onmsg, cb_resub_fptr_t cb_resub) +{{ + __mqtt_python_cb_onmsg_fptr_{name} = cb_onmsg; + __mqtt_python_cb_resub_fptr_{name} = cb_resub; return 0; }}