--- a/runtime/Worker.py Sat Aug 13 16:12:39 2022 +0200
+++ b/runtime/Worker.py Tue Aug 16 19:52:49 2022 +0200
@@ -9,7 +9,8 @@
from __future__ import absolute_import
import sys
-from threading import Lock, Condition
+from threading import Lock, Condition, Thread
+
import six
from six.moves import _thread
@@ -86,6 +87,57 @@
self.mutex.release()
+ def interleave(self, waker, *args, **kwargs):
+ """
+ as for twisted reactor's interleave, it passes all jobs to waker func
+ additionaly, it creates a new thread to wait for new job.
+ """
+ self.feed = Condition(self.mutex)
+ self._threadID = _thread.get_ident()
+
+ def wakerfeedingloop():
+ self.mutex.acquire()
+ self.enabled = True
+ if args or kwargs:
+ def first_job_todo():
+ _job = job(*args, **kwargs)
+ _job.do()
+ if not _job.success:
+ self.reraise(_job)
+ self.mutex.acquire()
+ self.feed.notify()
+ self.mutex.release()
+ waker(first_job_todo)
+ self.feed.wait()
+
+ while not self._finish:
+ self.todo.wait()
+ def job_todo():
+ self.mutex.acquire()
+ if self.job is not None:
+ self.job.do()
+ self.feed.notify()
+ self.done.notify()
+ self.mutex.release()
+ waker(job_todo)
+ self.feed.wait()
+
+ self.mutex.release()
+ self.own_thread = Thread(target = wakerfeedingloop).start()
+
+ def stop():
+ """
+ !interleave
+ """
+ self.mutex.acquire()
+ self._finish = True
+ self.enabled = False
+ self.job = None
+ self.todo.notify()
+ self.done.notify()
+ self.mutex.release()
+ self.own_thread.join()
+
def call(self, *args, **kwargs):
"""
creates a job, execute it in worker thread, and deliver result.