20 ctypes.c_uint8, # Retained |
20 ctypes.c_uint8, # Retained |
21 ] |
21 ] |
22 res = c_function(topic, payload, len(payload), QoS, Retained) |
22 res = c_function(topic, payload, len(payload), QoS, Retained) |
23 return res |
23 return res |
24 |
24 |
# C per client CallBack type for __mqtt_python_onmsg_{name}
# Called by the C side for each incoming message; returns an int status.
mqtt_c_cb_onmsg_type = ctypes.CFUNCTYPE(ctypes.c_int,                   # return
                                        ctypes.c_char_p,                # topic
                                        ctypes.POINTER(ctypes.c_char),  # data
                                        ctypes.c_uint32)                # data length

# C per client CallBack type for __mqtt_python_resub_{name}
# Takes no argument; returns an int status.
mqtt_c_cb_resub_type = ctypes.CFUNCTYPE(ctypes.c_int)                   # return
30 |
33 |
# CallBacks management
# - each call to MQTT_subscribe registers a callback
#   layout: clientname -> topic -> (callback, QoS)
MQTT_subscribers_cbs = {}

# - one pair of callbacks (on-message, resubscribe) registered to C side
#   per client; kept here so the ctypes wrappers stay referenced
MQTT_client_cbs = {}
37 |
40 |
def mqtt_per_client_cb_factory(clientname):
    # Build the two Python callbacks handed to the C side for one client.
    # Returns the pair (on-message callback, resubscribe callback).

    def per_client_onmsg_cb(topic, dataptr, datalen):
        # Dispatch one incoming message to the callback subscribed to topic.
        # Returns 0 when a subscriber handled the message, 1 otherwise.
        payload = ctypes.string_at(dataptr, datalen)
        # No subscriber for this topic gives None; unpacking it directly
        # would raise TypeError inside the C callback instead of returning 1.
        entry = MQTT_subscribers_cbs.get(clientname, {}).get(topic)
        if entry is None:
            return 1
        subscriber, _QoS = entry
        if subscriber:
            subscriber(topic, payload)
            return 0
        return 1

    def per_client_resub_cb():
        # Re-issue every recorded subscription of this client with its QoS.
        for topic, (_cb, QoS) in MQTT_subscribers_cbs.get(clientname, {}).items():
            _MQTT_subscribe(clientname, topic, QoS)
        return 1

    return per_client_onmsg_cb, per_client_resub_cb
47 |
54 |
def _MQTT_subscribe(clientname, topic, QoS):
    # Ask the C side to subscribe one client to topic with the given QoS.
    # Returns the int given back by the per-client C subscribe function.
    c_function = getattr(PLCBinary, "__mqtt_python_subscribe_" + clientname)
    c_function.restype = ctypes.c_int  # error or 0
    c_function.argtypes = [
        ctypes.c_char_p,  # topic
        ctypes.c_uint8]   # QoS

    return c_function(topic, QoS)


def MQTT_subscribe(clientname, topic, cb, QoS = 1):
    # Subscribe cb to topic for the given client.
    # - clientname: suffix selecting the per-client C entry points
    # - topic: passed to C as a char pointer, also used as registry key
    # - cb: called as cb(topic, payload) for each matching message
    # - QoS: passed to C as uint8, recorded for later resubscription
    # Returns the result of the C-side subscribe call.

    # Record the subscriber first so the resubscribe callback can replay it.
    MQTT_subscribers_cbs.setdefault(clientname, {})[topic] = (cb, QoS)
    res = _MQTT_subscribe(clientname, topic, QoS)

    # The C side gets its callback pair once per client.
    if MQTT_client_cbs.get(clientname, None) is None:
        cb_onmsg, cb_resub = mqtt_per_client_cb_factory(clientname)
        c_cbs = (mqtt_c_cb_onmsg_type(cb_onmsg),
                 mqtt_c_cb_resub_type(cb_resub))
        register_c_function = getattr(PLCBinary, "__mqtt_python_callback_setter_"+clientname )
        register_c_function.argtypes = [mqtt_c_cb_onmsg_type, mqtt_c_cb_resub_type]
        register_c_function(*c_cbs)
        # Cache only after the setter succeeded: a failed registration is
        # then retried by the next call. The cache also keeps the ctypes
        # wrappers referenced.
        MQTT_client_cbs[clientname] = c_cbs

    return res
69 |
82 |
70 """ |
83 """ |
71 |
84 |
72 class MQTTLibrary(POULibrary): |
85 class MQTTLibrary(POULibrary): |