Rework of runtime non-real-time threading, and shared object dynamic loading :
authorEdouard Tisserant
Thu, 19 Apr 2018 12:22:40 +0200
changeset 1994 1fdc32be71b8
parent 1993 cbf0a9ffc782
child 1995 691d119ba20f
Rework of runtime non-real-time threading, and shared object dynamic loading :
- All exposed operations on PLCObject are now serialized through main thread
(@RunInMain). Only one exception : python trace thread calling _GetDebugData
- Re-loading of same shared object is prevented. dlclose/delte/rewrite/dlopen
don't happen in that case. This is a workaround for some xenomai copperplate
bug triggering segfault, inbetween delete and rewrite.
- Trace thread now waits for data, dies if no data, and dies instead of
suspending as before when no data consumed after 3 seconds.
- Disabled threading in Pyro client
- Enforced auxiliary threads starting sequence : Pyro -> [UI] ->
PLCWorker.RunLoop
Beremiz_service.py
runtime/PLCObject.py
--- a/Beremiz_service.py	Tue Apr 17 11:19:18 2018 +0200
+++ b/Beremiz_service.py	Thu Apr 19 12:22:40 2018 +0200
@@ -30,9 +30,10 @@
 import sys
 import getopt
 import threading
-from threading import Thread, currentThread, Semaphore
+from threading import Thread, currentThread, Semaphore, Lock
 import traceback
 import __builtin__
+import Pyro
 import Pyro.core as pyro
 
 from runtime import PLCObject, ServicePublisher, MainWorker
@@ -399,6 +400,7 @@
     return res
 
 
+
 class Server(object):
     def __init__(self, servicename, ip_addr, port,
                  workdir, argv,
@@ -436,8 +438,9 @@
         sys.stdout.flush()
 
 
-    def PyroLoop(self):
+    def PyroLoop(self, when_ready):
         while self.continueloop:
+            Pyro.config.PYRO_MULTITHREADED = 0
             pyro.initServer()
             self.daemon = pyro.Daemon(host=self.ip_addr, port=self.port)
 
@@ -452,6 +455,7 @@
                 self.servicepublisher = ServicePublisher.ServicePublisher()
                 self.servicepublisher.RegisterService(self.servicename, self.ip_addr, self.port)
 
+            when_ready()
             self.daemon.requestLoop()
             self.daemon.sock.close()
 
@@ -631,12 +635,20 @@
         except Exception:
             LogMessageAndException(_("WAMP client startup failed. "))
 
-pyro_thread = Thread(target=pyroserver.PyroLoop)
+pyro_thread_started = Lock()
+pyro_thread_started.acquire()
+pyro_thread = Thread(target=pyroserver.PyroLoop,
+                     kwargs=dict(when_ready=pyro_thread_started.release))
 pyro_thread.start()
 
+# Wait for pyro thread to be effective
+pyro_thread_started.acquire()
+
 pyroserver.PrintServerInfo()
 
 if havetwisted or havewx:
+    ui_thread_started = Lock()
+    ui_thread_started.acquire()
     if havetwisted:
         # reactor._installSignalHandlersAgain()
         def ui_thread_target():
@@ -649,6 +661,16 @@
     ui_thread = Thread(target = ui_thread_target)
     ui_thread.start()
 
+    # This order ui loop to unblock main thread when ready.
+    if havetwisted:
+        reactor.callLater(0,ui_thread_started.release)
+    else :
+        wx.CallAfter(ui_thread_started.release)
+
+    # Wait for ui thread to be effective
+    ui_thread_started.acquire()
+    print("UI thread started successfully.")
+
 try:
     MainWorker.runloop(pyroserver.AutoLoad)
 except KeyboardInterrupt:
--- a/runtime/PLCObject.py	Tue Apr 17 11:19:18 2018 +0200
+++ b/runtime/PLCObject.py	Thu Apr 19 12:22:40 2018 +0200
@@ -95,6 +95,7 @@
         self.mutex = Lock()
         self.todo = Condition(self.mutex)
         self.done = Condition(self.mutex)
+        self.free = Condition(self.mutex)
         self.job = None
 
     def runloop(self,*args,**kwargs):
@@ -110,7 +111,9 @@
             self.todo.wait()
             if self.job is not None:
                 self.job.do()
-            self.done.notify_all()
+                self.done.notify()
+            else:
+                self.free.notify()
         self.mutex.release()
     
     def call(self, *args, **kwargs):
@@ -123,7 +126,7 @@
 
         _job = job(*args,**kwargs)
 
-        if self._threadID == thread.get_ident():
+        if self._threadID == thread.get_ident() or self._threadID is None:
             # if caller is worker thread execute immediately
             _job.do()
         else:
@@ -131,7 +134,7 @@
             self.mutex.acquire()
 
             while self.job is not None:
-                self.done.wait()
+                self.free.wait()
 
             self.job = _job
             self.todo.notify()
@@ -168,7 +171,6 @@
 
 class PLCObject(pyro.ObjBase):
     def __init__(self, server):
-
         pyro.ObjBase.__init__(self)
         self.evaluator = server.evaluator
         self.argv = [server.workdir] + server.argv  # force argv[0] to be "path" to exec...
@@ -187,11 +189,11 @@
         self.python_runtime_vars = None
         self.TraceThread = None
         self.TraceLock = Lock()
-        self.TraceWakeup = Event()
         self.Traces = []
 
+    # First task of worker -> no @RunInMain
     def AutoLoad(self):
-        # Get the last transfered PLC if connector must be restart
+        # Get the last transfered PLC 
         try:
             self.CurrentPLCFilename = open(
                 self._GetMD5FileName(),
@@ -207,6 +209,7 @@
             for callee in self.statuschange:
                 callee(self.PLCStatus)
 
+    @RunInMain
     def LogMessage(self, *args):
         if len(args) == 2:
             level, msg = args
@@ -218,16 +221,19 @@
             return self._LogMessage(level, msg, len(msg))
         return None
 
+    @RunInMain
     def ResetLogCount(self):
         if self._ResetLogCount is not None:
             self._ResetLogCount()
 
+    # used internaly 
     def GetLogCount(self, level):
         if self._GetLogCount is not None:
             return int(self._GetLogCount(level))
         elif self._loading_error is not None and level == 0:
             return 1
 
+    @RunInMain
     def GetLogMessage(self, level, msgid):
         tick = ctypes.c_uint32()
         tv_sec = ctypes.c_uint32()
@@ -252,13 +258,13 @@
     def _GetLibFileName(self):
         return os.path.join(self.workingdir, self.CurrentPLCFilename)
 
-    @RunInMain
-    def LoadPLC(self):
+    def _LoadPLC(self):
         """
         Load PLC library
         Declare all functions, arguments and return values
         """
         md5 = open(self._GetMD5FileName(), "r").read()
+        self.PLClibraryLock.acquire()
         try:
             self._PLClibraryHandle = dlopen(self._GetLibFileName())
             self.PLClibraryHandle = ctypes.CDLL(self.CurrentPLCFilename, handle=self._PLClibraryHandle)
@@ -335,14 +341,24 @@
 
             self._loading_error = None
 
-            self.PythonRuntimeInit()
-
-            return True
         except Exception:
             self._loading_error = traceback.format_exc()
             PLCprint(self._loading_error)
+            return False
+        finally:
+            self.PLClibraryLock.release()
+
+        return True
+
+    @RunInMain
+    def LoadPLC(self):
+        res = self._LoadPLC()
+        if res:
+            self.PythonRuntimeInit()
+        else:
             self._FreePLC()
-            return False
+
+        return res
 
     @RunInMain
     def UnLoadPLC(self):
@@ -375,15 +391,17 @@
         This is also called by __init__ to create dummy C func proxies
         """
         self.PLClibraryLock.acquire()
-
-        # Unload library explicitely
-        if getattr(self, "_PLClibraryHandle", None) is not None:
-            dlclose(self._PLClibraryHandle)
-
-        # Forget all refs to library
-        self._InitPLCStubCalls()
-
-        self.PLClibraryLock.release()
+        try:
+            # Unload library explicitely
+            if getattr(self, "_PLClibraryHandle", None) is not None:
+                dlclose(self._PLClibraryHandle)
+
+            # Forget all refs to library
+            self._InitPLCStubCalls()
+
+        finally:
+            self.PLClibraryLock.release()
+
         return False
 
     def PythonRuntimeCall(self, methodname):
@@ -396,6 +414,7 @@
             if exp is not None:
                 self.LogMessage(0, '\n'.join(traceback.format_exception(*exp)))
 
+    # used internaly 
     def PythonRuntimeInit(self):
         MethodNames = ["init", "start", "stop", "cleanup"]
         self.python_runtime_vars = globals().copy()
@@ -447,6 +466,7 @@
 
         self.PythonRuntimeCall("init")
 
+    # used internaly 
     def PythonRuntimeCleanup(self):
         if self.python_runtime_vars is not None:
             self.PythonRuntimeCall("cleanup")
@@ -509,12 +529,12 @@
             self.StatusChange()
             self.PythonRuntimeCall("stop")
             if self.TraceThread is not None:
-                self.TraceWakeup.set()
                 self.TraceThread.join()
                 self.TraceThread = None
             return True
         return False
 
+    @RunInMain
     def GetPLCstatus(self):
         return self.PLCStatus, map(self.GetLogCount, xrange(LogLevelsCount))
 
@@ -524,14 +544,25 @@
             NewFileName = md5sum + lib_ext
             extra_files_log = os.path.join(self.workingdir, "extra_files.txt")
 
-            self.UnLoadPLC()
+            old_PLC_filename = os.path.join(self.workingdir, \
+                                            self.CurrentPLCFilename) \
+                               if self.CurrentPLCFilename is not None \
+                               else None
+            new_PLC_filename = os.path.join(self.workingdir, NewFileName)
+
+            # Some platform (i.e. Xenomai) don't like reloading same .so file
+            replace_PLC_shared_object = new_PLC_filename != old_PLC_filename
+
+            if replace_PLC_shared_object:
+                self.UnLoadPLC()
 
             self.LogMessage("NewPLC (%s)" % md5sum)
             self.PLCStatus = "Empty"
 
+
             try:
-                os.remove(os.path.join(self.workingdir,
-                                       self.CurrentPLCFilename))
+                if replace_PLC_shared_object:
+                    os.remove(old_PLC_filename)
                 for filename in file(extra_files_log, "r").readlines() + [extra_files_log]:
                     try:
                         os.remove(os.path.join(self.workingdir, filename.strip()))
@@ -542,8 +573,8 @@
 
             try:
                 # Create new PLC file
-                open(os.path.join(self.workingdir, NewFileName),
-                     'wb').write(data)
+                if replace_PLC_shared_object:
+                    open(new_PLC_filename, 'wb').write(data)
 
                 # Store new PLC filename based on md5 key
                 open(self._GetMD5FileName(), "w").write(md5sum)
@@ -563,7 +594,9 @@
                 PLCprint(traceback.format_exc())
                 return False
 
-            if self.LoadPLC():
+            if not replace_PLC_shared_object:
+                self.PLCStatus = "Stopped"
+            elif self.LoadPLC():
                 self.PLCStatus = "Stopped"
             else:
                 self.PLCStatus = "Broken"
@@ -602,13 +635,6 @@
         else:
             self._suspendDebug(True)
 
-    def _TracesPush(self, trace):
-        self.TraceLock.acquire()
-        lT = len(self.Traces)
-        if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024:
-            self.Traces.pop(0)
-        self.Traces.append(trace)
-        self.TraceLock.release()
 
     def _TracesSwap(self):
         self.LastSwapTrace = time()
@@ -619,26 +645,9 @@
         Traces = self.Traces
         self.Traces = []
         self.TraceLock.release()
-        self.TraceWakeup.set()
         return Traces
 
-    def _TracesAutoSuspend(self):
-        # TraceProc stops here if Traces not polled for 3 seconds
-        traces_age = time() - self.LastSwapTrace
-        if traces_age > 3:
-            self.TraceLock.acquire()
-            self.Traces = []
-            self.TraceLock.release()
-            self._suspendDebug(True)  # Disable debugger
-            self.TraceWakeup.clear()
-            self.TraceWakeup.wait()
-            self._resumeDebug()  # Re-enable debugger
-
-    def _TracesFlush(self):
-        self.TraceLock.acquire()
-        self.Traces = []
-        self.TraceLock.release()
-
+    @RunInMain
     def GetTraceVariables(self):
         return self.PLCStatus, self._TracesSwap()
 
@@ -646,28 +655,47 @@
         """
         Return a list of traces, corresponding to the list of required idx
         """
+        self._resumeDebug()  # Re-enable debugger
         while self.PLCStatus == "Started":
             tick = ctypes.c_uint32()
             size = ctypes.c_uint32()
             buff = ctypes.c_void_p()
             TraceBuffer = None
-            if self.PLClibraryLock.acquire(False):
-                res = self._GetDebugData(ctypes.byref(tick),
-                                         ctypes.byref(size),
-                                         ctypes.byref(buff)) 
-                if res == 0:
-                    if size.value:
-                        TraceBuffer = ctypes.string_at(buff.value, size.value)
-                    self._FreeDebugData()
-                self.PLClibraryLock.release()
+
+            self.PLClibraryLock.acquire()
+
+            res = self._GetDebugData(ctypes.byref(tick),
+                                     ctypes.byref(size),
+                                     ctypes.byref(buff)) 
+            if res == 0:
+                if size.value:
+                    TraceBuffer = ctypes.string_at(buff.value, size.value)
+                self._FreeDebugData()
+
+            self.PLClibraryLock.release()
                 
-                if res != 0:
-                    break
+            # leave thread if GetDebugData isn't happy.
+            if res != 0:
+                break
 
             if TraceBuffer is not None:
-                self._TracesPush((tick.value, TraceBuffer))
-            self._TracesAutoSuspend()
-        self._TracesFlush()
+                self.TraceLock.acquire()
+                lT = len(self.Traces)
+                if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024:
+                    self.Traces.pop(0)
+                self.Traces.append((tick.value, TraceBuffer))
+                self.TraceLock.release()
+
+            # TraceProc stops here if Traces not polled for 3 seconds
+            traces_age = time() - self.LastSwapTrace
+            if traces_age > 3:
+                self.TraceLock.acquire()
+                self.Traces = []
+                self.TraceLock.release()
+                self._suspendDebug(True)  # Disable debugger
+                break
+
+        self.TraceThread = None
 
     def RemoteExec(self, script, *kwargs):
         try: