runtime/Worker.py
branch wxPython4
changeset 3642 cd3d15e8ef42
parent 3585 efdefbad49eb
child 3750 f62625418bff
equal deleted inserted replaced
3641:d8dc29dfc344 3642:cd3d15e8ef42
    50         self.todo = Condition(self.mutex)
    50         self.todo = Condition(self.mutex)
    51         self.done = Condition(self.mutex)
    51         self.done = Condition(self.mutex)
    52         self.free = Condition(self.mutex)
    52         self.free = Condition(self.mutex)
    53         self.job = None
    53         self.job = None
    54         self.enabled = False
    54         self.enabled = False
       
    55         self.stopper = None
       
    56         self.own_thread = None
    55 
    57 
    56     def reraise(self, job):
    58     def reraise(self, job):
    57         """
    59         """
    58         reraise exception happened in a job
    60         reraise exception happened in a job
    59         @param job: job where original exception happened
    61         @param job: job where original exception happened
    85             else:
    87             else:
    86                 break
    88                 break
    87 
    89 
    88         self.mutex.release()
    90         self.mutex.release()
    89 
    91 
    90     def interleave(self, waker, *args, **kwargs):
    92     def interleave(self, waker, stopper, *args, **kwargs):
    91         """
    93         """
    92         as for twisted reactor's interleave, it passes all jobs to waker func
    94         as for twisted reactor's interleave, it passes all jobs to waker func
    93         additionally, it creates a new thread to wait for new job.
    95         additionally, it creates a new thread to wait for new job.
    94         """
    96         """
    95         self.feed = Condition(self.mutex)
    97         self.feed = Condition(self.mutex)
    96         self._threadID = _thread.get_ident()
    98         self._threadID = _thread.get_ident()
       
    99         self.stopper = stopper
    97 
   100 
    98         def wakerfeedingloop():
   101         def wakerfeedingloop():
    99             self.mutex.acquire()
   102             self.mutex.acquire()
   100             self.enabled = True
   103             self.enabled = True
   101             if args or kwargs:
   104             if args or kwargs:
   117                     if self.job is not None:
   120                     if self.job is not None:
   118                         self.job.do()
   121                         self.job.do()
   119                         self.feed.notify()
   122                         self.feed.notify()
   120                         self.done.notify()
   123                         self.done.notify()
   121                     self.mutex.release()
   124                     self.mutex.release()
       
   125                 if self._finish:
       
   126                     break
   122                 waker(job_todo)
   127                 waker(job_todo)
   123                 self.feed.wait()
   128                 self.feed.wait()
   124 
   129 
   125             self.mutex.release()
   130             self.mutex.release()
   126         self.own_thread = Thread(target = wakerfeedingloop)
   131         self.own_thread = Thread(target = wakerfeedingloop)
   187         self.enabled = False
   192         self.enabled = False
   188         self.job = None
   193         self.job = None
   189         self.todo.notify()
   194         self.todo.notify()
   190         self.done.notify()
   195         self.done.notify()
   191         self.mutex.release()
   196         self.mutex.release()
       
   197 
       
   198     def finish(self):
       
   199         if self.own_thread is None:
       
   200             self.quit()
       
   201         if self.stopper is not None:
       
   202             self.stopper()