--- a/runtime/Worker.py Thu Sep 28 18:00:21 2023 +0200
+++ b/runtime/Worker.py Thu Sep 28 18:14:57 2023 +0200
@@ -8,9 +8,7 @@
# See COPYING.Runtime file for copyrights details.
-from threading import Lock, Condition, Thread
-
-import _thread
+from threading import Lock, Condition, Thread, get_ident
class job(object):
@@ -64,7 +62,7 @@
"""
meant to be called by worker thread (blocking)
"""
- self._threadID = _thread.get_ident()
+ self._threadID = get_ident()
self.mutex.acquire()
self.enabled = True
if args or kwargs:
@@ -75,13 +73,10 @@
self.reraise(_job)
while not self._finish:
- self.todo.wait()
- if self.job is not None:
- self.job.do()
- self.done.notify()
- else:
- break
-
+ self.todo.wait_for(self.job is not None)
+ self.job.do()
+ self.done.notify()
+
self.mutex.release()
def interleave(self, waker, stopper, *args, **kwargs):
@@ -90,39 +85,42 @@
additionaly, it creates a new thread to wait for new job.
"""
self.feed = Condition(self.mutex)
- self._threadID = _thread.get_ident()
+ self._threadID = get_ident()
self.stopper = stopper
+ def do_pending_job():
+ self.mutex.acquire()
+ if self.job is not None:
+ self.job.do()
+ self.done.notify_all()
+ self.mutex.release()
+
def wakerfeedingloop():
self.mutex.acquire()
self.enabled = True
+
+ # Handle first job
if args or kwargs:
- def first_job_todo():
- _job = job(*args, **kwargs)
- _job.do()
- if not _job.success:
- self.reraise(_job)
- self.mutex.acquire()
- self.feed.notify()
- self.mutex.release()
- waker(first_job_todo)
- self.feed.wait()
+ self.job = job(*args, **kwargs)
+ waker(do_pending_job)
+ self.done.wait_for(lambda: self.job.success is not None)
+ if not self.job.success:
+ self.reraise(_job)
+ self.job = None
+
+ self.free.notify()
while not self._finish:
- self.todo.wait()
- def job_todo():
- self.mutex.acquire()
- if self.job is not None:
- self.job.do()
- self.feed.notify()
- self.done.notify()
- self.mutex.release()
+ self.todo.wait_for(lambda: self.job is not None)
if self._finish:
break
- waker(job_todo)
- self.feed.wait()
+ waker(do_pending_job)
+ self.done.wait_for(lambda: self.job.success is not None)
+ self.job = None
+ self.free.notify()
self.mutex.release()
+
self.own_thread = Thread(target = wakerfeedingloop)
self.own_thread.start()
@@ -135,7 +133,7 @@
self.enabled = False
self.job = None
self.todo.notify()
- self.done.notify()
+ self.done.notify_all()
self.mutex.release()
self.own_thread.join()
@@ -149,7 +147,7 @@
_job = job(*args, **kwargs)
- if self._threadID == _thread.get_ident():
+ if self._threadID == get_ident():
# if caller is worker thread execute immediately
_job.do()
else:
@@ -159,13 +157,11 @@
self.mutex.release()
raise EOFError("Worker is disabled")
- while self.job is not None:
- self.free.wait()
+ self.free.wait_for(lambda: self.job is None)
self.job = _job
self.todo.notify()
- self.done.wait()
- self.job = None
+ self.done.wait_for(lambda: _job.success is not None)
self.free.notify()
self.mutex.release()