MQTT: fix non reconnecting python topics. WIP
--- a/mqtt/library.py Tue Jan 21 09:29:59 2025 +0100
+++ b/mqtt/library.py Tue Jan 21 16:28:43 2025 +0100
@@ -22,11 +22,14 @@
res = c_function(topic, payload, len(payload), QoS, Retained)
return res
-# C per client CallBack type for __mqtt_python_subscribe_{name}
-c_cb_type = ctypes.CFUNCTYPE(ctypes.c_int, # return
- ctypes.c_char_p, # topic
- ctypes.POINTER(ctypes.c_char), # data
- ctypes.c_uint32) # data length
+# C per client CallBack type for __mqtt_python_onmsg_{name}
+mqtt_c_cb_onmsg_type = ctypes.CFUNCTYPE(ctypes.c_int, # return
+ ctypes.c_char_p, # topic
+ ctypes.POINTER(ctypes.c_char), # data
+ ctypes.c_uint32) # data length
+
+# C per client CallBack type for __mqtt_python_resub_{name}
+mqtt_c_cb_resub_type = ctypes.CFUNCTYPE(ctypes.c_int) # return
# CallBacks management
# - each call to MQTT_subscribe registers a callback
@@ -35,18 +38,21 @@
# - one callback registered to C side per client
MQTT_client_cbs = {}
-def per_client_cb_factory(client):
- def per_client_cb(topic, dataptr, datalen):
+def mqtt_per_client_cb_factory(client):
+ def per_client_onmsg_cb(topic, dataptr, datalen):
payload = ctypes.string_at(dataptr, datalen)
- subscriber = MQTT_subscribers_cbs[client].get(topic, None)
+ subscriber,_Qos = MQTT_subscribers_cbs[client].get(topic, None)
if subscriber:
subscriber(topic, payload)
return 0
return 1
- return per_client_cb
+ def per_client_resub_cb():
+ for topic,(_cb,Qos) in MQTT_subscribers_cbs[client].items():
+ _MQTT_subscribe(clientname, topic, QoS)
+ return 1
+ return per_client_onmsg_cb,per_client_resub_cb
-def MQTT_subscribe(clientname, topic, cb, QoS = 1):
- global MQTT_client_cbs, MQTT_subscribers_cbs
+def _MQTT_subscribe(clientname, topic, QoS):
c_function_name = "__mqtt_python_subscribe_" + clientname
c_function = getattr(PLCBinary, c_function_name)
c_function.restype = ctypes.c_int # error or 0
@@ -54,17 +60,24 @@
ctypes.c_char_p, # topic
ctypes.c_uint8] # QoS
- MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = cb
+ return c_function(topic, QoS)
- c_cb = MQTT_client_cbs.get(clientname, None)
- if c_cb is None:
- c_cb = c_cb_type(per_client_cb_factory(clientname))
- MQTT_client_cbs[clientname] = c_cb
+def MQTT_subscribe(clientname, topic, cb, QoS = 1):
+ global MQTT_client_cbs, MQTT_subscribers_cbs
+
+ MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = (cb, QoS)
+ res = _MQTT_subscribe(clientname, topic, QoS)
+
+ c_cbs = MQTT_client_cbs.get(clientname, None)
+ if c_cbs is None:
+ cb_onmsg, cb_resub = mqtt_per_client_cb_factory(clientname)
+ c_cbs = (mqtt_c_cb_onmsg_type(cb_onmsg),
+ mqtt_c_cb_resub_type(cb_resub))
+ MQTT_client_cbs[clientname] = c_cbs
register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname )
- register_c_function.argtypes = [c_cb_type]
- register_c_function(c_cb)
+ register_c_function.argtypes = [mqtt_c_cb_onmsg_type, mqtt_c_cb_resub_type]
+ register_c_function(*c_cbs)
- res = c_function(topic, QoS)
return res
"""
--- a/mqtt/mqtt_template.c Tue Jan 21 09:29:59 2025 +0100
+++ b/mqtt/mqtt_template.c Tue Jan 21 16:28:43 2025 +0100
@@ -260,8 +260,11 @@
}}
-typedef int(*callback_fptr_t)(char* topic, char* data, uint32_t datalen);
-static callback_fptr_t __mqtt_python_callback_fptr_{name} = NULL;
+typedef int(*cb_onmsg_fptr_t)(char* topic, char* data, uint32_t datalen);
+static cb_onmsg_fptr_t __mqtt_python_cb_onmsg_fptr_{name} = NULL;
+
+typedef int(*cb_resub_fptr_t)(void);
+static cb_resub_fptr_t __mqtt_python_cb_resub_fptr_{name} = NULL;
static int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{{
@@ -309,8 +312,8 @@
high = mid - 1;
}}
// If we reach here, then the element was not present
- if(__mqtt_python_callback_fptr_{name} &&
- (*__mqtt_python_callback_fptr_{name})(topicName,
+ if(__mqtt_python_cb_onmsg_fptr_{name} &&
+ (*__mqtt_python_cb_onmsg_fptr_{name})(topicName,
(char*)message->payload,
message->payloadlen) == 0){{
// Topic was handled in python
@@ -458,6 +461,10 @@
{init_pubsub}
+ if(__mqtt_python_cb_resub_fptr_{name}){{
+ (*__mqtt_python_cb_resub_fptr_{name})();
+ }}
+
return MQTTCLIENT_SUCCESS;
}}
@@ -630,8 +637,9 @@
return 0;
}}
-int __mqtt_python_callback_setter_{name}(callback_fptr_t cb)
-{{
- __mqtt_python_callback_fptr_{name} = cb;
+int __mqtt_python_callback_setter_{name}(cb_onmsg_fptr_t cb_onmsg, cb_resub_fptr_t cb_resub)
+{{
+ __mqtt_python_cb_onmsg_fptr_{name} = cb_onmsg;
+ __mqtt_python_cb_resub_fptr_{name} = cb_resub;
return 0;
}}