# HG changeset patch # User Edouard Tisserant # Date 1625668273 -7200 # Node ID 38f7122ccbf98dea804f1284698b1df567c0fb6c # Parent 5d174cdf4d988af641f1254e92ff98a4b9f2b480 SVGHMI: Implemented multiserver+multiclient, but only tested with single client and single server for now. To be continued... diff -r 5d174cdf4d98 -r 38f7122ccbf9 svghmi/svghmi.c --- a/svghmi/svghmi.c Mon Jul 05 10:51:02 2021 +0200 +++ b/svghmi/svghmi.c Wed Jul 07 16:31:13 2021 +0200 @@ -9,6 +9,8 @@ #define HMI_BUFFER_SIZE %(buffer_size)d #define HMI_ITEM_COUNT %(item_count)d #define HMI_HASH_SIZE 8 +#define MAX_CONNECTIONS %(max_connections)d + static uint8_t hmi_hash[HMI_HASH_SIZE] = {%(hmi_hash_ints)s}; /* PLC reads from that buffer */ @@ -45,11 +47,11 @@ /* publish/write/send */ long wlock; - buf_state_t wstate; + buf_state_t wstate[MAX_CONNECTIONS]; /* zero means not subscribed */ - uint16_t refresh_period_ms; - uint16_t age_ms; + uint16_t refresh_period_ms[MAX_CONNECTIONS]; + uint16_t age_ms[MAX_CONNECTIONS]; /* retrieve/read/recv */ long rlock; @@ -81,15 +83,16 @@ static int write_iterator(uint32_t index, hmi_tree_item_t *dsc) { - if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0) - { - if(dsc->wstate == buf_set){ + uint32_t session_index = 0; + if(AtomicCompareExchange(&dsc->wlock, 0, 1) == 0) while(session_index < MAX_CONNECTIONS) + { + if(dsc->wstate[session_index] == buf_set){ /* if being subscribed */ - if(dsc->refresh_period_ms){ - if(dsc->age_ms + ticktime_ms < dsc->refresh_period_ms){ - dsc->age_ms += ticktime_ms; + if(dsc->refresh_period_ms[session_index]){ + if(dsc->age_ms[session_index] + ticktime_ms < dsc->refresh_period_ms[session_index]){ + dsc->age_ms[session_index] += ticktime_ms; }else{ - dsc->wstate = buf_tosend; + dsc->wstate[session_index] = buf_tosend; global_write_dirty = 1; } } @@ -105,34 +108,36 @@ if(__Is_a_string(dsc)){ sz = ((STRING*)visible_value_p)->len + 1; } - if(dsc->wstate == buf_new /* just subscribed + if(dsc->wstate[session_index] == buf_new /* just subscribed or already subscribed having value change */ - || (dsc->refresh_period_ms > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){ + || (dsc->refresh_period_ms[session_index] > 0 && memcmp(dest_p, visible_value_p, sz) != 0)){ /* copy and flag as set */ memcpy(dest_p, visible_value_p, sz); /* if not already marked/signaled, do it */ - if(dsc->wstate != buf_set && dsc->wstate != buf_tosend) { - if(dsc->wstate == buf_new || ticktime_ms > dsc->refresh_period_ms){ - dsc->wstate = buf_tosend; + if(dsc->wstate[session_index] != buf_set && dsc->wstate[session_index] != buf_tosend) { + if(dsc->wstate[session_index] == buf_new || ticktime_ms > dsc->refresh_period_ms[session_index]){ + dsc->wstate[session_index] = buf_tosend; global_write_dirty = 1; } else { - dsc->wstate = buf_set; - } - dsc->age_ms = 0; + dsc->wstate[session_index] = buf_set; + } + dsc->age_ms[session_index] = 0; } } AtomicCompareExchange(&dsc->wlock, 1, 0); + session_index++; } // else ... : PLC can't wait, variable will be updated next turn return 0; } +static uint32_t send_session_index; static int send_iterator(uint32_t index, hmi_tree_item_t *dsc) { while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield(); - if(dsc->wstate == buf_tosend) + if(dsc->wstate[send_session_index] == buf_tosend) { uint32_t sz = __get_type_enum_size(dsc->type); if(sbufidx + sizeof(uint32_t) + sz <= sizeof(sbuf)) @@ -145,7 +150,7 @@ /* TODO : force into little endian */ memcpy(dst_p, &index, sizeof(uint32_t)); memcpy(dst_p + sizeof(uint32_t), src_p, sz); - dsc->wstate = buf_free; + dsc->wstate[send_session_index] = buf_free; sbufidx += sizeof(uint32_t) /* index */ + sz; } else @@ -179,24 +184,25 @@ return 0; } -void update_refresh_period(hmi_tree_item_t *dsc, uint16_t refresh_period_ms) +void update_refresh_period(hmi_tree_item_t *dsc, uint32_t session_index, uint16_t refresh_period_ms) { while(AtomicCompareExchange(&dsc->wlock, 0, 1)) sched_yield(); if(refresh_period_ms) { - if(!dsc->refresh_period_ms) - { - dsc->wstate = buf_new; + if(!dsc->refresh_period_ms[session_index]) + { + dsc->wstate[session_index] = buf_new; } } else { - dsc->wstate = buf_free; - } - dsc->refresh_period_ms = refresh_period_ms; + dsc->wstate[session_index] = buf_free; + } + dsc->refresh_period_ms[session_index] = refresh_period_ms; AtomicCompareExchange(&dsc->wlock, 1, 0); } +static uint32_t reset_session_index; static int reset_iterator(uint32_t index, hmi_tree_item_t *dsc) { - update_refresh_period(dsc, 0); + update_refresh_period(dsc, reset_session_index, 0); return 0; } @@ -236,13 +242,14 @@ } /* PYTHON CALLS */ -int svghmi_send_collect(uint32_t *size, char **ptr){ +int svghmi_send_collect(uint32_t session_index, uint32_t *size, char **ptr){ SVGHMI_SuspendFromPythonThread(); if(continue_collect) { int res; sbufidx = HMI_HASH_SIZE; + send_session_index = session_index; if((res = traverse_hmi_tree(send_iterator)) == 0) { if(sbufidx > HMI_HASH_SIZE){ @@ -270,7 +277,7 @@ // Returns : // 0 is OK, <0 is error, 1 is heartbeat -int svghmi_recv_dispatch(uint32_t size, const uint8_t *ptr){ +int svghmi_recv_dispatch(uint32_t session_index, uint32_t size, const uint8_t *ptr){ const uint8_t* cursor = ptr + HMI_HASH_SIZE; const uint8_t* end = ptr + size; @@ -336,6 +343,7 @@ case reset: { progress = 0; + reset_session_index = session_index; traverse_hmi_tree(reset_iterator); } break; @@ -348,7 +356,7 @@ if(index < HMI_ITEM_COUNT) { hmi_tree_item_t *dsc = &hmi_tree_item[index]; - update_refresh_period(dsc, refresh_period_ms); + update_refresh_period(dsc, session_index, refresh_period_ms); } else { diff -r 5d174cdf4d98 -r 38f7122ccbf9 svghmi/svghmi.py --- a/svghmi/svghmi.py Mon Jul 05 10:51:02 2021 +0200 +++ b/svghmi/svghmi.py Wed Jul 07 16:31:13 2021 +0200 @@ -47,12 +47,14 @@ on_hmitree_update = None +maxConnectionsTotal = 0 + class SVGHMILibrary(POULibrary): def GetLibraryPath(self): return paths.AbsNeighbourFile(__file__, "pous.xml") def Generate_C(self, buildpath, varlist, IECCFLAGS): - global hmi_tree_root, on_hmitree_update + global hmi_tree_root, on_hmitree_update, maxConnectionsTotal """ PLC Instance Tree: @@ -196,6 +198,11 @@ # "programs_declarations": "\n".join(["extern %(type)s %(C_path)s;" % # p for p in self._ProgramList]), + for CTNChild in self.GetCTR().IterChildren(): + if isinstance(CTNChild, SVGHMI): + maxConnectionsTotal += CTNChild.GetParamsAttributes("SVGHMI.MaxConnections")["value"] + + # C code to observe/access HMI tree variables svghmi_c_filepath = paths.AbsNeighbourFile(__file__, "svghmi.c") svghmi_c_file = open(svghmi_c_filepath, 'r') @@ -208,7 +215,8 @@ "item_count": item_count, "var_access_code": targets.GetCode("var_access.c"), "PLC_ticktime": self.GetCTR().GetTicktime(), - "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash())) + "hmi_hash_ints": ",".join(map(str,hmi_tree_root.hash())), + "max_connections": maxConnectionsTotal } gen_svghmi_c_path = os.path.join(buildpath, "svghmi.c") @@ -287,7 +295,8 @@ - + + @@ -562,19 +571,22 @@ def svghmi_{location}_watchdog_trigger(): {svghmi_cmds[Watchdog]} -svghmi_watchdog = None +max_svghmi_sessions = {maxConnections_total} def _runtime_{location}_svghmi_start(): global svghmi_watchdog, svghmi_servers srv = svghmi_servers.get("{interface}:{port}", None) if srv is not None: - svghmi_root, svghmi_listener, path_list = srv + svghmi_root, svghmi_listener, path_list = srv if '{path}' in path_list: raise Exception("SVGHMI {view_name}: path {path} already used on {interface}:{port}") else: svghmi_root = Resource() - svghmi_root.putChild("ws", WebSocketResource(HMIWebSocketServerFactory())) + factory = HMIWebSocketServerFactory() + factory.setProtocolOptions(maxConnections={maxConnections}) + + svghmi_root.putChild("ws", WebSocketResource(factory)) svghmi_listener = reactor.listenTCP({port}, Site(svghmi_root), interface='{interface}') path_list = [] @@ -592,8 +604,8 @@ if {enable_watchdog}: if svghmi_watchdog is None: svghmi_watchdog = Watchdog( - {watchdog_initial}, - {watchdog_interval}, + {watchdog_initial}, + {watchdog_interval}, svghmi_{location}_watchdog_trigger) else: raise Exception("SVGHMI {view_name}: only one watchdog allowed") @@ -628,6 +640,8 @@ enable_watchdog = enable_watchdog, watchdog_initial = self.GetParamsAttributes("SVGHMI.WatchdogInitial")["value"], watchdog_interval = self.GetParamsAttributes("SVGHMI.WatchdogInterval")["value"], + maxConnections = self.GetParamsAttributes("SVGHMI.MaxConnections")["value"], + maxConnections_total = maxConnectionsTotal )) runtimefile.close() diff -r 5d174cdf4d98 -r 38f7122ccbf9 svghmi/svghmi_server.py --- a/svghmi/svghmi_server.py Mon Jul 05 10:51:02 2021 +0200 +++ b/svghmi/svghmi_server.py Wed Jul 07 16:31:13 2021 +0200 @@ -23,55 +23,124 @@ from autobahn.websocket.protocol import WebSocketProtocol from autobahn.twisted.resource import WebSocketResource -# TODO multiclient : -# session list lock -# svghmi_sessions = [] -# svghmi_watchdogs = [] - -svghmi_session = None +max_svghmi_sessions = None svghmi_watchdog = None svghmi_send_collect = PLCBinary.svghmi_send_collect svghmi_send_collect.restype = ctypes.c_int # error or 0 svghmi_send_collect.argtypes = [ + ctypes.c_uint32, # index ctypes.POINTER(ctypes.c_uint32), # size ctypes.POINTER(ctypes.c_void_p)] # data ptr -# TODO multiclient : switch to arrays svghmi_recv_dispatch = PLCBinary.svghmi_recv_dispatch svghmi_recv_dispatch.restype = ctypes.c_int # error or 0 svghmi_recv_dispatch.argtypes = [ + ctypes.c_uint32, # index ctypes.c_uint32, # size ctypes.c_char_p] # data ptr -# TODO multiclient : switch to arrays + +class HMISessionMgr(object): + def __init__(self): + self.multiclient_sessions = set() + self.watchdog_session = None + self.session_count = 0 + self.lock = RLock() + self.indexes = set() + + def next_index(self): + if self.indexes: + greatest = max(self.indexes) + holes = set(range(greatest)) - self.indexes + index = min(holes) if holes else greatest+1 + else: + index = 0 + self.indexes.add(index) + return index + + def free_index(self, index): + self.indexes.remove(index) + + def register(self, session): + global max_svghmi_sessions + with self.lock: + if session.is_watchdog_session: + # Creating a new watchdog session closes pre-existing one + if self.watchdog_session is not None: + self.watchdog_session.close() + self.unregister(self.watchdog_session) + self.free_index(self.watchdog_session.session_index) + else: + assert(self.session_count < max_svghmi_sessions) + self.session_count += 1 + + self.watchdog_session = session + else: + assert(self.session_count < max_svghmi_sessions) + self.multiclient_sessions.add(session) + self.session_count += 1 + session.session_index = self.next_index() + + def unregister(self, session): + with self.lock: + if session.is_watchdog_session: + assert(self.watchdog_session == session) + self.watchdog_session = None + else: + self.multiclient_sessions.remove(self) + self.free_index(self.watchdog_session.get_index()) + self.session_count -= 1 + + def close_all(self): + with self.lock: + close_list = list(self.multiclient_sessions) + if self.watchdog_session: + close_list.append(self.watchdog_session) + for session in close_list: + session.close() + self.unregister(session) + + def iter_sessions(self): + with self.lock: + nxt_session = self.watchdog_session + if nxt_session is not None: + yield nxt_session + idx = 0 + while True: + with self.lock: + if idx >= len(self.multiclient_sessions): + return + nxt_session = self.multiclient_sessions[idx] + yield nxt_session + idx += 1 + + +svghmi_session_manager = HMISessionMgr() + class HMISession(object): def __init__(self, protocol_instance): - global svghmi_session - - # Single client : - # Creating a new HMISession closes pre-existing HMISession - if svghmi_session is not None: - svghmi_session.close() - svghmi_session = self self.protocol_instance = protocol_instance - - # TODO multiclient : - # svghmi_sessions.append(self) - # get a unique bit index amont other svghmi_sessions, - # so that we can match flags passed by C->python callback + self._session_index = None + + @property + def is_watchdog_session(self): + return self.protocol_instance.has_watchdog + + @property + def session_index(self): + return self._session_index + + @session_index.setter + def session_index(self, value): + self._session_index = value def close(self): - global svghmi_session - if svghmi_session == self: - svghmi_session = None self.protocol_instance.sendClose(WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL) def onMessage(self, msg): # pass message to the C side recieve_message() - return svghmi_recv_dispatch(len(msg), msg) - - # TODO multiclient : pass client index as well + return svghmi_recv_dispatch(self.session_index, len(msg), msg) def sendMessage(self, msg): self.protocol_instance.sendMessage(msg, True) @@ -111,13 +180,15 @@ def trigger(self): self._callback() - # wait for initial timeout on re-start - self.feed(rearm=False) + # Don't repeat trigger periodically + # # wait for initial timeout on re-start + # self.feed(rearm=False) class HMIProtocol(WebSocketServerProtocol): def __init__(self, *args, **kwargs): self._hmi_session = None + self.has_watchdog = False WebSocketServerProtocol.__init__(self, *args, **kwargs) def onConnect(self, request): @@ -125,17 +196,22 @@ return WebSocketServerProtocol.onConnect(self, request) def onOpen(self): + global svghmi_session_manager assert(self._hmi_session is None) self._hmi_session = HMISession(self) + svghmi_session_manager.register(self._hmi_session) def onClose(self, wasClean, code, reason): + global svghmi_session_manager + svghmi_session_manager.unregister(self._hmi_session) self._hmi_session = None def onMessage(self, msg, isBinary): + global svghmi_watchdog assert(self._hmi_session is not None) result = self._hmi_session.onMessage(msg) - if result == 1 : # was heartbeat + if result == 1 and self.has_watchdog: # was heartbeat if svghmi_watchdog is not None: svghmi_watchdog.feed() @@ -146,23 +222,28 @@ svghmi_send_thread = None def SendThreadProc(): - global svghmi_session + global svghmi_session_manager size = ctypes.c_uint32() ptr = ctypes.c_void_p() res = 0 - while True: - res=svghmi_send_collect(ctypes.byref(size), ctypes.byref(ptr)) - if res == 0: - # TODO multiclient : dispatch to sessions - if svghmi_session is not None: - svghmi_session.sendMessage(ctypes.string_at(ptr.value,size.value)) - elif res == errno.ENODATA: - # this happens when there is no data after wakeup - # because of hmi data refresh period longer than PLC common ticktime - pass - else: - # this happens when finishing - break + finished = False + while not(finished): + for svghmi_session in svghmi_session_manager.iter_sessions(): + res = svghmi_send_collect( + svghmi_session.session_index, + ctypes.byref(size), ctypes.byref(ptr)) + if res == 0: + svghmi_session.sendMessage( + ctypes.string_at(ptr.value,size.value)) + elif res == errno.ENODATA: + # this happens when there is no data after wakeup + # because of hmi data refresh period longer than + # PLC common ticktime + pass + else: + # this happens when finishing + finished = True + break def AddPathToSVGHMIServers(path, factory): for k,v in svghmi_servers.iteritems(): @@ -182,8 +263,8 @@ def _runtime_00_svghmi_stop(): global svghmi_send_thread, svghmi_session - if svghmi_session is not None: - svghmi_session.close() + svghmi_session_manager.close_all() + # plc cleanup calls svghmi_(locstring)_cleanup and unlocks send thread svghmi_send_thread.join() svghmi_send_thread = None