SVGHMI: Implemented multi-server and multi-client support, but only tested with a single client and a single server for now. To be continued...
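
Per-variable subscription state in svghmi.c (wstate, refresh_period_ms,
age_ms) becomes arrays indexed by session, sized by the new MAX_CONNECTIONS
define. On the Python side, a new HMISessionMgr allocates session indexes and
tracks one optional watchdog session plus regular client sessions, while
svghmi_send_collect() and svghmi_recv_dispatch() take the session index as a
new first argument. Each server instance caps WebSocket connections through
autobahn's maxConnections protocol option (SVGHMI.MaxConnections, default 16).
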
--- a/svghmi/svghmi.c Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi.c Wed Jul 07 16:31:13 2021 +0200
@@ -9,6 +9,8 @@
#define HMI_BUFFER_SIZE %(buffer_size)d
#define HMI_ITEM_COUNT %(item_count)d
#define HMI_HASH_SIZE 8
+#define MAX_CONNECTIONS %(max_connections)d
+
static uint8_t hmi_hash[HMI_HASH_SIZE] = {%(hmi_hash_ints)s};
/* PLC reads from that buffer */
@@ -45,11 +47,11 @@
/* publish/write/send */
long wlock;
- buf_state_t wstate;
+ buf_state_t wstate[MAX_CONNECTIONS];
/* zero means not subscribed */
- uint16_t refresh_period_ms;
- uint16_t age_ms;
+ uint16_t refresh_period_ms[MAX_CONNECTIONS];
+ uint16_t age_ms[MAX_CONNECTIONS];
/* retrieve/read/recv */
long rlock;
@@ -81,15 +83,16 @@
static int write_iterator(uint32_t index, hmi_tree_item_t *dsc)
{
- if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0)
- {
- if(dsc->wstate == buf_set){
+ uint32_t session_index = 0;
+    if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0)
+    while(session_index < MAX_CONNECTIONS)
+ {
+ if(dsc->wstate[session_index] == buf_set){
/* if being subscribed */
- if(dsc->refresh_period_ms){
- if(dsc->age_ms + ticktime_ms < dsc->refresh_period_ms){
- dsc->age_ms += ticktime_ms;
+ if(dsc->refresh_period_ms[session_index]){
+ if(dsc->age_ms[session_index] + ticktime_ms < dsc->refresh_period_ms[session_index]){
+ dsc->age_ms[session_index] += ticktime_ms;
}else{
- dsc->wstate = buf_tosend;
+ dsc->wstate[session_index] = buf_tosend;
global_write_dirty = 1;
}
}
@@ -105,34 +108,36 @@
if(__Is_a_string(dsc)){
sz = ((STRING*)visible_value_p)->len + 1;
}
- if(dsc->wstate == buf_new /* just subscribed
+ if(dsc->wstate[session_index] == buf_new /* just subscribed
or already subscribed having value change */
- || (dsc->refresh_period_ms > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){
+ || (dsc->refresh_period_ms[session_index] > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){
/* copy and flag as set */
memcpy(dest_p, visible_value_p, sz);
/* if not already marked/signaled, do it */
- if(dsc->wstate != buf_set && dsc->wstate != buf_tosend) {
- if(dsc->wstate == buf_new || ticktime_ms > dsc->refresh_period_ms){
- dsc->wstate = buf_tosend;
+ if(dsc->wstate[session_index] != buf_set && dsc->wstate[session_index] != buf_tosend) {
+ if(dsc->wstate[session_index] == buf_new || ticktime_ms > dsc->refresh_period_ms[session_index]){
+ dsc->wstate[session_index] = buf_tosend;
global_write_dirty = 1;
} else {
- dsc->wstate = buf_set;
- }
- dsc->age_ms = 0;
+ dsc->wstate[session_index] = buf_set;
+ }
+ dsc->age_ms[session_index] = 0;
}
}
-        AtomicCompareExchange(&dsc->wlock, 1, 0);
+        session_index++;
+        /* keep wlock held for all sessions, release only after the last */
+        if(session_index == MAX_CONNECTIONS)
+            AtomicCompareExchange(&dsc->wlock, 1, 0);
}
// else ... : PLC can't wait, variable will be updated next turn
return 0;
}
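+/* session whose data send_iterator() collects;
+   set by svghmi_send_collect() before each traversal */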
+static uint32_t send_session_index;
static int send_iterator(uint32_t index, hmi_tree_item_t *dsc)
{
while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield();
- if(dsc->wstate == buf_tosend)
+ if(dsc->wstate[send_session_index] == buf_tosend)
{
uint32_t sz = __get_type_enum_size(dsc->type);
if(sbufidx + sizeof(uint32_t) + sz <= sizeof(sbuf))
@@ -145,7 +150,7 @@
/* TODO : force into little endian */
memcpy(dst_p, &index, sizeof(uint32_t));
memcpy(dst_p + sizeof(uint32_t), src_p, sz);
- dsc->wstate = buf_free;
+ dsc->wstate[send_session_index] = buf_free;
sbufidx += sizeof(uint32_t) /* index */ + sz;
}
else
@@ -179,24 +184,25 @@
return 0;
}
-void update_refresh_period(hmi_tree_item_t *dsc, uint16_t refresh_period_ms)
+void update_refresh_period(hmi_tree_item_t *dsc, uint32_t session_index, uint16_t refresh_period_ms)
{
while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield();
if(refresh_period_ms) {
- if(!dsc->refresh_period_ms)
- {
- dsc->wstate = buf_new;
+ if(!dsc->refresh_period_ms[session_index])
+ {
+ dsc->wstate[session_index] = buf_new;
}
} else {
- dsc->wstate = buf_free;
- }
- dsc->refresh_period_ms = refresh_period_ms;
+ dsc->wstate[session_index] = buf_free;
+ }
+ dsc->refresh_period_ms[session_index] = refresh_period_ms;
AtomicCompareExchange(&dsc->wlock, 1, 0);
}
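+/* session targeted by reset_iterator();
+   set by svghmi_recv_dispatch() before traversal */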
+static uint32_t reset_session_index;
static int reset_iterator(uint32_t index, hmi_tree_item_t *dsc)
{
- update_refresh_period(dsc, 0);
+ update_refresh_period(dsc, reset_session_index, 0);
return 0;
}
@@ -236,13 +242,14 @@
}
/* PYTHON CALLS */
-int svghmi_send_collect(uint32_t *size, char **ptr){
+int svghmi_send_collect(uint32_t session_index, uint32_t *size, char **ptr){
SVGHMI_SuspendFromPythonThread();
if(continue_collect) {
int res;
sbufidx = HMI_HASH_SIZE;
+ send_session_index = session_index;
if((res = traverse_hmi_tree(send_iterator)) == 0)
{
if(sbufidx > HMI_HASH_SIZE){
@@ -270,7 +277,7 @@
// Returns :
// 0 is OK, <0 is error, 1 is heartbeat
-int svghmi_recv_dispatch(uint32_t size, const uint8_t *ptr){
+int svghmi_recv_dispatch(uint32_t session_index, uint32_t size, const uint8_t *ptr){
const uint8_t* cursor = ptr + HMI_HASH_SIZE;
const uint8_t* end = ptr + size;
@@ -336,6 +343,7 @@
case reset:
{
progress = 0;
+ reset_session_index = session_index;
traverse_hmi_tree(reset_iterator);
}
break;
@@ -348,7 +356,7 @@
if(index < HMI_ITEM_COUNT)
{
hmi_tree_item_t *dsc = &hmi_tree_item[index];
- update_refresh_period(dsc, refresh_period_ms);
+ update_refresh_period(dsc, session_index, refresh_period_ms);
}
else
{
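
For reference, a minimal Python sketch of the per-session state machine these
arrays implement (BufState mirrors buf_free/buf_new/buf_set/buf_tosend from
svghmi.c; the ItemState class and its simplified tick() are illustrative
assumptions, not project code):

    from enum import Enum

    class BufState(Enum):
        FREE = 0    # not subscribed, nothing pending
        NEW = 1     # just subscribed: current value must be sent
        SET = 2     # value copied, ageing until refresh period elapses
        TOSEND = 3  # queued for the next send_iterator() pass

    class ItemState:
        # one slot per session, as in the hmi_tree_item_t arrays above
        def __init__(self, max_connections):
            self.wstate = [BufState.FREE] * max_connections
            self.refresh_period_ms = [0] * max_connections
            self.age_ms = [0] * max_connections

        def tick(self, session, ticktime_ms):
            # mirrors write_iterator(): age a SET entry and promote it to
            # TOSEND once its per-session refresh period has elapsed
            if (self.wstate[session] is BufState.SET
                    and self.refresh_period_ms[session]):
                if (self.age_ms[session] + ticktime_ms
                        < self.refresh_period_ms[session]):
                    self.age_ms[session] += ticktime_ms
                else:
                    self.wstate[session] = BufState.TOSEND
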
--- a/svghmi/svghmi.py Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi.py Wed Jul 07 16:31:13 2021 +0200
@@ -47,12 +47,14 @@
on_hmitree_update = None
+maxConnectionsTotal = 0
+
class SVGHMILibrary(POULibrary):
def GetLibraryPath(self):
return paths.AbsNeighbourFile(__file__, "pous.xml")
def Generate_C(self, buildpath, varlist, IECCFLAGS):
- global hmi_tree_root, on_hmitree_update
+ global hmi_tree_root, on_hmitree_update, maxConnectionsTotal
"""
PLC Instance Tree:
@@ -196,6 +198,11 @@
# "programs_declarations": "\n".join(["extern %(type)s %(C_path)s;" %
# p for p in self._ProgramList]),
+        # reset the accumulator: Generate_C may be invoked more than once
+        maxConnectionsTotal = 0
+        for CTNChild in self.GetCTR().IterChildren():
+ if isinstance(CTNChild, SVGHMI):
+ maxConnectionsTotal += CTNChild.GetParamsAttributes("SVGHMI.MaxConnections")["value"]
+
+
# C code to observe/access HMI tree variables
svghmi_c_filepath = paths.AbsNeighbourFile(__file__, "svghmi.c")
svghmi_c_file = open(svghmi_c_filepath, 'r')
@@ -208,7 +215,8 @@
"item_count": item_count,
"var_access_code": targets.GetCode("var_access.c"),
"PLC_ticktime": self.GetCTR().GetTicktime(),
- "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash()))
+ "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash())),
+ "max_connections": maxConnectionsTotal
}
gen_svghmi_c_path = os.path.join(buildpath, "svghmi.c")
@@ -287,7 +295,8 @@
<xsd:attribute name="WatchdogInterval" type="xsd:integer" use="optional" default="5"/>
<xsd:attribute name="Port" type="xsd:integer" use="optional" default="8008"/>
<xsd:attribute name="Interface" type="xsd:string" use="optional" default="localhost"/>
- <xsd:attribute name="Path" type="xsd:string" use="optional" default=""/>
+ <xsd:attribute name="Path" type="xsd:string" use="optional" default="{name}"/>
+ <xsd:attribute name="MaxConnections" type="xsd:integer" use="optional" default="16"/>
</xsd:complexType>
</xsd:element>
</xsd:schema>
@@ -562,19 +571,22 @@
def svghmi_{location}_watchdog_trigger():
{svghmi_cmds[Watchdog]}
-svghmi_watchdog = None
+max_svghmi_sessions = {maxConnections_total}
def _runtime_{location}_svghmi_start():
global svghmi_watchdog, svghmi_servers
srv = svghmi_servers.get("{interface}:{port}", None)
if srv is not None:
- svghmi_root, svghmi_listener, path_list = srv
+ svghmi_root, svghmi_listener, path_list = srv
if '{path}' in path_list:
raise Exception("SVGHMI {view_name}: path {path} already used on {interface}:{port}")
else:
svghmi_root = Resource()
- svghmi_root.putChild("ws", WebSocketResource(HMIWebSocketServerFactory()))
+ factory = HMIWebSocketServerFactory()
+ factory.setProtocolOptions(maxConnections={maxConnections})
+
+ svghmi_root.putChild("ws", WebSocketResource(factory))
svghmi_listener = reactor.listenTCP({port}, Site(svghmi_root), interface='{interface}')
path_list = []
@@ -592,8 +604,8 @@
if {enable_watchdog}:
if svghmi_watchdog is None:
svghmi_watchdog = Watchdog(
- {watchdog_initial},
- {watchdog_interval},
+ {watchdog_initial},
+ {watchdog_interval},
svghmi_{location}_watchdog_trigger)
else:
raise Exception("SVGHMI {view_name}: only one watchdog allowed")
@@ -628,6 +640,8 @@
enable_watchdog = enable_watchdog,
watchdog_initial = self.GetParamsAttributes("SVGHMI.WatchdogInitial")["value"],
watchdog_interval = self.GetParamsAttributes("SVGHMI.WatchdogInterval")["value"],
+ maxConnections = self.GetParamsAttributes("SVGHMI.MaxConnections")["value"],
+ maxConnections_total = maxConnectionsTotal
))
runtimefile.close()
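
The runtime template above amounts to capping concurrent WebSocket clients at
the autobahn protocol level; once instantiated it boils down to roughly this
(16 stands in for the templated SVGHMI.MaxConnections value, 16 being the XSD
default):

    factory = HMIWebSocketServerFactory()
    # autobahn refuses connections beyond the limit during the
    # WebSocket opening handshake
    factory.setProtocolOptions(maxConnections=16)
    svghmi_root.putChild("ws", WebSocketResource(factory))
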
--- a/svghmi/svghmi_server.py Mon Jul 05 10:51:02 2021 +0200
+++ b/svghmi/svghmi_server.py Wed Jul 07 16:31:13 2021 +0200
@@ -23,55 +23,124 @@
from autobahn.websocket.protocol import WebSocketProtocol
from autobahn.twisted.resource import WebSocketResource
+from threading import RLock
-# TODO multiclient :
-# session list lock
-# svghmi_sessions = []
-# svghmi_watchdogs = []
-
-svghmi_session = None
+max_svghmi_sessions = None
svghmi_watchdog = None
svghmi_send_collect = PLCBinary.svghmi_send_collect
svghmi_send_collect.restype = ctypes.c_int # error or 0
svghmi_send_collect.argtypes = [
+    ctypes.c_uint32, # session index
ctypes.POINTER(ctypes.c_uint32), # size
ctypes.POINTER(ctypes.c_void_p)] # data ptr
-# TODO multiclient : switch to arrays
svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch
svghmi_recv_dispatch.restype = ctypes.c_int # error or 0
svghmi_recv_dispatch.argtypes = [
+    ctypes.c_uint32, # session index
ctypes.c_uint32, # size
ctypes.c_char_p] # data ptr
-# TODO multiclient : switch to arrays
+
+class HMISessionMgr(object):
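+    """Registry of HMI sessions: at most one watchdog session plus regular
+    client sessions, each holding a unique index that selects its
+    per-session state slot (one of MAX_CONNECTIONS) on the C side."""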
+ def __init__(self):
+        # a list rather than a set: iter_sessions() indexes into it
+        self.multiclient_sessions = []
+ self.watchdog_session = None
+ self.session_count = 0
+ self.lock = RLock()
+ self.indexes = set()
+
+ def next_index(self):
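+        # allocate the smallest unused index, so that indexes stay compact
+        # and within the C side's MAX_CONNECTIONS-sized arrays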
+ if self.indexes:
+ greatest = max(self.indexes)
+ holes = set(range(greatest)) - self.indexes
+ index = min(holes) if holes else greatest+1
+ else:
+ index = 0
+ self.indexes.add(index)
+ return index
+
+ def free_index(self, index):
+ self.indexes.remove(index)
+
+ def register(self, session):
+ global max_svghmi_sessions
+ with self.lock:
+ if session.is_watchdog_session:
+ # Creating a new watchdog session closes pre-existing one
+ if self.watchdog_session is not None:
+ self.watchdog_session.close()
+                    # unregister() also frees the replaced session's index
+                    self.unregister(self.watchdog_session)
+                # when replacing, unregister() decremented the count, so
+                # the new watchdog session is accounted for in both cases
+                assert(self.session_count < max_svghmi_sessions)
+                self.session_count += 1
+
+ self.watchdog_session = session
+ else:
+ assert(self.session_count < max_svghmi_sessions)
+                self.multiclient_sessions.append(session)
+ self.session_count += 1
+ session.session_index = self.next_index()
+
+ def unregister(self, session):
+ with self.lock:
+ if session.is_watchdog_session:
+ assert(self.watchdog_session == session)
+ self.watchdog_session = None
+ else:
+                self.multiclient_sessions.remove(session)
+            # common to both watchdog and regular sessions
+            self.free_index(session.session_index)
+            self.session_count -= 1
+
+ def close_all(self):
+ with self.lock:
+ close_list = list(self.multiclient_sessions)
+ if self.watchdog_session:
+ close_list.append(self.watchdog_session)
+ for session in close_list:
+ session.close()
+ self.unregister(session)
+
+ def iter_sessions(self):
+ with self.lock:
+ nxt_session = self.watchdog_session
+ if nxt_session is not None:
+ yield nxt_session
+ idx = 0
+ while True:
+ with self.lock:
+ if idx >= len(self.multiclient_sessions):
+ return
+ nxt_session = self.multiclient_sessions[idx]
+ yield nxt_session
+ idx += 1
+
+
+svghmi_session_manager = HMISessionMgr()
+
class HMISession(object):
def __init__(self, protocol_instance):
- global svghmi_session
-
- # Single client :
- # Creating a new HMISession closes pre-existing HMISession
- if svghmi_session is not None:
- svghmi_session.close()
- svghmi_session = self
self.protocol_instance = protocol_instance
-
- # TODO multiclient :
- # svghmi_sessions.append(self)
- # get a unique bit index amont other svghmi_sessions,
- # so that we can match flags passed by C->python callback
+ self._session_index = None
+
+ @property
+ def is_watchdog_session(self):
+ return self.protocol_instance.has_watchdog
+
+ @property
+ def session_index(self):
+ return self._session_index
+
+ @session_index.setter
+ def session_index(self, value):
+ self._session_index = value
def close(self):
- global svghmi_session
- if svghmi_session == self:
- svghmi_session = None
self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)
def onMessage(self, msg):
        # pass message to the C side receive_message()
- return svghmi_recv_dispatch(len(msg), msg)
-
- # TODO multiclient : pass client index as well
+ return svghmi_recv_dispatch(self.session_index, len(msg), msg)
def sendMessage(self, msg):
self.protocol_instance.sendMessage(msg, True)
@@ -111,13 +180,15 @@
def trigger(self):
self._callback()
- # wait for initial timeout on re-start
- self.feed(rearm=False)
+ # Don't repeat trigger periodically
+ # # wait for initial timeout on re-start
+ # self.feed(rearm=False)
class HMIProtocol(WebSocketServerProtocol):
def __init__(self, *args, **kwargs):
self._hmi_session = None
+ self.has_watchdog = False
WebSocketServerProtocol.__init__(self, *args, **kwargs)
def onConnect(self, request):
@@ -125,17 +196,22 @@
return WebSocketServerProtocol.onConnect(self, request)
def onOpen(self):
+ global svghmi_session_manager
assert(self._hmi_session is None)
self._hmi_session = HMISession(self)
+ svghmi_session_manager.register(self._hmi_session)
def onClose(self, wasClean, code, reason):
+ global svghmi_session_manager
+ svghmi_session_manager.unregister(self._hmi_session)
self._hmi_session = None
def onMessage(self, msg, isBinary):
+ global svghmi_watchdog
assert(self._hmi_session is not None)
result = self._hmi_session.onMessage(msg)
- if result == 1 : # was heartbeat
+ if result == 1 and self.has_watchdog: # was heartbeat
if svghmi_watchdog is not None:
svghmi_watchdog.feed()
@@ -146,23 +222,28 @@
svghmi_send_thread = None
def SendThreadProc():
- global svghmi_session
+ global svghmi_session_manager
size = ctypes.c_uint32()
ptr = ctypes.c_void_p()
res = 0
- while True:
- res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr))
- if res == 0:
- # TODO multiclient : dispatch to sessions
- if svghmi_session is not None:
- svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value))
- elif res == errno.ENODATA:
- # this happens when there is no data after wakeup
- # because of hmi data refresh period longer than PLC common ticktime
- pass
- else:
- # this happens when finishing
- break
+ finished = False
+    while not finished:
+ for svghmi_session in svghmi_session_manager.iter_sessions():
+ res = svghmi_send_collect(
+ svghmi_session.session_index,
+ ctypes.byref(size), ctypes.byref(ptr))
+ if res == 0:
+ svghmi_session.sendMessage(
+ ctypes.string_at(ptr.value,size.value))
+ elif res == errno.ENODATA:
+ # this happens when there is no data after wakeup
+ # because of hmi data refresh period longer than
+ # PLC common ticktime
+ pass
+ else:
+ # this happens when finishing
+ finished = True
+ break
def AddPathToSVGHMIServers(path, factory):
for k,v in svghmi_servers.iteritems():
@@ -182,8 +263,8 @@
def _runtime_00_svghmi_stop():
-    global svghmi_send_thread, svghmi_session
+    global svghmi_send_thread
- if svghmi_session is not None:
- svghmi_session.close()
+ svghmi_session_manager.close_all()
+
# plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread
svghmi_send_thread.join()
svghmi_send_thread = None
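
A quick check of the smallest-free-index policy in next_index()/free_index()
(a sketch assuming HMISessionMgr as defined above):

    mgr = HMISessionMgr()
    assert mgr.next_index() == 0
    assert mgr.next_index() == 1
    assert mgr.next_index() == 2
    mgr.free_index(1)
    # freed slots are reused before new ones are allocated, keeping
    # indexes within the C side's MAX_CONNECTIONS-sized arrays
    assert mgr.next_index() == 1
    assert mgr.next_index() == 3
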