# HG changeset patch # User Edouard Tisserant # Date 1660672369 -7200 # Node ID 8a54fd58a55224e0448516b682e7e9a691b80b27 # Parent 27e7679ddb993a00d1d4ee9f97402b3b37008de1 Runtime: wx.app.mainLoop and twisted reactor now share main thread with runtime.Worker. This fixes exception when invoking python interactive shell from Runtime's tray icon right-click menu. Probably a consequence of wxPython upgrade, together with initial bad idea to move wx.app.mainloop in non-main thread. diff -r 27e7679ddb99 -r 8a54fd58a552 Beremiz_service.py --- a/Beremiz_service.py Sat Aug 13 16:12:39 2022 +0200 +++ b/Beremiz_service.py Tue Aug 16 19:52:49 2022 +0200 @@ -435,8 +435,6 @@ if havewx: reactor.registerWxApp(app) -twisted_reactor_thread_id = None -ui_thread = None if havewx: wx_eval_lock = Semaphore(0) @@ -451,24 +449,18 @@ obj.res = default_evaluator(tocall, *args, **kwargs) wx_eval_lock.release() + main_thread_id = currentThread().ident def evaluator(tocall, *args, **kwargs): - # To prevent deadlocks, check if current thread is not one of the UI - # UI threads can be either the one from WX main loop or - # worker thread from twisted "threadselect" reactor + # To prevent deadlocks, check if current thread is not one already main current_id = currentThread().ident - if ui_thread is not None \ - and ui_thread.ident != current_id \ - and (not havetwisted or ( - twisted_reactor_thread_id is not None - and twisted_reactor_thread_id != current_id)): - + if main_thread_id != current_id: o = type('', (object,), dict(call=(tocall, args, kwargs), res=None)) wx.CallAfter(wx_evaluator, o) wx_eval_lock.acquire() return o.res else: - # avoid dead lock if called from the wx mainloop + # avoid dead lock if called from main : do job immediately return default_evaluator(tocall, *args, **kwargs) else: evaluator = default_evaluator @@ -555,13 +547,39 @@ except Exception: LogMessageAndException(_("WAMP client startup failed. ")) +if havetwisted or havewx: + if havetwisted: + # reactor._installSignalHandlersAgain() + waker_func = reactor._runInMainThread + def ui_blocking_call(): + # FIXME: had to disable SignaHandlers install because + # signal not working in non-main thread + reactor.run(installSignalHandlers=False) + else: + waker_func = wx.CallAfter + ui_blocking_call = app.MainLoop + + def ui_launched_report(): + # IDE expects to see that string to stop waiting for runtime + # to be ready and connnect to it. + print("UI thread started successfully.") + + # This orders ui loop to signal when ready on Stdout + if havetwisted: + reactor.callLater(0, ui_launched_report) + else: + wx.CallAfter(ui_launched_report) + + +pyro_thread = None + def FirstWorkerJob(): """ RPC through pyro/wamp/UI may lead to delegation to Worker, then this function ensures that Worker is already created when pyro starts """ - global pyro_thread, pyroserver, ui_thread, reactor, twisted_reactor_thread_id + global pyro_thread, pyroserver pyro_thread_started = Lock() pyro_thread_started.acquire() @@ -581,41 +599,19 @@ sys.stdout.write(_("Current working directory :") + WorkingDir + "\n") sys.stdout.flush() - if not (havetwisted or havewx): - return - - ui_thread_started = Lock() - ui_thread_started.acquire() - if havetwisted: - # reactor._installSignalHandlersAgain() - def ui_thread_target(): - # FIXME: had to disable SignaHandlers install because - # signal not working in non-main thread - reactor.run(installSignalHandlers=False) + runtime.GetPLCObjectSingleton().AutoLoad(autostart) + +try: + if havetwisted or havewx: + # worker that copes with wx and (wx)reactor + runtime.MainWorker.interleave(waker_func, FirstWorkerJob) + ui_blocking_call() + runtime.MainWorker.stop() + else: - ui_thread_target = app.MainLoop - - ui_thread = Thread(target=ui_thread_target, name="UIThread") - ui_thread.start() - - # This order ui loop to unblock main thread when ready. - if havetwisted: - def signal_uithread_started(): - global twisted_reactor_thread_id - twisted_reactor_thread_id = currentThread().ident - ui_thread_started.release() - reactor.callLater(0, signal_uithread_started) - else: - wx.CallAfter(ui_thread_started.release) - - # Wait for ui thread to be effective - ui_thread_started.acquire() - print("UI thread started successfully.") - - runtime.GetPLCObjectSingleton().AutoLoad(autostart) - -try: - runtime.MainWorker.runloop(FirstWorkerJob) + # blocking worker loop + runtime.MainWorker.runloop(FirstWorkerJob) + except KeyboardInterrupt: pass @@ -631,9 +627,7 @@ if havetwisted: reactor.stop() - ui_thread.join() elif havewx: app.ExitMainLoop() - ui_thread.join() sys.exit(0) diff -r 27e7679ddb99 -r 8a54fd58a552 runtime/Worker.py --- a/runtime/Worker.py Sat Aug 13 16:12:39 2022 +0200 +++ b/runtime/Worker.py Tue Aug 16 19:52:49 2022 +0200 @@ -9,7 +9,8 @@ from __future__ import absolute_import import sys -from threading import Lock, Condition +from threading import Lock, Condition, Thread + import six from six.moves import _thread @@ -86,6 +87,57 @@ self.mutex.release() + def interleave(self, waker, *args, **kwargs): + """ + as for twisted reactor's interleave, it passes all jobs to waker func + additionaly, it creates a new thread to wait for new job. + """ + self.feed = Condition(self.mutex) + self._threadID = _thread.get_ident() + + def wakerfeedingloop(): + self.mutex.acquire() + self.enabled = True + if args or kwargs: + def first_job_todo(): + _job = job(*args, **kwargs) + _job.do() + if not _job.success: + self.reraise(_job) + self.mutex.acquire() + self.feed.notify() + self.mutex.release() + waker(first_job_todo) + self.feed.wait() + + while not self._finish: + self.todo.wait() + def job_todo(): + self.mutex.acquire() + if self.job is not None: + self.job.do() + self.feed.notify() + self.done.notify() + self.mutex.release() + waker(job_todo) + self.feed.wait() + + self.mutex.release() + self.own_thread = Thread(target = wakerfeedingloop).start() + + def stop(): + """ + !interleave + """ + self.mutex.acquire() + self._finish = True + self.enabled = False + self.job = None + self.todo.notify() + self.done.notify() + self.mutex.release() + self.own_thread.join() + def call(self, *args, **kwargs): """ creates a job, execute it in worker thread, and deliver result.