Runtime worker : unblock the last waiting job and prevent any new job to wait when Main Worker is being shut down. fix_PLC_runtime_shutdown
authorEdouard Tisserant
Thu, 18 Apr 2019 14:35:42 +0200
branchfix_PLC_runtime_shutdown
changeset 2604 c8a25a3a7f8b
parent 2603 1ffdc62784cf
child 2605 0615137bf515
Runtime worker : unblock the last waiting job and prevent any new job to wait when Main Worker is being shut down.
runtime/Worker.py
--- a/runtime/Worker.py	Thu Apr 18 14:34:22 2019 +0200
+++ b/runtime/Worker.py	Thu Apr 18 14:35:42 2019 +0200
@@ -21,8 +21,9 @@
     def __init__(self, call, *args, **kwargs):
         self.job = (call, args, kwargs)
         self.result = None
-        self.success = False
+        self.success = None
         self.exc_info = None
+        self.enabled = False
 
     def do(self):
         """
@@ -67,9 +68,11 @@
         """
         self._threadID = _thread.get_ident()
         self.mutex.acquire()
+        self.enabled = True
         if args or kwargs:
             _job = job(*args, **kwargs)
             _job.do()
+            # _job.success can't be None after do()
             if not _job.success:
                 self.reraise(_job)
 
@@ -99,6 +102,9 @@
         else:
             # otherwise notify and wait for completion
             self.mutex.acquire()
+            if not self.enabled:
+                self.mutex.release()
+                raise EOFError("Worker is disabled")
 
             while self.job is not None:
                 self.free.wait()
@@ -110,6 +116,9 @@
             self.free.notify()
             self.mutex.release()
 
+        if _job.success is None:
+            raise EOFError("Worker job was interrupted")
+
         if _job.success:
             return _job.result
         else:
@@ -122,6 +131,8 @@
         # mark queue
         self._finish = True
         self.mutex.acquire()
+        self.enabled = False
         self.job = None
         self.todo.notify()
+        self.done.notify()
         self.mutex.release()