--- a/Beremiz_service.py Tue Apr 17 11:19:18 2018 +0200
+++ b/Beremiz_service.py Thu Apr 19 12:22:40 2018 +0200
@@ -30,9 +30,10 @@
import sys
import getopt
import threading
-from threading import Thread, currentThread, Semaphore
+from threading import Thread, currentThread, Semaphore, Lock
import traceback
import __builtin__
+import Pyro
import Pyro.core as pyro
from runtime import PLCObject, ServicePublisher, MainWorker
@@ -399,6 +400,7 @@
return res
+
class Server(object):
def __init__(self, servicename, ip_addr, port,
workdir, argv,
@@ -436,8 +438,9 @@
sys.stdout.flush()
- def PyroLoop(self):
+ def PyroLoop(self, when_ready):
while self.continueloop:
+ Pyro.config.PYRO_MULTITHREADED = 0
pyro.initServer()
self.daemon = pyro.Daemon(host=self.ip_addr, port=self.port)
@@ -452,6 +455,7 @@
self.servicepublisher = ServicePublisher.ServicePublisher()
self.servicepublisher.RegisterService(self.servicename, self.ip_addr, self.port)
+ when_ready()
self.daemon.requestLoop()
self.daemon.sock.close()
@@ -631,12 +635,20 @@
except Exception:
LogMessageAndException(_("WAMP client startup failed. "))
-pyro_thread = Thread(target=pyroserver.PyroLoop)
+pyro_thread_started = Lock()
+pyro_thread_started.acquire()
+pyro_thread = Thread(target=pyroserver.PyroLoop,
+ kwargs=dict(when_ready=pyro_thread_started.release))
pyro_thread.start()
+# Wait for pyro thread to be effective
+pyro_thread_started.acquire()
+
pyroserver.PrintServerInfo()
if havetwisted or havewx:
+ ui_thread_started = Lock()
+ ui_thread_started.acquire()
if havetwisted:
# reactor._installSignalHandlersAgain()
def ui_thread_target():
@@ -649,6 +661,16 @@
ui_thread = Thread(target = ui_thread_target)
ui_thread.start()
+ # This order ui loop to unblock main thread when ready.
+ if havetwisted:
+ reactor.callLater(0,ui_thread_started.release)
+ else :
+ wx.CallAfter(ui_thread_started.release)
+
+ # Wait for ui thread to be effective
+ ui_thread_started.acquire()
+ print("UI thread started successfully.")
+
try:
MainWorker.runloop(pyroserver.AutoLoad)
except KeyboardInterrupt:
--- a/runtime/PLCObject.py Tue Apr 17 11:19:18 2018 +0200
+++ b/runtime/PLCObject.py Thu Apr 19 12:22:40 2018 +0200
@@ -95,6 +95,7 @@
self.mutex = Lock()
self.todo = Condition(self.mutex)
self.done = Condition(self.mutex)
+ self.free = Condition(self.mutex)
self.job = None
def runloop(self,*args,**kwargs):
@@ -110,7 +111,9 @@
self.todo.wait()
if self.job is not None:
self.job.do()
- self.done.notify_all()
+ self.done.notify()
+ else:
+ self.free.notify()
self.mutex.release()
def call(self, *args, **kwargs):
@@ -123,7 +126,7 @@
_job = job(*args,**kwargs)
- if self._threadID == thread.get_ident():
+ if self._threadID == thread.get_ident() or self._threadID is None:
# if caller is worker thread execute immediately
_job.do()
else:
@@ -131,7 +134,7 @@
self.mutex.acquire()
while self.job is not None:
- self.done.wait()
+ self.free.wait()
self.job = _job
self.todo.notify()
@@ -168,7 +171,6 @@
class PLCObject(pyro.ObjBase):
def __init__(self, server):
-
pyro.ObjBase.__init__(self)
self.evaluator = server.evaluator
self.argv = [server.workdir] + server.argv # force argv[0] to be "path" to exec...
@@ -187,11 +189,11 @@
self.python_runtime_vars = None
self.TraceThread = None
self.TraceLock = Lock()
- self.TraceWakeup = Event()
self.Traces = []
+ # First task of worker -> no @RunInMain
def AutoLoad(self):
- # Get the last transfered PLC if connector must be restart
+ # Get the last transfered PLC
try:
self.CurrentPLCFilename = open(
self._GetMD5FileName(),
@@ -207,6 +209,7 @@
for callee in self.statuschange:
callee(self.PLCStatus)
+ @RunInMain
def LogMessage(self, *args):
if len(args) == 2:
level, msg = args
@@ -218,16 +221,19 @@
return self._LogMessage(level, msg, len(msg))
return None
+ @RunInMain
def ResetLogCount(self):
if self._ResetLogCount is not None:
self._ResetLogCount()
+ # used internaly
def GetLogCount(self, level):
if self._GetLogCount is not None:
return int(self._GetLogCount(level))
elif self._loading_error is not None and level == 0:
return 1
+ @RunInMain
def GetLogMessage(self, level, msgid):
tick = ctypes.c_uint32()
tv_sec = ctypes.c_uint32()
@@ -252,13 +258,13 @@
def _GetLibFileName(self):
return os.path.join(self.workingdir, self.CurrentPLCFilename)
- @RunInMain
- def LoadPLC(self):
+ def _LoadPLC(self):
"""
Load PLC library
Declare all functions, arguments and return values
"""
md5 = open(self._GetMD5FileName(), "r").read()
+ self.PLClibraryLock.acquire()
try:
self._PLClibraryHandle = dlopen(self._GetLibFileName())
self.PLClibraryHandle = ctypes.CDLL(self.CurrentPLCFilename, handle=self._PLClibraryHandle)
@@ -335,14 +341,24 @@
self._loading_error = None
- self.PythonRuntimeInit()
-
- return True
except Exception:
self._loading_error = traceback.format_exc()
PLCprint(self._loading_error)
+ return False
+ finally:
+ self.PLClibraryLock.release()
+
+ return True
+
+ @RunInMain
+ def LoadPLC(self):
+ res = self._LoadPLC()
+ if res:
+ self.PythonRuntimeInit()
+ else:
self._FreePLC()
- return False
+
+ return res
@RunInMain
def UnLoadPLC(self):
@@ -375,15 +391,17 @@
This is also called by __init__ to create dummy C func proxies
"""
self.PLClibraryLock.acquire()
-
- # Unload library explicitely
- if getattr(self, "_PLClibraryHandle", None) is not None:
- dlclose(self._PLClibraryHandle)
-
- # Forget all refs to library
- self._InitPLCStubCalls()
-
- self.PLClibraryLock.release()
+ try:
+ # Unload library explicitely
+ if getattr(self, "_PLClibraryHandle", None) is not None:
+ dlclose(self._PLClibraryHandle)
+
+ # Forget all refs to library
+ self._InitPLCStubCalls()
+
+ finally:
+ self.PLClibraryLock.release()
+
return False
def PythonRuntimeCall(self, methodname):
@@ -396,6 +414,7 @@
if exp is not None:
self.LogMessage(0, '\n'.join(traceback.format_exception(*exp)))
+ # used internaly
def PythonRuntimeInit(self):
MethodNames = ["init", "start", "stop", "cleanup"]
self.python_runtime_vars = globals().copy()
@@ -447,6 +466,7 @@
self.PythonRuntimeCall("init")
+ # used internaly
def PythonRuntimeCleanup(self):
if self.python_runtime_vars is not None:
self.PythonRuntimeCall("cleanup")
@@ -509,12 +529,12 @@
self.StatusChange()
self.PythonRuntimeCall("stop")
if self.TraceThread is not None:
- self.TraceWakeup.set()
self.TraceThread.join()
self.TraceThread = None
return True
return False
+ @RunInMain
def GetPLCstatus(self):
return self.PLCStatus, map(self.GetLogCount, xrange(LogLevelsCount))
@@ -524,14 +544,25 @@
NewFileName = md5sum + lib_ext
extra_files_log = os.path.join(self.workingdir, "extra_files.txt")
- self.UnLoadPLC()
+ old_PLC_filename = os.path.join(self.workingdir, \
+ self.CurrentPLCFilename) \
+ if self.CurrentPLCFilename is not None \
+ else None
+ new_PLC_filename = os.path.join(self.workingdir, NewFileName)
+
+ # Some platform (i.e. Xenomai) don't like reloading same .so file
+ replace_PLC_shared_object = new_PLC_filename != old_PLC_filename
+
+ if replace_PLC_shared_object:
+ self.UnLoadPLC()
self.LogMessage("NewPLC (%s)" % md5sum)
self.PLCStatus = "Empty"
+
try:
- os.remove(os.path.join(self.workingdir,
- self.CurrentPLCFilename))
+ if replace_PLC_shared_object:
+ os.remove(old_PLC_filename)
for filename in file(extra_files_log, "r").readlines() + [extra_files_log]:
try:
os.remove(os.path.join(self.workingdir, filename.strip()))
@@ -542,8 +573,8 @@
try:
# Create new PLC file
- open(os.path.join(self.workingdir, NewFileName),
- 'wb').write(data)
+ if replace_PLC_shared_object:
+ open(new_PLC_filename, 'wb').write(data)
# Store new PLC filename based on md5 key
open(self._GetMD5FileName(), "w").write(md5sum)
@@ -563,7 +594,9 @@
PLCprint(traceback.format_exc())
return False
- if self.LoadPLC():
+ if not replace_PLC_shared_object:
+ self.PLCStatus = "Stopped"
+ elif self.LoadPLC():
self.PLCStatus = "Stopped"
else:
self.PLCStatus = "Broken"
@@ -602,13 +635,6 @@
else:
self._suspendDebug(True)
- def _TracesPush(self, trace):
- self.TraceLock.acquire()
- lT = len(self.Traces)
- if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024:
- self.Traces.pop(0)
- self.Traces.append(trace)
- self.TraceLock.release()
def _TracesSwap(self):
self.LastSwapTrace = time()
@@ -619,26 +645,9 @@
Traces = self.Traces
self.Traces = []
self.TraceLock.release()
- self.TraceWakeup.set()
return Traces
- def _TracesAutoSuspend(self):
- # TraceProc stops here if Traces not polled for 3 seconds
- traces_age = time() - self.LastSwapTrace
- if traces_age > 3:
- self.TraceLock.acquire()
- self.Traces = []
- self.TraceLock.release()
- self._suspendDebug(True) # Disable debugger
- self.TraceWakeup.clear()
- self.TraceWakeup.wait()
- self._resumeDebug() # Re-enable debugger
-
- def _TracesFlush(self):
- self.TraceLock.acquire()
- self.Traces = []
- self.TraceLock.release()
-
+ @RunInMain
def GetTraceVariables(self):
return self.PLCStatus, self._TracesSwap()
@@ -646,28 +655,47 @@
"""
Return a list of traces, corresponding to the list of required idx
"""
+ self._resumeDebug() # Re-enable debugger
while self.PLCStatus == "Started":
tick = ctypes.c_uint32()
size = ctypes.c_uint32()
buff = ctypes.c_void_p()
TraceBuffer = None
- if self.PLClibraryLock.acquire(False):
- res = self._GetDebugData(ctypes.byref(tick),
- ctypes.byref(size),
- ctypes.byref(buff))
- if res == 0:
- if size.value:
- TraceBuffer = ctypes.string_at(buff.value, size.value)
- self._FreeDebugData()
- self.PLClibraryLock.release()
+
+ self.PLClibraryLock.acquire()
+
+ res = self._GetDebugData(ctypes.byref(tick),
+ ctypes.byref(size),
+ ctypes.byref(buff))
+ if res == 0:
+ if size.value:
+ TraceBuffer = ctypes.string_at(buff.value, size.value)
+ self._FreeDebugData()
+
+ self.PLClibraryLock.release()
- if res != 0:
- break
+ # leave thread if GetDebugData isn't happy.
+ if res != 0:
+ break
if TraceBuffer is not None:
- self._TracesPush((tick.value, TraceBuffer))
- self._TracesAutoSuspend()
- self._TracesFlush()
+ self.TraceLock.acquire()
+ lT = len(self.Traces)
+ if lT != 0 and lT * len(self.Traces[0]) > 1024 * 1024:
+ self.Traces.pop(0)
+ self.Traces.append((tick.value, TraceBuffer))
+ self.TraceLock.release()
+
+ # TraceProc stops here if Traces not polled for 3 seconds
+ traces_age = time() - self.LastSwapTrace
+ if traces_age > 3:
+ self.TraceLock.acquire()
+ self.Traces = []
+ self.TraceLock.release()
+ self._suspendDebug(True) # Disable debugger
+ break
+
+ self.TraceThread = None
def RemoteExec(self, script, *kwargs):
try: