diff -r 5d174cdf4d98 -r 38f7122ccbf9 svghmi/svghmi_server.py --- a/svghmi/svghmi_server.py Mon Jul 05 10:51:02 2021 +0200 +++ b/svghmi/svghmi_server.py Wed Jul 07 16:31:13 2021 +0200 @@ -23,55 +23,124 @@ from autobahn.websocket.protocol import WebSocketProtocol from autobahn.twisted.resource import WebSocketResource -# TODO multiclient : -# session list lock -# svghmi_sessions = [] -# svghmi_watchdogs = [] - -svghmi_session = None +max_svghmi_sessions = None svghmi_watchdog = None svghmi_send_collect = PLCBinary.svghmi_send_collect svghmi_send_collect.restype = ctypes.c_int # error or 0 svghmi_send_collect.argtypes = [ + ctypes.c_uint32, # index ctypes.POINTER(ctypes.c_uint32), # size ctypes.POINTER(ctypes.c_void_p)] # data ptr -# TODO multiclient : switch to arrays svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch svghmi_recv_dispatch.restype = ctypes.c_int # error or 0 svghmi_recv_dispatch.argtypes = [ + ctypes.c_uint32, # index ctypes.c_uint32, # size ctypes.c_char_p] # data ptr -# TODO multiclient : switch to arrays + +class HMISessionMgr(object): + def __init__(self): + self.multiclient_sessions = set() + self.watchdog_session = None + self.session_count = 0 + self.lock = RLock() + self.indexes = set() + + def next_index(self): + if self.indexes: + greatest = max(self.indexes) + holes = set(range(greatest)) - self.indexes + index = min(holes) if holes else greatest+1 + else: + index = 0 + self.indexes.add(index) + return index + + def free_index(self, index): + self.indexes.remove(index) + + def register(self, session): + global max_svghmi_sessions + with self.lock: + if session.is_watchdog_session: + # Creating a new watchdog session closes pre-existing one + if self.watchdog_session is not None: + self.watchdog_session.close() + self.unregister(self.watchdog_session) + self.free_index(self.watchdog_session.session_index) + else: + assert(self.session_count < max_svghmi_sessions) + self.session_count += 1 + + self.watchdog_session = session + else: + assert(self.session_count < max_svghmi_sessions) + self.multiclient_sessions.add(session) + self.session_count += 1 + session.session_index = self.next_index() + + def unregister(self, session): + with self.lock: + if session.is_watchdog_session: + assert(self.watchdog_session == session) + self.watchdog_session = None + else: + self.multiclient_sessions.remove(self) + self.free_index(self.watchdog_session.get_index()) + self.session_count -= 1 + + def close_all(self): + with self.lock: + close_list = list(self.multiclient_sessions) + if self.watchdog_session: + close_list.append(self.watchdog_session) + for session in close_list: + session.close() + self.unregister(session) + + def iter_sessions(self): + with self.lock: + nxt_session = self.watchdog_session + if nxt_session is not None: + yield nxt_session + idx = 0 + while True: + with self.lock: + if idx >= len(self.multiclient_sessions): + return + nxt_session = self.multiclient_sessions[idx] + yield nxt_session + idx += 1 + + +svghmi_session_manager = HMISessionMgr() + class HMISession(object): def __init__(self, protocol_instance): - global svghmi_session - - # Single client : - # Creating a new HMISession closes pre-existing HMISession - if svghmi_session is not None: - svghmi_session.close() - svghmi_session = self self.protocol_instance = protocol_instance - - # TODO multiclient : - # svghmi_sessions.append(self) - # get a unique bit index amont other svghmi_sessions, - # so that we can match flags passed by C->python callback + self._session_index = None + + @property + def is_watchdog_session(self): + return self.protocol_instance.has_watchdog + + @property + def session_index(self): + return self._session_index + + @session_index.setter + def session_index(self, value): + self._session_index = value def close(self): - global svghmi_session - if svghmi_session == self: - svghmi_session = None self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL) def onMessage(self, msg): # pass message to the C side recieve_message() - return svghmi_recv_dispatch(len(msg), msg) - - # TODO multiclient : pass client index as well + return svghmi_recv_dispatch(self.session_index, len(msg), msg) def sendMessage(self, msg): self.protocol_instance.sendMessage(msg, True) @@ -111,13 +180,15 @@ def trigger(self): self._callback() - # wait for initial timeout on re-start - self.feed(rearm=False) + # Don't repeat trigger periodically + # # wait for initial timeout on re-start + # self.feed(rearm=False) class HMIProtocol(WebSocketServerProtocol): def __init__(self, *args, **kwargs): self._hmi_session = None + self.has_watchdog = False WebSocketServerProtocol.__init__(self, *args, **kwargs) def onConnect(self, request): @@ -125,17 +196,22 @@ return WebSocketServerProtocol.onConnect(self, request) def onOpen(self): + global svghmi_session_manager assert(self._hmi_session is None) self._hmi_session = HMISession(self) + svghmi_session_manager.register(self._hmi_session) def onClose(self, wasClean, code, reason): + global svghmi_session_manager + svghmi_session_manager.unregister(self._hmi_session) self._hmi_session = None def onMessage(self, msg, isBinary): + global svghmi_watchdog assert(self._hmi_session is not None) result = self._hmi_session.onMessage(msg) - if result == 1 : # was heartbeat + if result == 1 and self.has_watchdog: # was heartbeat if svghmi_watchdog is not None: svghmi_watchdog.feed() @@ -146,23 +222,28 @@ svghmi_send_thread = None def SendThreadProc(): - global svghmi_session + global svghmi_session_manager size = ctypes.c_uint32() ptr = ctypes.c_void_p() res = 0 - while True: - res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr)) - if res == 0: - # TODO multiclient : dispatch to sessions - if svghmi_session is not None: - svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value)) - elif res == errno.ENODATA: - # this happens when there is no data after wakeup - # because of hmi data refresh period longer than PLC common ticktime - pass - else: - # this happens when finishing - break + finished = False + while not(finished): + for svghmi_session in svghmi_session_manager.iter_sessions(): + res = svghmi_send_collect( + svghmi_session.session_index, + ctypes.byref(size), ctypes.byref(ptr)) + if res == 0: + svghmi_session.sendMessage( + ctypes.string_at(ptr.value,size.value)) + elif res == errno.ENODATA: + # this happens when there is no data after wakeup + # because of hmi data refresh period longer than + # PLC common ticktime + pass + else: + # this happens when finishing + finished = True + break def AddPathToSVGHMIServers(path, factory): for k,v in svghmi_servers.iteritems(): @@ -182,8 +263,8 @@ def _runtime_00_svghmi_stop(): global svghmi_send_thread, svghmi_session - if svghmi_session is not None: - svghmi_session.close() + svghmi_session_manager.close_all() + # plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread svghmi_send_thread.join() svghmi_send_thread = None