SVGHMI: Implemented multiserver+multiclient, but only tested with single client and single server for now. To be continued... svghmi
authorEdouard Tisserant
Wed, 07 Jul 2021 16:31:13 +0200
branchsvghmi
changeset 3270 38f7122ccbf9
parent 3269 5d174cdf4d98
child 3271 561dbd1e3e04
SVGHMI: Implemented multiserver+multiclient, but only tested with single client and single server for now. To be continued...
svghmi/svghmi.c
svghmi/svghmi.py
svghmi/svghmi_server.py
--- a/svghmi/svghmi.c	Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi.c	Wed Jul 07 16:31:13 2021 +0200
@@ -9,6 +9,8 @@
 #define HMI_BUFFER_SIZE %(buffer_size)d
 #define HMI_ITEM_COUNT %(item_count)d
 #define HMI_HASH_SIZE 8
+#define MAX_CONNECTIONS %(max_connections)d
+
 static uint8_t hmi_hash[HMI_HASH_SIZE] = {%(hmi_hash_ints)s};
 
 /* PLC reads from that buffer */
@@ -45,11 +47,11 @@
 
     /* publish/write/send */
     long wlock;
-    buf_state_t wstate;
+    buf_state_t wstate[MAX_CONNECTIONS];
 
     /* zero means not subscribed */
-    uint16_t refresh_period_ms;
-    uint16_t age_ms;
+    uint16_t refresh_period_ms[MAX_CONNECTIONS];
+    uint16_t age_ms[MAX_CONNECTIONS];
 
     /* retrieve/read/recv */
     long rlock;
@@ -81,15 +83,16 @@
 
 static int write_iterator(uint32_t index, hmi_tree_item_t *dsc)
 {
-    if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0)
-    {
-        if(dsc->wstate == buf_set){
+    uint32_t session_index = 0;
+    if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0) while(session_index < MAX_CONNECTIONS)
+    {
+        if(dsc->wstate[session_index] == buf_set){
             /* if being subscribed */
-            if(dsc->refresh_period_ms){
-                if(dsc->age_ms + ticktime_ms < dsc->refresh_period_ms){
-                    dsc->age_ms += ticktime_ms;
+            if(dsc->refresh_period_ms[session_index]){
+                if(dsc->age_ms[session_index] + ticktime_ms < dsc->refresh_period_ms[session_index]){
+                    dsc->age_ms[session_index] += ticktime_ms;
                 }else{
-                    dsc->wstate = buf_tosend;
+                    dsc->wstate[session_index] = buf_tosend;
                     global_write_dirty = 1;
                 }
             }
@@ -105,34 +108,36 @@
         if(__Is_a_string(dsc)){
             sz = ((STRING*)visible_value_p)->len + 1;
         }
-        if(dsc->wstate == buf_new /* just subscribed 
+        if(dsc->wstate[session_index] == buf_new /* just subscribed 
            or already subscribed having value change */
-           || (dsc->refresh_period_ms > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){
+           || (dsc->refresh_period_ms[session_index] > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){
             /* copy and flag as set */
             memcpy(dest_p, visible_value_p, sz);
             /* if not already marked/signaled, do it */
-            if(dsc->wstate != buf_set && dsc->wstate != buf_tosend) {
-                if(dsc->wstate == buf_new || ticktime_ms > dsc->refresh_period_ms){
-                    dsc->wstate = buf_tosend;
+            if(dsc->wstate[session_index] != buf_set && dsc->wstate[session_index] != buf_tosend) {
+                if(dsc->wstate[session_index] == buf_new || ticktime_ms > dsc->refresh_period_ms[session_index]){
+                    dsc->wstate[session_index] = buf_tosend;
                     global_write_dirty = 1;
                 } else {
-                    dsc->wstate = buf_set;
-                }
-                dsc->age_ms = 0;
+                    dsc->wstate[session_index] = buf_set;
+                }
+                dsc->age_ms[session_index] = 0;
             }
         }
 
         AtomicCompareExchange(&dsc->wlock, 1, 0);
+        session_index++;
     }
     // else ... : PLC can't wait, variable will be updated next turn
     return 0;
 }
 
+static uint32_t send_session_index;
 static int send_iterator(uint32_t index, hmi_tree_item_t *dsc)
 {
     while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield();
 
-    if(dsc->wstate == buf_tosend)
+    if(dsc->wstate[send_session_index] == buf_tosend)
     {
         uint32_t sz = __get_type_enum_size(dsc->type);
         if(sbufidx + sizeof(uint32_t) + sz <=  sizeof(sbuf))
@@ -145,7 +150,7 @@
             /* TODO : force into little endian */
             memcpy(dst_p, &index, sizeof(uint32_t));
             memcpy(dst_p + sizeof(uint32_t), src_p, sz);
-            dsc->wstate = buf_free;
+            dsc->wstate[send_session_index] = buf_free;
             sbufidx += sizeof(uint32_t) /* index */ + sz;
         }
         else
@@ -179,24 +184,25 @@
     return 0;
 }
 
-void update_refresh_period(hmi_tree_item_t *dsc, uint16_t refresh_period_ms)
+void update_refresh_period(hmi_tree_item_t *dsc, uint32_t session_index, uint16_t refresh_period_ms)
 {
     while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield();
     if(refresh_period_ms) {
-        if(!dsc->refresh_period_ms)
-        {
-            dsc->wstate = buf_new;
+        if(!dsc->refresh_period_ms[session_index])
+        {
+            dsc->wstate[session_index] = buf_new;
         }
     } else {
-        dsc->wstate = buf_free;
-    }
-    dsc->refresh_period_ms = refresh_period_ms;
+        dsc->wstate[session_index] = buf_free;
+    }
+    dsc->refresh_period_ms[session_index] = refresh_period_ms;
     AtomicCompareExchange(&dsc->wlock, 1, 0);
 }
 
+static uint32_t reset_session_index;
 static int reset_iterator(uint32_t index, hmi_tree_item_t *dsc)
 {
-    update_refresh_period(dsc, 0);
+    update_refresh_period(dsc, reset_session_index, 0);
     return 0;
 }
 
@@ -236,13 +242,14 @@
 }
 
 /* PYTHON CALLS */
-int svghmi_send_collect(uint32_t *size, char **ptr){
+int svghmi_send_collect(uint32_t session_index, uint32_t *size, char **ptr){
 
     SVGHMI_SuspendFromPythonThread();
 
     if(continue_collect) {
         int res;
         sbufidx = HMI_HASH_SIZE;
+        send_session_index = session_index;
         if((res = traverse_hmi_tree(send_iterator)) == 0)
         {
             if(sbufidx > HMI_HASH_SIZE){
@@ -270,7 +277,7 @@
 
 // Returns :
 //   0 is OK, <0 is error, 1 is heartbeat
-int svghmi_recv_dispatch(uint32_t size, const uint8_t *ptr){
+int svghmi_recv_dispatch(uint32_t session_index, uint32_t size, const uint8_t *ptr){
     const uint8_t* cursor = ptr + HMI_HASH_SIZE;
     const uint8_t* end = ptr + size;
 
@@ -336,6 +343,7 @@
             case reset:
             {
                 progress = 0;
+                reset_session_index = session_index;
                 traverse_hmi_tree(reset_iterator);
             }
             break;
@@ -348,7 +356,7 @@
                 if(index < HMI_ITEM_COUNT)
                 {
                     hmi_tree_item_t *dsc = &hmi_tree_item[index];
-                    update_refresh_period(dsc, refresh_period_ms);
+                    update_refresh_period(dsc, session_index, refresh_period_ms);
                 }
                 else
                 {
--- a/svghmi/svghmi.py	Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi.py	Wed Jul 07 16:31:13 2021 +0200
@@ -47,12 +47,14 @@
 
 on_hmitree_update = None
 
+maxConnectionsTotal = 0
+
 class SVGHMILibrary(POULibrary):
     def GetLibraryPath(self):
          return paths.AbsNeighbourFile(__file__, "pous.xml")
 
     def Generate_C(self, buildpath, varlist, IECCFLAGS):
-        global hmi_tree_root, on_hmitree_update
+        global hmi_tree_root, on_hmitree_update, maxConnectionsTotal
 
         """
         PLC Instance Tree:
@@ -196,6 +198,11 @@
         # "programs_declarations": "\n".join(["extern %(type)s %(C_path)s;" %
         #                                     p for p in self._ProgramList]),
 
+        for CTNChild in self.GetCTR().IterChildren():
+            if isinstance(CTNChild, SVGHMI):
+                maxConnectionsTotal += CTNChild.GetParamsAttributes("SVGHMI.MaxConnections")["value"]
+
+
         # C code to observe/access HMI tree variables
         svghmi_c_filepath = paths.AbsNeighbourFile(__file__, "svghmi.c")
         svghmi_c_file = open(svghmi_c_filepath, 'r')
@@ -208,7 +215,8 @@
             "item_count": item_count,
             "var_access_code": targets.GetCode("var_access.c"),
             "PLC_ticktime": self.GetCTR().GetTicktime(),
-            "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash()))
+            "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash())),
+            "max_connections": maxConnectionsTotal
             }
 
         gen_svghmi_c_path = os.path.join(buildpath, "svghmi.c")
@@ -287,7 +295,8 @@
           <xsd:attribute name="WatchdogInterval" type="xsd:integer" use="optional" default="5"/>
           <xsd:attribute name="Port" type="xsd:integer" use="optional" default="8008"/>
           <xsd:attribute name="Interface" type="xsd:string" use="optional" default="localhost"/>
-          <xsd:attribute name="Path" type="xsd:string" use="optional" default=""/>
+          <xsd:attribute name="Path" type="xsd:string" use="optional" default="{name}"/>
+          <xsd:attribute name="MaxConnections" type="xsd:integer" use="optional" default="16"/>
         </xsd:complexType>
       </xsd:element>
     </xsd:schema>
@@ -562,19 +571,22 @@
 def svghmi_{location}_watchdog_trigger():
     {svghmi_cmds[Watchdog]}
 
-svghmi_watchdog = None
+max_svghmi_sessions = {maxConnections_total}
 
 def _runtime_{location}_svghmi_start():
     global svghmi_watchdog, svghmi_servers
 
     srv = svghmi_servers.get("{interface}:{port}", None)
     if srv is not None:
-        svghmi_root, svghmi_listener, path_list = srv 
+        svghmi_root, svghmi_listener, path_list = srv
         if '{path}' in path_list:
             raise Exception("SVGHMI {view_name}: path {path} already used on {interface}:{port}")
     else:
         svghmi_root = Resource()
-        svghmi_root.putChild("ws", WebSocketResource(HMIWebSocketServerFactory()))
+        factory = HMIWebSocketServerFactory()
+        factory.setProtocolOptions(maxConnections={maxConnections})
+
+        svghmi_root.putChild("ws", WebSocketResource(factory))
 
         svghmi_listener = reactor.listenTCP({port}, Site(svghmi_root), interface='{interface}')
         path_list = []
@@ -592,8 +604,8 @@
     if {enable_watchdog}:
         if svghmi_watchdog is None:
             svghmi_watchdog = Watchdog(
-                {watchdog_initial}, 
-                {watchdog_interval}, 
+                {watchdog_initial},
+                {watchdog_interval},
                 svghmi_{location}_watchdog_trigger)
         else:
             raise Exception("SVGHMI {view_name}: only one watchdog allowed")
@@ -628,6 +640,8 @@
                    enable_watchdog = enable_watchdog,
                    watchdog_initial = self.GetParamsAttributes("SVGHMI.WatchdogInitial")["value"],
                    watchdog_interval = self.GetParamsAttributes("SVGHMI.WatchdogInterval")["value"],
+                   maxConnections = self.GetParamsAttributes("SVGHMI.MaxConnections")["value"],
+                   maxConnections_total = maxConnectionsTotal
                    ))
 
         runtimefile.close()
--- a/svghmi/svghmi_server.py	Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi_server.py	Wed Jul 07 16:31:13 2021 +0200
@@ -23,55 +23,124 @@
 from autobahn.websocket.protocol import WebSocketProtocol
 from autobahn.twisted.resource import  WebSocketResource
 
-# TODO multiclient :
-# session list lock
-# svghmi_sessions = []
-# svghmi_watchdogs = []
-
-svghmi_session = None
+max_svghmi_sessions = None
 svghmi_watchdog = None
 
 svghmi_send_collect = PLCBinary.svghmi_send_collect
 svghmi_send_collect.restype = ctypes.c_int # error or 0
 svghmi_send_collect.argtypes = [
+    ctypes.c_uint32,  # index
     ctypes.POINTER(ctypes.c_uint32),  # size
     ctypes.POINTER(ctypes.c_void_p)]  # data ptr
-# TODO multiclient : switch to arrays
 
 svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch
 svghmi_recv_dispatch.restype = ctypes.c_int # error or 0
 svghmi_recv_dispatch.argtypes = [
+    ctypes.c_uint32,  # index
     ctypes.c_uint32,  # size
     ctypes.c_char_p]  # data ptr
-# TODO multiclient : switch to arrays
+
+class HMISessionMgr(object):
+    def __init__(self):
+        self.multiclient_sessions = set()
+        self.watchdog_session = None
+        self.session_count = 0
+        self.lock = RLock()
+        self.indexes = set()
+
+    def next_index(self):
+        if self.indexes:
+            greatest = max(self.indexes)
+            holes = set(range(greatest)) - self.indexes
+            index = min(holes) if holes else greatest+1
+        else:
+            index = 0
+        self.indexes.add(index)
+        return index
+
+    def free_index(self, index):
+        self.indexes.remove(index)
+
+    def register(self, session):
+        global max_svghmi_sessions
+        with self.lock:
+            if session.is_watchdog_session:
+                # Creating a new watchdog session closes pre-existing one
+                if self.watchdog_session is not None:
+                    self.watchdog_session.close()
+                    self.unregister(self.watchdog_session)
+                    self.free_index(self.watchdog_session.session_index)
+                else:
+                    assert(self.session_count < max_svghmi_sessions)
+                    self.session_count += 1
+
+                self.watchdog_session = session
+            else:
+                assert(self.session_count < max_svghmi_sessions)
+                self.multiclient_sessions.add(session)
+                self.session_count += 1
+            session.session_index = self.next_index()
+
+    def unregister(self, session):
+        with self.lock:
+            if session.is_watchdog_session:
+                assert(self.watchdog_session == session)
+                self.watchdog_session = None
+            else:
+                self.multiclient_sessions.remove(self)
+            self.free_index(self.watchdog_session.get_index())
+            self.session_count -= 1
+        
+    def close_all(self):
+        with self.lock:
+            close_list = list(self.multiclient_sessions)
+            if self.watchdog_session:
+                close_list.append(self.watchdog_session)
+            for session in close_list:
+                session.close()
+                self.unregister(session)
+        
+    def iter_sessions(self):
+        with self.lock:
+            nxt_session = self.watchdog_session
+        if nxt_session is not None:
+            yield nxt_session
+        idx = 0
+        while True:
+            with self.lock:
+                if idx >= len(self.multiclient_sessions):
+                    return
+                nxt_session = self.multiclient_sessions[idx]
+            yield nxt_session
+            idx += 1
+
+
+svghmi_session_manager = HMISessionMgr()
+
 
 class HMISession(object):
     def __init__(self, protocol_instance):
-        global svghmi_session
-        
-        # Single client :
-        # Creating a new HMISession closes pre-existing HMISession
-        if svghmi_session is not None:
-            svghmi_session.close()
-        svghmi_session = self
         self.protocol_instance = protocol_instance
-
-        # TODO multiclient :
-        # svghmi_sessions.append(self)
-        # get a unique bit index amont other svghmi_sessions,
-        # so that we can match flags passed by C->python callback
+        self._session_index = None
+
+    @property
+    def is_watchdog_session(self):
+        return self.protocol_instance.has_watchdog
+
+    @property
+    def session_index(self):
+        return self._session_index
+
+    @session_index.setter
+    def session_index(self, value):
+        self._session_index = value
 
     def close(self):
-        global svghmi_session
-        if svghmi_session == self:
-            svghmi_session = None
         self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)
 
     def onMessage(self, msg):
         # pass message to the C side recieve_message()
-        return svghmi_recv_dispatch(len(msg), msg)
-
-        # TODO multiclient : pass client index as well
+        return svghmi_recv_dispatch(self.session_index, len(msg), msg)
 
     def sendMessage(self, msg):
         self.protocol_instance.sendMessage(msg, True)
@@ -111,13 +180,15 @@
 
     def trigger(self):
         self._callback()
-        # wait for initial timeout on re-start
-        self.feed(rearm=False)
+        # Don't repeat trigger periodically
+        # # wait for initial timeout on re-start
+        # self.feed(rearm=False)
 
 class HMIProtocol(WebSocketServerProtocol):
 
     def __init__(self, *args, **kwargs):
         self._hmi_session = None
+        self.has_watchdog = False
         WebSocketServerProtocol.__init__(self, *args, **kwargs)
 
     def onConnect(self, request):
@@ -125,17 +196,22 @@
         return WebSocketServerProtocol.onConnect(self, request)
 
     def onOpen(self):
+        global svghmi_session_manager
         assert(self._hmi_session is None)
         self._hmi_session = HMISession(self)
+        svghmi_session_manager.register(self._hmi_session)
 
     def onClose(self, wasClean, code, reason):
+        global svghmi_session_manager
+        svghmi_session_manager.unregister(self._hmi_session)
         self._hmi_session = None
 
     def onMessage(self, msg, isBinary):
+        global svghmi_watchdog
         assert(self._hmi_session is not None)
 
         result = self._hmi_session.onMessage(msg)
-        if result == 1 :  # was heartbeat
+        if result == 1 and self.has_watchdog:  # was heartbeat
             if svghmi_watchdog is not None:
                 svghmi_watchdog.feed()
 
@@ -146,23 +222,28 @@
 svghmi_send_thread = None
 
 def SendThreadProc():
-    global svghmi_session
+    global svghmi_session_manager
     size = ctypes.c_uint32()
     ptr = ctypes.c_void_p()
     res = 0
-    while True:
-        res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr))
-        if res == 0:
-            # TODO multiclient : dispatch to sessions
-            if svghmi_session is not None:
-                svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value))
-        elif res == errno.ENODATA:
-            # this happens when there is no data after wakeup
-            # because of hmi data refresh period longer than PLC common ticktime
-            pass 
-        else:
-            # this happens when finishing
-            break
+    finished = False
+    while not(finished):
+        for svghmi_session in svghmi_session_manager.iter_sessions():
+            res = svghmi_send_collect(
+                svghmi_session.session_index,
+                ctypes.byref(size), ctypes.byref(ptr))
+            if res == 0:
+                svghmi_session.sendMessage(
+                    ctypes.string_at(ptr.value,size.value))
+            elif res == errno.ENODATA:
+                # this happens when there is no data after wakeup
+                # because of hmi data refresh period longer than 
+                # PLC common ticktime
+                pass 
+            else:
+                # this happens when finishing
+                finished = True
+                break
 
 def AddPathToSVGHMIServers(path, factory):
     for k,v in svghmi_servers.iteritems():
@@ -182,8 +263,8 @@
 def _runtime_00_svghmi_stop():
     global svghmi_send_thread, svghmi_session
 
-    if svghmi_session is not None:
-        svghmi_session.close()
+    svghmi_session_manager.close_all()
+
     # plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread
     svghmi_send_thread.join()
     svghmi_send_thread = None