# HG changeset patch # User Edouard Tisserant # Date 1695917697 -7200 # Node ID cf027bfe2653a8103b3a40cebefcce748a0bccd3 # Parent d7f9b6af98ef821693b11ed4e2368ac741623fb8 Runtime/MainWorker: fix blocking race condition diff -r d7f9b6af98ef -r cf027bfe2653 runtime/Worker.py --- a/runtime/Worker.py Thu Sep 28 18:00:21 2023 +0200 +++ b/runtime/Worker.py Thu Sep 28 18:14:57 2023 +0200 @@ -8,9 +8,7 @@ # See COPYING.Runtime file for copyrights details. -from threading import Lock, Condition, Thread - -import _thread +from threading import Lock, Condition, Thread, get_ident class job(object): @@ -64,7 +62,7 @@ """ meant to be called by worker thread (blocking) """ - self._threadID = _thread.get_ident() + self._threadID = get_ident() self.mutex.acquire() self.enabled = True if args or kwargs: @@ -75,13 +73,10 @@ self.reraise(_job) while not self._finish: - self.todo.wait() - if self.job is not None: - self.job.do() - self.done.notify() - else: - break - + self.todo.wait_for(self.job is not None) + self.job.do() + self.done.notify() + self.mutex.release() def interleave(self, waker, stopper, *args, **kwargs): @@ -90,39 +85,42 @@ additionaly, it creates a new thread to wait for new job. """ self.feed = Condition(self.mutex) - self._threadID = _thread.get_ident() + self._threadID = get_ident() self.stopper = stopper + def do_pending_job(): + self.mutex.acquire() + if self.job is not None: + self.job.do() + self.done.notify_all() + self.mutex.release() + def wakerfeedingloop(): self.mutex.acquire() self.enabled = True + + # Handle first job if args or kwargs: - def first_job_todo(): - _job = job(*args, **kwargs) - _job.do() - if not _job.success: - self.reraise(_job) - self.mutex.acquire() - self.feed.notify() - self.mutex.release() - waker(first_job_todo) - self.feed.wait() + self.job = job(*args, **kwargs) + waker(do_pending_job) + self.done.wait_for(lambda: self.job.success is not None) + if not self.job.success: + self.reraise(_job) + self.job = None + + self.free.notify() while not self._finish: - self.todo.wait() - def job_todo(): - self.mutex.acquire() - if self.job is not None: - self.job.do() - self.feed.notify() - self.done.notify() - self.mutex.release() + self.todo.wait_for(lambda: self.job is not None) if self._finish: break - waker(job_todo) - self.feed.wait() + waker(do_pending_job) + self.done.wait_for(lambda: self.job.success is not None) + self.job = None + self.free.notify() self.mutex.release() + self.own_thread = Thread(target = wakerfeedingloop) self.own_thread.start() @@ -135,7 +133,7 @@ self.enabled = False self.job = None self.todo.notify() - self.done.notify() + self.done.notify_all() self.mutex.release() self.own_thread.join() @@ -149,7 +147,7 @@ _job = job(*args, **kwargs) - if self._threadID == _thread.get_ident(): + if self._threadID == get_ident(): # if caller is worker thread execute immediately _job.do() else: @@ -159,13 +157,11 @@ self.mutex.release() raise EOFError("Worker is disabled") - while self.job is not None: - self.free.wait() + self.free.wait_for(lambda: self.job is None) self.job = _job self.todo.notify() - self.done.wait() - self.job = None + self.done.wait_for(lambda: _job.success is not None) self.free.notify() self.mutex.release()