runtime/Worker.py
branchfix_PLC_runtime_shutdown
changeset 2604 c8a25a3a7f8b
parent 2537 eb4a4cc41914
child 2611 a1bf03277cec
equal deleted inserted replaced
2603:1ffdc62784cf 2604:c8a25a3a7f8b
    19     job to be executed by a worker
    19     job to be executed by a worker
    20     """
    20     """
    21     def __init__(self, call, *args, **kwargs):
    21     def __init__(self, call, *args, **kwargs):
    22         self.job = (call, args, kwargs)
    22         self.job = (call, args, kwargs)
    23         self.result = None
    23         self.result = None
    24         self.success = False
    24         self.success = None
    25         self.exc_info = None
    25         self.exc_info = None
       
    26         self.enabled = False
    26 
    27 
    27     def do(self):
    28     def do(self):
    28         """
    29         """
    29         do the job by executing the call, and deal with exceptions
    30         do the job by executing the call, and deal with exceptions
    30         """
    31         """
    65         """
    66         """
    66         meant to be called by worker thread (blocking)
    67         meant to be called by worker thread (blocking)
    67         """
    68         """
    68         self._threadID = _thread.get_ident()
    69         self._threadID = _thread.get_ident()
    69         self.mutex.acquire()
    70         self.mutex.acquire()
       
    71         self.enabled = True
    70         if args or kwargs:
    72         if args or kwargs:
    71             _job = job(*args, **kwargs)
    73             _job = job(*args, **kwargs)
    72             _job.do()
    74             _job.do()
       
    75             # _job.success can't be None after do()
    73             if not _job.success:
    76             if not _job.success:
    74                 self.reraise(_job)
    77                 self.reraise(_job)
    75 
    78 
    76         while not self._finish:
    79         while not self._finish:
    77             self.todo.wait()
    80             self.todo.wait()
    97             # if caller is worker thread execute immediately
   100             # if caller is worker thread execute immediately
    98             _job.do()
   101             _job.do()
    99         else:
   102         else:
   100             # otherwise notify and wait for completion
   103             # otherwise notify and wait for completion
   101             self.mutex.acquire()
   104             self.mutex.acquire()
       
   105             if not self.enabled:
       
   106                 self.mutex.release()
       
   107                 raise EOFError("Worker is disabled")
   102 
   108 
   103             while self.job is not None:
   109             while self.job is not None:
   104                 self.free.wait()
   110                 self.free.wait()
   105 
   111 
   106             self.job = _job
   112             self.job = _job
   107             self.todo.notify()
   113             self.todo.notify()
   108             self.done.wait()
   114             self.done.wait()
   109             self.job = None
   115             self.job = None
   110             self.free.notify()
   116             self.free.notify()
   111             self.mutex.release()
   117             self.mutex.release()
       
   118 
       
   119         if _job.success is None:
       
   120             raise EOFError("Worker job was interrupted")
   112 
   121 
   113         if _job.success:
   122         if _job.success:
   114             return _job.result
   123             return _job.result
   115         else:
   124         else:
   116             self.reraise(_job)
   125             self.reraise(_job)
   120         unblocks main thread, and terminate execution of runloop()
   129         unblocks main thread, and terminate execution of runloop()
   121         """
   130         """
   122         # mark queue
   131         # mark queue
   123         self._finish = True
   132         self._finish = True
   124         self.mutex.acquire()
   133         self.mutex.acquire()
       
   134         self.enabled = False
   125         self.job = None
   135         self.job = None
   126         self.todo.notify()
   136         self.todo.notify()
       
   137         self.done.notify()
   127         self.mutex.release()
   138         self.mutex.release()