--- a/svghmi/svghmi_server.py Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi_server.py Wed Jul 07 16:31:13 2021 +0200
@@ -23,55 +23,124 @@
from autobahn.websocket.protocol import WebSocketProtocol
from autobahn.twisted.resource import WebSocketResource
-# TODO multiclient :
-# session list lock
-# svghmi_sessions = []
-# svghmi_watchdogs = []
-
-svghmi_session = None
+max_svghmi_sessions = None
svghmi_watchdog = None
svghmi_send_collect = PLCBinary.svghmi_send_collect
svghmi_send_collect.restype = ctypes.c_int # error or 0
svghmi_send_collect.argtypes = [
+ ctypes.c_uint32, # index
ctypes.POINTER(ctypes.c_uint32), # size
ctypes.POINTER(ctypes.c_void_p)] # data ptr
-# TODO multiclient : switch to arrays
svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch
svghmi_recv_dispatch.restype = ctypes.c_int # error or 0
svghmi_recv_dispatch.argtypes = [
+ ctypes.c_uint32, # index
ctypes.c_uint32, # size
ctypes.c_char_p] # data ptr
-# TODO multiclient : switch to arrays
+
+class HMISessionMgr(object):
+ def __init__(self):
+ self.multiclient_sessions = set()
+ self.watchdog_session = None
+ self.session_count = 0
+ self.lock = RLock()
+ self.indexes = set()
+
+ def next_index(self):
+ if self.indexes:
+ greatest = max(self.indexes)
+ holes = set(range(greatest)) - self.indexes
+ index = min(holes) if holes else greatest+1
+ else:
+ index = 0
+ self.indexes.add(index)
+ return index
+
+ def free_index(self, index):
+ self.indexes.remove(index)
+
+ def register(self, session):
+ global max_svghmi_sessions
+ with self.lock:
+ if session.is_watchdog_session:
+ # Creating a new watchdog session closes pre-existing one
+ if self.watchdog_session is not None:
+ self.watchdog_session.close()
+ self.unregister(self.watchdog_session)
+ self.free_index(self.watchdog_session.session_index)
+ else:
+ assert(self.session_count < max_svghmi_sessions)
+ self.session_count += 1
+
+ self.watchdog_session = session
+ else:
+ assert(self.session_count < max_svghmi_sessions)
+ self.multiclient_sessions.add(session)
+ self.session_count += 1
+ session.session_index = self.next_index()
+
+ def unregister(self, session):
+ with self.lock:
+ if session.is_watchdog_session:
+ assert(self.watchdog_session == session)
+ self.watchdog_session = None
+ else:
+ self.multiclient_sessions.remove(self)
+ self.free_index(self.watchdog_session.get_index())
+ self.session_count -= 1
+
+ def close_all(self):
+ with self.lock:
+ close_list = list(self.multiclient_sessions)
+ if self.watchdog_session:
+ close_list.append(self.watchdog_session)
+ for session in close_list:
+ session.close()
+ self.unregister(session)
+
+ def iter_sessions(self):
+ with self.lock:
+ nxt_session = self.watchdog_session
+ if nxt_session is not None:
+ yield nxt_session
+ idx = 0
+ while True:
+ with self.lock:
+ if idx >= len(self.multiclient_sessions):
+ return
+ nxt_session = self.multiclient_sessions[idx]
+ yield nxt_session
+ idx += 1
+
+
+svghmi_session_manager = HMISessionMgr()
+
class HMISession(object):
def __init__(self, protocol_instance):
- global svghmi_session
-
- # Single client :
- # Creating a new HMISession closes pre-existing HMISession
- if svghmi_session is not None:
- svghmi_session.close()
- svghmi_session = self
self.protocol_instance = protocol_instance
-
- # TODO multiclient :
- # svghmi_sessions.append(self)
- # get a unique bit index amont other svghmi_sessions,
- # so that we can match flags passed by C->python callback
+ self._session_index = None
+
+ @property
+ def is_watchdog_session(self):
+ return self.protocol_instance.has_watchdog
+
+ @property
+ def session_index(self):
+ return self._session_index
+
+ @session_index.setter
+ def session_index(self, value):
+ self._session_index = value
def close(self):
- global svghmi_session
- if svghmi_session == self:
- svghmi_session = None
self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)
def onMessage(self, msg):
# pass message to the C side recieve_message()
- return svghmi_recv_dispatch(len(msg), msg)
-
- # TODO multiclient : pass client index as well
+ return svghmi_recv_dispatch(self.session_index, len(msg), msg)
def sendMessage(self, msg):
self.protocol_instance.sendMessage(msg, True)
@@ -111,13 +180,15 @@
def trigger(self):
self._callback()
- # wait for initial timeout on re-start
- self.feed(rearm=False)
+ # Don't repeat trigger periodically
+ # # wait for initial timeout on re-start
+ # self.feed(rearm=False)
class HMIProtocol(WebSocketServerProtocol):
def __init__(self, *args, **kwargs):
self._hmi_session = None
+ self.has_watchdog = False
WebSocketServerProtocol.__init__(self, *args, **kwargs)
def onConnect(self, request):
@@ -125,17 +196,22 @@
return WebSocketServerProtocol.onConnect(self, request)
def onOpen(self):
+ global svghmi_session_manager
assert(self._hmi_session is None)
self._hmi_session = HMISession(self)
+ svghmi_session_manager.register(self._hmi_session)
def onClose(self, wasClean, code, reason):
+ global svghmi_session_manager
+ svghmi_session_manager.unregister(self._hmi_session)
self._hmi_session = None
def onMessage(self, msg, isBinary):
+ global svghmi_watchdog
assert(self._hmi_session is not None)
result = self._hmi_session.onMessage(msg)
- if result == 1 : # was heartbeat
+ if result == 1 and self.has_watchdog: # was heartbeat
if svghmi_watchdog is not None:
svghmi_watchdog.feed()
@@ -146,23 +222,28 @@
svghmi_send_thread = None
def SendThreadProc():
- global svghmi_session
+ global svghmi_session_manager
size = ctypes.c_uint32()
ptr = ctypes.c_void_p()
res = 0
- while True:
- res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr))
- if res == 0:
- # TODO multiclient : dispatch to sessions
- if svghmi_session is not None:
- svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value))
- elif res == errno.ENODATA:
- # this happens when there is no data after wakeup
- # because of hmi data refresh period longer than PLC common ticktime
- pass
- else:
- # this happens when finishing
- break
+ finished = False
+ while not(finished):
+ for svghmi_session in svghmi_session_manager.iter_sessions():
+ res = svghmi_send_collect(
+ svghmi_session.session_index,
+ ctypes.byref(size), ctypes.byref(ptr))
+ if res == 0:
+ svghmi_session.sendMessage(
+ ctypes.string_at(ptr.value,size.value))
+ elif res == errno.ENODATA:
+ # this happens when there is no data after wakeup
+ # because of hmi data refresh period longer than
+ # PLC common ticktime
+ pass
+ else:
+ # this happens when finishing
+ finished = True
+ break
def AddPathToSVGHMIServers(path, factory):
for k,v in svghmi_servers.iteritems():
@@ -182,8 +263,8 @@
def _runtime_00_svghmi_stop():
global svghmi_send_thread, svghmi_session
- if svghmi_session is not None:
- svghmi_session.close()
+ svghmi_session_manager.close_all()
+
# plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread
svghmi_send_thread.join()
svghmi_send_thread = None