Runtime: wx.app.mainLoop and twisted reactor now share main thread with runtime.Worker.
This fixes exception when invoking python interactive shell from Runtime's tray
icon right-click menu.
Probably a consequence of wxPython upgrade, together with initial bad idea to
move wx.app.mainloop in non-main thread.
--- a/Beremiz_service.py Sat Aug 13 16:12:39 2022 +0200
+++ b/Beremiz_service.py Tue Aug 16 19:52:49 2022 +0200
@@ -435,8 +435,6 @@
if havewx:
reactor.registerWxApp(app)
-twisted_reactor_thread_id = None
-ui_thread = None
if havewx:
wx_eval_lock = Semaphore(0)
@@ -451,24 +449,18 @@
obj.res = default_evaluator(tocall, *args, **kwargs)
wx_eval_lock.release()
+ main_thread_id = currentThread().ident
def evaluator(tocall, *args, **kwargs):
- # To prevent deadlocks, check if current thread is not one of the UI
- # UI threads can be either the one from WX main loop or
- # worker thread from twisted "threadselect" reactor
+ # To prevent deadlocks, check if current thread is not one already main
current_id = currentThread().ident
- if ui_thread is not None \
- and ui_thread.ident != current_id \
- and (not havetwisted or (
- twisted_reactor_thread_id is not None
- and twisted_reactor_thread_id != current_id)):
-
+ if main_thread_id != current_id:
o = type('', (object,), dict(call=(tocall, args, kwargs), res=None))
wx.CallAfter(wx_evaluator, o)
wx_eval_lock.acquire()
return o.res
else:
- # avoid dead lock if called from the wx mainloop
+ # avoid dead lock if called from main : do job immediately
return default_evaluator(tocall, *args, **kwargs)
else:
evaluator = default_evaluator
@@ -555,13 +547,39 @@
except Exception:
LogMessageAndException(_("WAMP client startup failed. "))
+if havetwisted or havewx:
+ if havetwisted:
+ # reactor._installSignalHandlersAgain()
+ waker_func = reactor._runInMainThread
+ def ui_blocking_call():
+ # FIXME: had to disable SignaHandlers install because
+ # signal not working in non-main thread
+ reactor.run(installSignalHandlers=False)
+ else:
+ waker_func = wx.CallAfter
+ ui_blocking_call = app.MainLoop
+
+ def ui_launched_report():
+ # IDE expects to see that string to stop waiting for runtime
+ # to be ready and connnect to it.
+ print("UI thread started successfully.")
+
+ # This orders ui loop to signal when ready on Stdout
+ if havetwisted:
+ reactor.callLater(0, ui_launched_report)
+ else:
+ wx.CallAfter(ui_launched_report)
+
+
+pyro_thread = None
+
def FirstWorkerJob():
"""
RPC through pyro/wamp/UI may lead to delegation to Worker,
then this function ensures that Worker is already
created when pyro starts
"""
- global pyro_thread, pyroserver, ui_thread, reactor, twisted_reactor_thread_id
+ global pyro_thread, pyroserver
pyro_thread_started = Lock()
pyro_thread_started.acquire()
@@ -581,41 +599,19 @@
sys.stdout.write(_("Current working directory :") + WorkingDir + "\n")
sys.stdout.flush()
- if not (havetwisted or havewx):
- return
-
- ui_thread_started = Lock()
- ui_thread_started.acquire()
- if havetwisted:
- # reactor._installSignalHandlersAgain()
- def ui_thread_target():
- # FIXME: had to disable SignaHandlers install because
- # signal not working in non-main thread
- reactor.run(installSignalHandlers=False)
+ runtime.GetPLCObjectSingleton().AutoLoad(autostart)
+
+try:
+ if havetwisted or havewx:
+ # worker that copes with wx and (wx)reactor
+ runtime.MainWorker.interleave(waker_func, FirstWorkerJob)
+ ui_blocking_call()
+ runtime.MainWorker.stop()
+
else:
- ui_thread_target = app.MainLoop
-
- ui_thread = Thread(target=ui_thread_target, name="UIThread")
- ui_thread.start()
-
- # This order ui loop to unblock main thread when ready.
- if havetwisted:
- def signal_uithread_started():
- global twisted_reactor_thread_id
- twisted_reactor_thread_id = currentThread().ident
- ui_thread_started.release()
- reactor.callLater(0, signal_uithread_started)
- else:
- wx.CallAfter(ui_thread_started.release)
-
- # Wait for ui thread to be effective
- ui_thread_started.acquire()
- print("UI thread started successfully.")
-
- runtime.GetPLCObjectSingleton().AutoLoad(autostart)
-
-try:
- runtime.MainWorker.runloop(FirstWorkerJob)
+ # blocking worker loop
+ runtime.MainWorker.runloop(FirstWorkerJob)
+
except KeyboardInterrupt:
pass
@@ -631,9 +627,7 @@
if havetwisted:
reactor.stop()
- ui_thread.join()
elif havewx:
app.ExitMainLoop()
- ui_thread.join()
sys.exit(0)
--- a/runtime/Worker.py Sat Aug 13 16:12:39 2022 +0200
+++ b/runtime/Worker.py Tue Aug 16 19:52:49 2022 +0200
@@ -9,7 +9,8 @@
from __future__ import absolute_import
import sys
-from threading import Lock, Condition
+from threading import Lock, Condition, Thread
+
import six
from six.moves import _thread
@@ -86,6 +87,57 @@
self.mutex.release()
+ def interleave(self, waker, *args, **kwargs):
+ """
+ as for twisted reactor's interleave, it passes all jobs to waker func
+ additionaly, it creates a new thread to wait for new job.
+ """
+ self.feed = Condition(self.mutex)
+ self._threadID = _thread.get_ident()
+
+ def wakerfeedingloop():
+ self.mutex.acquire()
+ self.enabled = True
+ if args or kwargs:
+ def first_job_todo():
+ _job = job(*args, **kwargs)
+ _job.do()
+ if not _job.success:
+ self.reraise(_job)
+ self.mutex.acquire()
+ self.feed.notify()
+ self.mutex.release()
+ waker(first_job_todo)
+ self.feed.wait()
+
+ while not self._finish:
+ self.todo.wait()
+ def job_todo():
+ self.mutex.acquire()
+ if self.job is not None:
+ self.job.do()
+ self.feed.notify()
+ self.done.notify()
+ self.mutex.release()
+ waker(job_todo)
+ self.feed.wait()
+
+ self.mutex.release()
+ self.own_thread = Thread(target = wakerfeedingloop).start()
+
+ def stop():
+ """
+ !interleave
+ """
+ self.mutex.acquire()
+ self._finish = True
+ self.enabled = False
+ self.job = None
+ self.todo.notify()
+ self.done.notify()
+ self.mutex.release()
+ self.own_thread.join()
+
def call(self, *args, **kwargs):
"""
creates a job, execute it in worker thread, and deliver result.