mqtt/library.py
changeset 4103 63c002e87c57
parent 4100 c6c80c088497
--- a/mqtt/library.py	Wed Jan 22 22:05:08 2025 +0100
+++ b/mqtt/library.py	Sun Jan 26 14:58:13 2025 +0100
@@ -22,11 +22,14 @@
     res = c_function(topic, payload, len(payload), QoS, Retained)
     return res
 
-# C per client CallBack type for __mqtt_python_subscribe_{name}
-c_cb_type = ctypes.CFUNCTYPE(ctypes.c_int,                   # return
-                             ctypes.c_char_p,                # topic
-                             ctypes.POINTER(ctypes.c_char),  # data
-                             ctypes.c_uint32)                # data length
+# C per client CallBack type for __mqtt_python_onmsg_{name}
+mqtt_c_cb_onmsg_type = ctypes.CFUNCTYPE(ctypes.c_int,                   # return
+                                        ctypes.c_char_p,                # topic
+                                        ctypes.POINTER(ctypes.c_char),  # data
+                                        ctypes.c_uint32)                # data length
+
+# C per client CallBack type for __mqtt_python_resub_{name}
+mqtt_c_cb_resub_type = ctypes.CFUNCTYPE(ctypes.c_int)                  # return
 
 # CallBacks management
 # - each call to MQTT_subscribe registers a callback
@@ -35,18 +38,21 @@
 # - one callback registered to C side per client
 MQTT_client_cbs = {}
 
-def per_client_cb_factory(client):
-    def per_client_cb(topic, dataptr, datalen):
+def mqtt_per_client_cb_factory(clientname):
+    def per_client_onmsg_cb(topic, dataptr, datalen):
         payload = ctypes.string_at(dataptr, datalen)
-        subscriber = MQTT_subscribers_cbs[client].get(topic, None)
+        subscriber,_Qos = MQTT_subscribers_cbs[clientname].get(topic, None)
         if subscriber:
             subscriber(topic, payload)
             return 0
         return 1
-    return per_client_cb
+    def per_client_resub_cb():
+        for topic,(_cb,QoS) in MQTT_subscribers_cbs[clientname].items():
+            _MQTT_subscribe(clientname, topic, QoS)
+        return 1
+    return per_client_onmsg_cb,per_client_resub_cb
     
-def MQTT_subscribe(clientname, topic, cb, QoS = 1):
-    global MQTT_client_cbs, MQTT_subscribers_cbs
+def _MQTT_subscribe(clientname, topic, QoS):
     c_function_name = "__mqtt_python_subscribe_" + clientname
     c_function = getattr(PLCBinary, c_function_name)
     c_function.restype = ctypes.c_int # error or 0
@@ -54,17 +60,24 @@
         ctypes.c_char_p,  # topic
         ctypes.c_uint8]   # QoS
 
-    MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = cb
+    return c_function(topic, QoS)
 
-    c_cb = MQTT_client_cbs.get(clientname, None)
-    if c_cb is None:
-        c_cb = c_cb_type(per_client_cb_factory(clientname))
-        MQTT_client_cbs[clientname] = c_cb
+def MQTT_subscribe(clientname, topic, cb, QoS = 1):
+    global MQTT_client_cbs, MQTT_subscribers_cbs
+
+    MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = (cb, QoS)
+    res = _MQTT_subscribe(clientname, topic, QoS)
+
+    c_cbs = MQTT_client_cbs.get(clientname, None)
+    if c_cbs is None:
+        cb_onmsg, cb_resub = mqtt_per_client_cb_factory(clientname)
+        c_cbs = (mqtt_c_cb_onmsg_type(cb_onmsg),
+                 mqtt_c_cb_resub_type(cb_resub))
+        MQTT_client_cbs[clientname] = c_cbs
         register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname )
-        register_c_function.argtypes = [c_cb_type]
-        register_c_function(c_cb)
+        register_c_function.argtypes = [mqtt_c_cb_onmsg_type, mqtt_c_cb_resub_type]
+        register_c_function(*c_cbs)
 
-    res = c_function(topic, QoS)
     return res
 
 """