Runtime: wx.app.mainLoop and twisted reactor now share main thread with runtime.Worker. wxPython4
authorEdouard Tisserant <edouard.tisserant@gmail.com>
Tue, 16 Aug 2022 19:52:49 +0200
branchwxPython4
changeset 3584 8a54fd58a552
parent 3583 27e7679ddb99
child 3585 efdefbad49eb
Runtime: wx.app.mainLoop and twisted reactor now share main thread with runtime.Worker.

This fixes exception when invoking python interactive shell from Runtime's tray
icon right-click menu.

Probably a consequence of wxPython upgrade, together with initial bad idea to
move wx.app.mainloop in non-main thread.
Beremiz_service.py
runtime/Worker.py
--- a/Beremiz_service.py	Sat Aug 13 16:12:39 2022 +0200
+++ b/Beremiz_service.py	Tue Aug 16 19:52:49 2022 +0200
@@ -435,8 +435,6 @@
     if havewx:
         reactor.registerWxApp(app)
 
-twisted_reactor_thread_id = None
-ui_thread = None
 
 if havewx:
     wx_eval_lock = Semaphore(0)
@@ -451,24 +449,18 @@
         obj.res = default_evaluator(tocall, *args, **kwargs)
         wx_eval_lock.release()
 
+    main_thread_id = currentThread().ident
     def evaluator(tocall, *args, **kwargs):
-        # To prevent deadlocks, check if current thread is not one of the UI
-        # UI threads can be either the one from WX main loop or
-        # worker thread from twisted "threadselect" reactor
+        # To prevent deadlocks, check if current thread is not one already main
         current_id = currentThread().ident
 
-        if ui_thread is not None \
-            and ui_thread.ident != current_id \
-            and (not havetwisted or (
-                    twisted_reactor_thread_id is not None
-                    and twisted_reactor_thread_id != current_id)):
-
+        if main_thread_id != current_id:
             o = type('', (object,), dict(call=(tocall, args, kwargs), res=None))
             wx.CallAfter(wx_evaluator, o)
             wx_eval_lock.acquire()
             return o.res
         else:
-            # avoid dead lock if called from the wx mainloop
+            # avoid dead lock if called from main : do job immediately
             return default_evaluator(tocall, *args, **kwargs)
 else:
     evaluator = default_evaluator
@@ -555,13 +547,39 @@
         except Exception:
             LogMessageAndException(_("WAMP client startup failed. "))
 
+if havetwisted or havewx:
+    if havetwisted:
+        # reactor._installSignalHandlersAgain()
+        waker_func = reactor._runInMainThread
+        def ui_blocking_call():
+            # FIXME: had to disable SignaHandlers install because
+            # signal not working in non-main thread
+            reactor.run(installSignalHandlers=False)
+    else:
+        waker_func = wx.CallAfter
+        ui_blocking_call = app.MainLoop
+
+    def ui_launched_report():
+        # IDE expects to see that string to stop waiting for runtime
+        # to be ready and connnect to it.
+        print("UI thread started successfully.")
+
+    # This orders ui loop to signal when ready on Stdout
+    if havetwisted:
+        reactor.callLater(0, ui_launched_report)
+    else:
+        wx.CallAfter(ui_launched_report)
+
+
+pyro_thread = None
+
 def FirstWorkerJob():
     """
     RPC through pyro/wamp/UI may lead to delegation to Worker,
     then this function ensures that Worker is already
     created when pyro starts
     """
-    global pyro_thread, pyroserver, ui_thread, reactor, twisted_reactor_thread_id
+    global pyro_thread, pyroserver
 
     pyro_thread_started = Lock()
     pyro_thread_started.acquire()
@@ -581,41 +599,19 @@
     sys.stdout.write(_("Current working directory :") + WorkingDir + "\n")
     sys.stdout.flush()
 
-    if not (havetwisted or havewx):
-        return
-
-    ui_thread_started = Lock()
-    ui_thread_started.acquire()
-    if havetwisted:
-        # reactor._installSignalHandlersAgain()
-        def ui_thread_target():
-            # FIXME: had to disable SignaHandlers install because
-            # signal not working in non-main thread
-            reactor.run(installSignalHandlers=False)
+    runtime.GetPLCObjectSingleton().AutoLoad(autostart)
+
+try:
+    if havetwisted or havewx:
+        # worker that copes with wx and (wx)reactor
+        runtime.MainWorker.interleave(waker_func, FirstWorkerJob)
+        ui_blocking_call()
+        runtime.MainWorker.stop()
+
     else:
-        ui_thread_target = app.MainLoop
-
-    ui_thread = Thread(target=ui_thread_target, name="UIThread")
-    ui_thread.start()
-
-    # This order ui loop to unblock main thread when ready.
-    if havetwisted:
-        def signal_uithread_started():
-            global twisted_reactor_thread_id
-            twisted_reactor_thread_id = currentThread().ident
-            ui_thread_started.release()
-        reactor.callLater(0, signal_uithread_started)
-    else:
-        wx.CallAfter(ui_thread_started.release)
-
-    # Wait for ui thread to be effective
-    ui_thread_started.acquire()
-    print("UI thread started successfully.")
-
-    runtime.GetPLCObjectSingleton().AutoLoad(autostart)
-
-try:
-    runtime.MainWorker.runloop(FirstWorkerJob)
+        # blocking worker loop
+        runtime.MainWorker.runloop(FirstWorkerJob)
+
 except KeyboardInterrupt:
     pass
 
@@ -631,9 +627,7 @@
 
 if havetwisted:
     reactor.stop()
-    ui_thread.join()
 elif havewx:
     app.ExitMainLoop()
-    ui_thread.join()
 
 sys.exit(0)
--- a/runtime/Worker.py	Sat Aug 13 16:12:39 2022 +0200
+++ b/runtime/Worker.py	Tue Aug 16 19:52:49 2022 +0200
@@ -9,7 +9,8 @@
 
 from __future__ import absolute_import
 import sys
-from threading import Lock, Condition
+from threading import Lock, Condition, Thread
+
 import six
 from six.moves import _thread
 
@@ -86,6 +87,57 @@
 
         self.mutex.release()
 
+    def interleave(self, waker, *args, **kwargs):
+        """
+        as for twisted reactor's interleave, it passes all jobs to waker func
+        additionaly, it creates a new thread to wait for new job.
+        """
+        self.feed = Condition(self.mutex)
+        self._threadID = _thread.get_ident()
+
+        def wakerfeedingloop():
+            self.mutex.acquire()
+            self.enabled = True
+            if args or kwargs:
+                def first_job_todo():
+                    _job = job(*args, **kwargs)
+                    _job.do()
+                    if not _job.success:
+                        self.reraise(_job)
+                    self.mutex.acquire()
+                    self.feed.notify()
+                    self.mutex.release()
+                waker(first_job_todo)
+                self.feed.wait()
+
+            while not self._finish:
+                self.todo.wait()
+                def job_todo():
+                    self.mutex.acquire()
+                    if self.job is not None:
+                        self.job.do()
+                        self.feed.notify()
+                        self.done.notify()
+                    self.mutex.release()
+                waker(job_todo)
+                self.feed.wait()
+
+            self.mutex.release()
+        self.own_thread = Thread(target = wakerfeedingloop).start()
+
+    def stop():
+        """
+        !interleave
+        """
+        self.mutex.acquire()
+        self._finish = True
+        self.enabled = False
+        self.job = None
+        self.todo.notify()
+        self.done.notify()
+        self.mutex.release()
+        self.own_thread.join()
+
     def call(self, *args, **kwargs):
         """
         creates a job, execute it in worker thread, and deliver result.