# HG changeset patch # User Edouard Tisserant # Date 1555590942 -7200 # Node ID c8a25a3a7f8b3b79a57ee0ca3eae6a33e62236bb # Parent 1ffdc62784cf9f2689a12ad78d914cd59c09b5ab Runtime worker : unblock the last waiting job and prevent any new job to wait when Main Worker is being shut down. diff -r 1ffdc62784cf -r c8a25a3a7f8b runtime/Worker.py --- a/runtime/Worker.py Thu Apr 18 14:34:22 2019 +0200 +++ b/runtime/Worker.py Thu Apr 18 14:35:42 2019 +0200 @@ -21,8 +21,9 @@ def __init__(self, call, *args, **kwargs): self.job = (call, args, kwargs) self.result = None - self.success = False + self.success = None self.exc_info = None + self.enabled = False def do(self): """ @@ -67,9 +68,11 @@ """ self._threadID = _thread.get_ident() self.mutex.acquire() + self.enabled = True if args or kwargs: _job = job(*args, **kwargs) _job.do() + # _job.success can't be None after do() if not _job.success: self.reraise(_job) @@ -99,6 +102,9 @@ else: # otherwise notify and wait for completion self.mutex.acquire() + if not self.enabled: + self.mutex.release() + raise EOFError("Worker is disabled") while self.job is not None: self.free.wait() @@ -110,6 +116,9 @@ self.free.notify() self.mutex.release() + if _job.success is None: + raise EOFError("Worker job was interrupted") + if _job.success: return _job.result else: @@ -122,6 +131,8 @@ # mark queue self._finish = True self.mutex.acquire() + self.enabled = False self.job = None self.todo.notify() + self.done.notify() self.mutex.release()