# HG changeset patch # User Edouard Tisserant # Date 1524133360 -7200 # Node ID 1fdc32be71b80eceb8b3d87ce301c1ef35eda019 # Parent cbf0a9ffc782142efa70838b509d5efde4cbd224 Rework of runtime non-real-time threading, and shared object dynamic loading : - All exposed operations on PLCObject are now serialized through main thread (@RunInMain). Only one exception : python trace thread calling _GetDebugData - Re-loading of same shared object is prevented. dlclose/delte/rewrite/dlopen don't happen in that case. This is a workaround for some xenomai copperplate bug triggering segfault, inbetween delete and rewrite. - Trace thread now waits for data, dies if no data, and dies instead of suspending as before when no data consumed after 3 seconds. - Disabled threading in Pyro client - Enforced auxiliary threads starting sequence : Pyro -> [UI] -> PLCWorker.RunLoop diff -r cbf0a9ffc782 -r 1fdc32be71b8 Beremiz_service.py --- a/Beremiz_service.py Tue Apr 17 11:19:18 2018 +0200 +++ b/Beremiz_service.py Thu Apr 19 12:22:40 2018 +0200 @@ -30,9 +30,10 @@ import sys import getopt import threading -from threading import Thread, currentThread, Semaphore +from threading import Thread, currentThread, Semaphore, Lock import traceback import __builtin__ +import Pyro import Pyro.core as pyro from runtime import PLCObject, ServicePublisher, MainWorker @@ -399,6 +400,7 @@ return res + class Server(object): def __init__(self, servicename, ip_addr, port, workdir, argv, @@ -436,8 +438,9 @@ sys.stdout.flush() - def PyroLoop(self): + def PyroLoop(self, when_ready): while self.continueloop: + Pyro.config.PYRO_MULTITHREADED = 0 pyro.initServer() self.daemon = pyro.Daemon(host=self.ip_addr, port=self.port) @@ -452,6 +455,7 @@ self.servicepublisher = ServicePublisher.ServicePublisher() self.servicepublisher.RegisterService(self.servicename, self.ip_addr, self.port) + when_ready() self.daemon.requestLoop() self.daemon.sock.close() @@ -631,12 +635,20 @@ except Exception: LogMessageAndException(_("WAMP client startup failed. ")) -pyro_thread = Thread(target=pyroserver.PyroLoop) +pyro_thread_started = Lock() +pyro_thread_started.acquire() +pyro_thread = Thread(target=pyroserver.PyroLoop, + kwargs=dict(when_ready=pyro_thread_started.release)) pyro_thread.start() +# Wait for pyro thread to be effective +pyro_thread_started.acquire() + pyroserver.PrintServerInfo() if havetwisted or havewx: + ui_thread_started = Lock() + ui_thread_started.acquire() if havetwisted: # reactor._installSignalHandlersAgain() def ui_thread_target(): @@ -649,6 +661,16 @@ ui_thread = Thread(target = ui_thread_target) ui_thread.start() + # This order ui loop to unblock main thread when ready. + if havetwisted: + reactor.callLater(0,ui_thread_started.release) + else : + wx.CallAfter(ui_thread_started.release) + + # Wait for ui thread to be effective + ui_thread_started.acquire() + print("UI thread started successfully.") + try: MainWorker.runloop(pyroserver.AutoLoad) except KeyboardInterrupt: diff -r cbf0a9ffc782 -r 1fdc32be71b8 runtime/PLCObject.py --- a/runtime/PLCObject.py Tue Apr 17 11:19:18 2018 +0200 +++ b/runtime/PLCObject.py Thu Apr 19 12:22:40 2018 +0200 @@ -95,6 +95,7 @@ self.mutex = Lock() self.todo = Condition(self.mutex) self.done = Condition(self.mutex) + self.free = Condition(self.mutex) self.job = None def runloop(self,*args,**kwargs): @@ -110,7 +111,9 @@ self.todo.wait() if self.job is not None: self.job.do() - self.done.notify_all() + self.done.notify() + else: + self.free.notify() self.mutex.release() def call(self, *args, **kwargs): @@ -123,7 +126,7 @@ _job = job(*args,**kwargs) - if self._threadID == thread.get_ident(): + if self._threadID == thread.get_ident() or self._threadID is None: # if caller is worker thread execute immediately _job.do() else: @@ -131,7 +134,7 @@ self.mutex.acquire() while self.job is not None: - self.done.wait() + self.free.wait() self.job = _job self.todo.notify() @@ -168,7 +171,6 @@ class PLCObject(pyro.ObjBase): def __init__(self, server): - pyro.ObjBase.__init__(self) self.evaluator = server.evaluator self.argv = [server.workdir] + server.argv # force argv[0] to be "path" to exec... @@ -187,11 +189,11 @@ self.python_runtime_vars = None self.TraceThread = None self.TraceLock = Lock() - self.TraceWakeup = Event() self.Traces = [] + # First task of worker -> no @RunInMain def AutoLoad(self): - # Get the last transfered PLC if connector must be restart + # Get the last transfered PLC try: self.CurrentPLCFilename = open( self._GetMD5FileName(), @@ -207,6 +209,7 @@ for callee in self.statuschange: callee(self.PLCStatus) + @RunInMain def LogMessage(self, *args): if len(args) == 2: level, msg = args @@ -218,16 +221,19 @@ return self._LogMessage(level, msg, len(msg)) return None + @RunInMain def ResetLogCount(self): if self._ResetLogCount is not None: self._ResetLogCount() + # used internaly def GetLogCount(self, level): if self._GetLogCount is not None: return int(self._GetLogCount(level)) elif self._loading_error is not None and level == 0: return 1 + @RunInMain def GetLogMessage(self, level, msgid): tick = ctypes.c_uint32() tv_sec = ctypes.c_uint32() @@ -252,13 +258,13 @@ def _GetLibFileName(self): return os.path.join(self.workingdir, self.CurrentPLCFilename) - @RunInMain - def LoadPLC(self): + def _LoadPLC(self): """ Load PLC library Declare all functions, arguments and return values """ md5 = open(self._GetMD5FileName(), "r").read() + self.PLClibraryLock.acquire() try: self._PLClibraryHandle = dlopen(self._GetLibFileName()) self.PLClibraryHandle = ctypes.CDLL(self.CurrentPLCFilename, handle=self._PLClibraryHandle) @@ -335,14 +341,24 @@ self._loading_error = None - self.PythonRuntimeInit() - - return True except Exception: self._loading_error = traceback.format_exc() PLCprint(self._loading_error) + return False + finally: + self.PLClibraryLock.release() + + return True + + @RunInMain + def LoadPLC(self): + res = self._LoadPLC() + if res: + self.PythonRuntimeInit() + else: self._FreePLC() - return False + + return res @RunInMain def UnLoadPLC(self): @@ -375,15 +391,17 @@ This is also called by __init__ to create dummy C func proxies """ self.PLClibraryLock.acquire() - - # Unload library explicitely - if getattr(self, "_PLClibraryHandle", None) is not None: - dlclose(self._PLClibraryHandle) - - # Forget all refs to library - self._InitPLCStubCalls() - - self.PLClibraryLock.release() + try: + # Unload library explicitely + if getattr(self, "_PLClibraryHandle", None) is not None: + dlclose(self._PLClibraryHandle) + + # Forget all refs to library + self._InitPLCStubCalls() + + finally: + self.PLClibraryLock.release() + return False def PythonRuntimeCall(self, methodname): @@ -396,6 +414,7 @@ if exp is not None: self.LogMessage(0, '\n'.join(traceback.format_exception(*exp))) + # used internaly def PythonRuntimeInit(self): MethodNames = ["init", "start", "stop", "cleanup"] self.python_runtime_vars = globals().copy() @@ -447,6 +466,7 @@ self.PythonRuntimeCall("init") + # used internaly def PythonRuntimeCleanup(self): if self.python_runtime_vars is not None: self.PythonRuntimeCall("cleanup") @@ -509,12 +529,12 @@ self.StatusChange() self.PythonRuntimeCall("stop") if self.TraceThread is not None: - self.TraceWakeup.set() self.TraceThread.join() self.TraceThread = None return True return False + @RunInMain def GetPLCstatus(self): return self.PLCStatus, map(self.GetLogCount, xrange(LogLevelsCount)) @@ -524,14 +544,25 @@ NewFileName = md5sum + lib_ext extra_files_log = os.path.join(self.workingdir, "extra_files.txt") - self.UnLoadPLC() + old_PLC_filename = os.path.join(self.workingdir, \ + self.CurrentPLCFilename) \ + if self.CurrentPLCFilename is not None \ + else None + new_PLC_filename = os.path.join(self.workingdir, NewFileName) + + # Some platform (i.e. Xenomai) don't like reloading same .so file + replace_PLC_shared_object = new_PLC_filename != old_PLC_filename + + if replace_PLC_shared_object: + self.UnLoadPLC() self.LogMessage("NewPLC (%s)" % md5sum) self.PLCStatus = "Empty" + try: - os.remove(os.path.join(self.workingdir, - self.CurrentPLCFilename)) + if replace_PLC_shared_object: + os.remove(old_PLC_filename) for filename in file(extra_files_log, "r").readlines() + [extra_files_log]: try: os.remove(os.path.join(self.workingdir, filename.strip())) @@ -542,8 +573,8 @@ try: # Create new PLC file - open(os.path.join(self.workingdir, NewFileName), - 'wb').write(data) + if replace_PLC_shared_object: + open(new_PLC_filename, 'wb').write(data) # Store new PLC filename based on md5 key open(self._GetMD5FileName(), "w").write(md5sum) @@ -563,7 +594,9 @@ PLCprint(traceback.format_exc()) return False - if self.LoadPLC(): + if not replace_PLC_shared_object: + self.PLCStatus = "Stopped" + elif self.LoadPLC(): self.PLCStatus = "Stopped" else: self.PLCStatus = "Broken" @@ -602,13 +635,6 @@ else: self._suspendDebug(True) - def _TracesPush(self, trace): - self.TraceLock.acquire() - lT = len(self.Traces) - if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024: - self.Traces.pop(0) - self.Traces.append(trace) - self.TraceLock.release() def _TracesSwap(self): self.LastSwapTrace = time() @@ -619,26 +645,9 @@ Traces = self.Traces self.Traces = [] self.TraceLock.release() - self.TraceWakeup.set() return Traces - def _TracesAutoSuspend(self): - # TraceProc stops here if Traces not polled for 3 seconds - traces_age = time() - self.LastSwapTrace - if traces_age > 3: - self.TraceLock.acquire() - self.Traces = [] - self.TraceLock.release() - self._suspendDebug(True) # Disable debugger - self.TraceWakeup.clear() - self.TraceWakeup.wait() - self._resumeDebug() # Re-enable debugger - - def _TracesFlush(self): - self.TraceLock.acquire() - self.Traces = [] - self.TraceLock.release() - + @RunInMain def GetTraceVariables(self): return self.PLCStatus, self._TracesSwap() @@ -646,28 +655,47 @@ """ Return a list of traces, corresponding to the list of required idx """ + self._resumeDebug() # Re-enable debugger while self.PLCStatus == "Started": tick = ctypes.c_uint32() size = ctypes.c_uint32() buff = ctypes.c_void_p() TraceBuffer = None - if self.PLClibraryLock.acquire(False): - res = self._GetDebugData(ctypes.byref(tick), - ctypes.byref(size), - ctypes.byref(buff)) - if res == 0: - if size.value: - TraceBuffer = ctypes.string_at(buff.value, size.value) - self._FreeDebugData() - self.PLClibraryLock.release() + + self.PLClibraryLock.acquire() + + res = self._GetDebugData(ctypes.byref(tick), + ctypes.byref(size), + ctypes.byref(buff)) + if res == 0: + if size.value: + TraceBuffer = ctypes.string_at(buff.value, size.value) + self._FreeDebugData() + + self.PLClibraryLock.release() - if res != 0: - break + # leave thread if GetDebugData isn't happy. + if res != 0: + break if TraceBuffer is not None: - self._TracesPush((tick.value, TraceBuffer)) - self._TracesAutoSuspend() - self._TracesFlush() + self.TraceLock.acquire() + lT = len(self.Traces) + if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024: + self.Traces.pop(0) + self.Traces.append((tick.value, TraceBuffer)) + self.TraceLock.release() + + # TraceProc stops here if Traces not polled for 3 seconds + traces_age = time() - self.LastSwapTrace + if traces_age > 3: + self.TraceLock.acquire() + self.Traces = [] + self.TraceLock.release() + self._suspendDebug(True) # Disable debugger + break + + self.TraceThread = None def RemoteExec(self, script, *kwargs): try: