svghmi/svghmi_server.py
branchsvghmi
changeset 3270 38f7122ccbf9
parent 3269 5d174cdf4d98
child 3271 561dbd1e3e04
--- a/svghmi/svghmi_server.py	Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi_server.py	Wed Jul 07 16:31:13 2021 +0200
@@ -23,55 +23,124 @@
 from autobahn.websocket.protocol import WebSocketProtocol
 from autobahn.twisted.resource import  WebSocketResource
 
-# TODO multiclient :
-# session list lock
-# svghmi_sessions = []
-# svghmi_watchdogs = []
-
-svghmi_session = None
+max_svghmi_sessions = None
 svghmi_watchdog = None
 
 svghmi_send_collect = PLCBinary.svghmi_send_collect
 svghmi_send_collect.restype = ctypes.c_int # error or 0
 svghmi_send_collect.argtypes = [
+    ctypes.c_uint32,  # index
     ctypes.POINTER(ctypes.c_uint32),  # size
     ctypes.POINTER(ctypes.c_void_p)]  # data ptr
-# TODO multiclient : switch to arrays
 
 svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch
 svghmi_recv_dispatch.restype = ctypes.c_int # error or 0
 svghmi_recv_dispatch.argtypes = [
+    ctypes.c_uint32,  # index
     ctypes.c_uint32,  # size
     ctypes.c_char_p]  # data ptr
-# TODO multiclient : switch to arrays
+
+class HMISessionMgr(object):
+    def __init__(self):
+        self.multiclient_sessions = set()
+        self.watchdog_session = None
+        self.session_count = 0
+        self.lock = RLock()
+        self.indexes = set()
+
+    def next_index(self):
+        if self.indexes:
+            greatest = max(self.indexes)
+            holes = set(range(greatest)) - self.indexes
+            index = min(holes) if holes else greatest+1
+        else:
+            index = 0
+        self.indexes.add(index)
+        return index
+
+    def free_index(self, index):
+        self.indexes.remove(index)
+
+    def register(self, session):
+        global max_svghmi_sessions
+        with self.lock:
+            if session.is_watchdog_session:
+                # Creating a new watchdog session closes pre-existing one
+                if self.watchdog_session is not None:
+                    self.watchdog_session.close()
+                    self.unregister(self.watchdog_session)
+                    self.free_index(self.watchdog_session.session_index)
+                else:
+                    assert(self.session_count < max_svghmi_sessions)
+                    self.session_count += 1
+
+                self.watchdog_session = session
+            else:
+                assert(self.session_count < max_svghmi_sessions)
+                self.multiclient_sessions.add(session)
+                self.session_count += 1
+            session.session_index = self.next_index()
+
+    def unregister(self, session):
+        with self.lock:
+            if session.is_watchdog_session:
+                assert(self.watchdog_session == session)
+                self.watchdog_session = None
+            else:
+                self.multiclient_sessions.remove(self)
+            self.free_index(self.watchdog_session.get_index())
+            self.session_count -= 1
+        
+    def close_all(self):
+        with self.lock:
+            close_list = list(self.multiclient_sessions)
+            if self.watchdog_session:
+                close_list.append(self.watchdog_session)
+            for session in close_list:
+                session.close()
+                self.unregister(session)
+        
+    def iter_sessions(self):
+        with self.lock:
+            nxt_session = self.watchdog_session
+        if nxt_session is not None:
+            yield nxt_session
+        idx = 0
+        while True:
+            with self.lock:
+                if idx >= len(self.multiclient_sessions):
+                    return
+                nxt_session = self.multiclient_sessions[idx]
+            yield nxt_session
+            idx += 1
+
+
+svghmi_session_manager = HMISessionMgr()
+
 
 class HMISession(object):
     def __init__(self, protocol_instance):
-        global svghmi_session
-        
-        # Single client :
-        # Creating a new HMISession closes pre-existing HMISession
-        if svghmi_session is not None:
-            svghmi_session.close()
-        svghmi_session = self
         self.protocol_instance = protocol_instance
-
-        # TODO multiclient :
-        # svghmi_sessions.append(self)
-        # get a unique bit index amont other svghmi_sessions,
-        # so that we can match flags passed by C->python callback
+        self._session_index = None
+
+    @property
+    def is_watchdog_session(self):
+        return self.protocol_instance.has_watchdog
+
+    @property
+    def session_index(self):
+        return self._session_index
+
+    @session_index.setter
+    def session_index(self, value):
+        self._session_index = value
 
     def close(self):
-        global svghmi_session
-        if svghmi_session == self:
-            svghmi_session = None
         self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)
 
     def onMessage(self, msg):
         # pass message to the C side recieve_message()
-        return svghmi_recv_dispatch(len(msg), msg)
-
-        # TODO multiclient : pass client index as well
+        return svghmi_recv_dispatch(self.session_index, len(msg), msg)
 
     def sendMessage(self, msg):
         self.protocol_instance.sendMessage(msg, True)
@@ -111,13 +180,15 @@
 
     def trigger(self):
         self._callback()
-        # wait for initial timeout on re-start
-        self.feed(rearm=False)
+        # Don't repeat trigger periodically
+        # # wait for initial timeout on re-start
+        # self.feed(rearm=False)
 
 class HMIProtocol(WebSocketServerProtocol):
 
     def __init__(self, *args, **kwargs):
         self._hmi_session = None
+        self.has_watchdog = False
         WebSocketServerProtocol.__init__(self, *args, **kwargs)
 
     def onConnect(self, request):
@@ -125,17 +196,22 @@
         return WebSocketServerProtocol.onConnect(self, request)
 
     def onOpen(self):
+        global svghmi_session_manager
         assert(self._hmi_session is None)
         self._hmi_session = HMISession(self)
+        svghmi_session_manager.register(self._hmi_session)
 
     def onClose(self, wasClean, code, reason):
+        global svghmi_session_manager
+        svghmi_session_manager.unregister(self._hmi_session)
         self._hmi_session = None
 
     def onMessage(self, msg, isBinary):
+        global svghmi_watchdog
         assert(self._hmi_session is not None)
 
         result = self._hmi_session.onMessage(msg)
-        if result == 1 :  # was heartbeat
+        if result == 1 and self.has_watchdog:  # was heartbeat
             if svghmi_watchdog is not None:
                 svghmi_watchdog.feed()
 
@@ -146,23 +222,28 @@
 svghmi_send_thread = None
 
 def SendThreadProc():
-    global svghmi_session
+    global svghmi_session_manager
     size = ctypes.c_uint32()
     ptr = ctypes.c_void_p()
     res = 0
-    while True:
-        res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr))
-        if res == 0:
-            # TODO multiclient : dispatch to sessions
-            if svghmi_session is not None:
-                svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value))
-        elif res == errno.ENODATA:
-            # this happens when there is no data after wakeup
-            # because of hmi data refresh period longer than PLC common ticktime
-            pass 
-        else:
-            # this happens when finishing
-            break
+    finished = False
+    while not(finished):
+        for svghmi_session in svghmi_session_manager.iter_sessions():
+            res = svghmi_send_collect(
+                svghmi_session.session_index,
+                ctypes.byref(size), ctypes.byref(ptr))
+            if res == 0:
+                svghmi_session.sendMessage(
+                    ctypes.string_at(ptr.value,size.value))
+            elif res == errno.ENODATA:
+                # this happens when there is no data after wakeup
+                # because of hmi data refresh period longer than 
+                # PLC common ticktime
+                pass 
+            else:
+                # this happens when finishing
+                finished = True
+                break
 
 def AddPathToSVGHMIServers(path, factory):
     for k,v in svghmi_servers.iteritems():
@@ -182,8 +263,8 @@
 def _runtime_00_svghmi_stop():
     global svghmi_send_thread, svghmi_session
 
-    if svghmi_session is not None:
-        svghmi_session.close()
+    svghmi_session_manager.close_all()
+
     # plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread
     svghmi_send_thread.join()
     svghmi_send_thread = None