mqtt/library.py
changeset 4103 63c002e87c57
parent 4100 c6c80c088497
equal deleted inserted replaced
4102:32e0ffdf2f44 4103:63c002e87c57
    20         ctypes.c_uint8,   # Retained
    20         ctypes.c_uint8,   # Retained
    21     ]
    21     ]
    22     res = c_function(topic, payload, len(payload), QoS, Retained)
    22     res = c_function(topic, payload, len(payload), QoS, Retained)
    23     return res
    23     return res
    24 
    24 
    25 # C per client CallBack type for __mqtt_python_subscribe_{name}
    25 # C per client CallBack type for __mqtt_python_onmsg_{name}
    26 c_cb_type = ctypes.CFUNCTYPE(ctypes.c_int,                   # return
    26 mqtt_c_cb_onmsg_type = ctypes.CFUNCTYPE(ctypes.c_int,                   # return
    27                              ctypes.c_char_p,                # topic
    27                                         ctypes.c_char_p,                # topic
    28                              ctypes.POINTER(ctypes.c_char),  # data
    28                                         ctypes.POINTER(ctypes.c_char),  # data
    29                              ctypes.c_uint32)                # data length
    29                                         ctypes.c_uint32)                # data length
       
    30 
       
    31 # C per client CallBack type for __mqtt_python_resub_{name}
       
    32 mqtt_c_cb_resub_type = ctypes.CFUNCTYPE(ctypes.c_int)                  # return
    30 
    33 
    31 # CallBacks management
    34 # CallBacks management
    32 # - each call to MQTT_subscribe registers a callback
    35 # - each call to MQTT_subscribe registers a callback
    33 MQTT_subscribers_cbs = {}
    36 MQTT_subscribers_cbs = {}
    34 
    37 
    35 # - one callback registered to C side per client
    38 # - one callback registered to C side per client
    36 MQTT_client_cbs = {}
    39 MQTT_client_cbs = {}
    37 
    40 
    38 def per_client_cb_factory(client):
    41 def mqtt_per_client_cb_factory(clientname):
    39     def per_client_cb(topic, dataptr, datalen):
    42     def per_client_onmsg_cb(topic, dataptr, datalen):
    40         payload = ctypes.string_at(dataptr, datalen)
    43         payload = ctypes.string_at(dataptr, datalen)
    41         subscriber = MQTT_subscribers_cbs[client].get(topic, None)
    44         subscriber,_Qos = MQTT_subscribers_cbs[clientname].get(topic, None)
    42         if subscriber:
    45         if subscriber:
    43             subscriber(topic, payload)
    46             subscriber(topic, payload)
    44             return 0
    47             return 0
    45         return 1
    48         return 1
    46     return per_client_cb
    49     def per_client_resub_cb():
       
    50         for topic,(_cb,QoS) in MQTT_subscribers_cbs[clientname].items():
       
    51             _MQTT_subscribe(clientname, topic, QoS)
       
    52         return 1
       
    53     return per_client_onmsg_cb,per_client_resub_cb
    47     
    54     
    48 def MQTT_subscribe(clientname, topic, cb, QoS = 1):
    55 def _MQTT_subscribe(clientname, topic, QoS):
    49     global MQTT_client_cbs, MQTT_subscribers_cbs
       
    50     c_function_name = "__mqtt_python_subscribe_" + clientname
    56     c_function_name = "__mqtt_python_subscribe_" + clientname
    51     c_function = getattr(PLCBinary, c_function_name)
    57     c_function = getattr(PLCBinary, c_function_name)
    52     c_function.restype = ctypes.c_int # error or 0
    58     c_function.restype = ctypes.c_int # error or 0
    53     c_function.argtypes = [
    59     c_function.argtypes = [
    54         ctypes.c_char_p,  # topic
    60         ctypes.c_char_p,  # topic
    55         ctypes.c_uint8]   # QoS
    61         ctypes.c_uint8]   # QoS
    56 
    62 
    57     MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = cb
    63     return c_function(topic, QoS)
    58 
    64 
    59     c_cb = MQTT_client_cbs.get(clientname, None)
    65 def MQTT_subscribe(clientname, topic, cb, QoS = 1):
    60     if c_cb is None:
    66     global MQTT_client_cbs, MQTT_subscribers_cbs
    61         c_cb = c_cb_type(per_client_cb_factory(clientname))
    67 
    62         MQTT_client_cbs[clientname] = c_cb
    68     MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = (cb, QoS)
       
    69     res = _MQTT_subscribe(clientname, topic, QoS)
       
    70 
       
    71     c_cbs = MQTT_client_cbs.get(clientname, None)
       
    72     if c_cbs is None:
       
    73         cb_onmsg, cb_resub = mqtt_per_client_cb_factory(clientname)
       
    74         c_cbs = (mqtt_c_cb_onmsg_type(cb_onmsg),
       
    75                  mqtt_c_cb_resub_type(cb_resub))
       
    76         MQTT_client_cbs[clientname] = c_cbs
    63         register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname )
    77         register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname )
    64         register_c_function.argtypes = [c_cb_type]
    78         register_c_function.argtypes = [mqtt_c_cb_onmsg_type, mqtt_c_cb_resub_type]
    65         register_c_function(c_cb)
    79         register_c_function(*c_cbs)
    66 
    80 
    67     res = c_function(topic, QoS)
       
    68     return res
    81     return res
    69 
    82 
    70 """
    83 """
    71 
    84 
    72 class MQTTLibrary(POULibrary):
    85 class MQTTLibrary(POULibrary):