runtime/Worker.py
changeset 3846 cf027bfe2653
parent 3753 5256e4bd92e6
child 3849 c3f4e114af38
equal deleted inserted replaced
3845:d7f9b6af98ef 3846:cf027bfe2653
     6 # Copyright (C) 2018: Edouard TISSERANT
     6 # Copyright (C) 2018: Edouard TISSERANT
     7 #
     7 #
     8 # See COPYING.Runtime file for copyrights details.
     8 # See COPYING.Runtime file for copyrights details.
     9 
     9 
    10 
    10 
    11 from threading import Lock, Condition, Thread
    11 from threading import Lock, Condition, Thread, get_ident
    12 
       
    13 import _thread
       
    14 
    12 
    15 
    13 
    16 class job(object):
    14 class job(object):
    17     """
    15     """
    18     job to be executed by a worker
    16     job to be executed by a worker
    62 
    60 
    63     def runloop(self, *args, **kwargs):
    61     def runloop(self, *args, **kwargs):
    64         """
    62         """
    65         meant to be called by worker thread (blocking)
    63         meant to be called by worker thread (blocking)
    66         """
    64         """
    67         self._threadID = _thread.get_ident()
    65         self._threadID = get_ident()
    68         self.mutex.acquire()
    66         self.mutex.acquire()
    69         self.enabled = True
    67         self.enabled = True
    70         if args or kwargs:
    68         if args or kwargs:
    71             _job = job(*args, **kwargs)
    69             _job = job(*args, **kwargs)
    72             _job.do()
    70             _job.do()
    73             # _job.success can't be None after do()
    71             # _job.success can't be None after do()
    74             if not _job.success:
    72             if not _job.success:
    75                 self.reraise(_job)
    73                 self.reraise(_job)
    76 
    74 
    77         while not self._finish:
    75         while not self._finish:
    78             self.todo.wait()
    76             self.todo.wait_for(self.job is not None)
    79             if self.job is not None:
    77             self.job.do()
    80                 self.job.do()
    78             self.done.notify()
    81                 self.done.notify()
    79             
    82             else:
       
    83                 break
       
    84 
       
    85         self.mutex.release()
    80         self.mutex.release()
    86 
    81 
    87     def interleave(self, waker, stopper, *args, **kwargs):
    82     def interleave(self, waker, stopper, *args, **kwargs):
    88         """
    83         """
    89         as for twisted reactor's interleave, it passes all jobs to waker func
    84         as for twisted reactor's interleave, it passes all jobs to waker func
     90         additionally, it creates a new thread to wait for new job.
     85         additionally, it creates a new thread to wait for new job.
    91         """
    86         """
    92         self.feed = Condition(self.mutex)
    87         self.feed = Condition(self.mutex)
    93         self._threadID = _thread.get_ident()
    88         self._threadID = get_ident()
    94         self.stopper = stopper
    89         self.stopper = stopper
       
    90 
       
    91         def do_pending_job():
       
    92             self.mutex.acquire()
       
    93             if self.job is not None:
       
    94                 self.job.do()
       
    95                 self.done.notify_all()
       
    96             self.mutex.release()
    95 
    97 
    96         def wakerfeedingloop():
    98         def wakerfeedingloop():
    97             self.mutex.acquire()
    99             self.mutex.acquire()
    98             self.enabled = True
   100             self.enabled = True
       
   101 
       
   102             # Handle first job
    99             if args or kwargs:
   103             if args or kwargs:
   100                 def first_job_todo():
   104                 self.job = job(*args, **kwargs)
   101                     _job = job(*args, **kwargs)
   105                 waker(do_pending_job)
   102                     _job.do()
   106                 self.done.wait_for(lambda: self.job.success is not None)
   103                     if not _job.success:
   107                 if not self.job.success:
   104                         self.reraise(_job)
   108                     self.reraise(_job)
   105                     self.mutex.acquire()
   109                 self.job = None
   106                     self.feed.notify()
   110 
   107                     self.mutex.release()
   111             self.free.notify()
   108                 waker(first_job_todo)
       
   109                 self.feed.wait()
       
   110 
   112 
   111             while not self._finish:
   113             while not self._finish:
   112                 self.todo.wait()
   114                 self.todo.wait_for(lambda: self.job is not None)
   113                 def job_todo():
       
   114                     self.mutex.acquire()
       
   115                     if self.job is not None:
       
   116                         self.job.do()
       
   117                         self.feed.notify()
       
   118                         self.done.notify()
       
   119                     self.mutex.release()
       
   120                 if self._finish:
   115                 if self._finish:
   121                     break
   116                     break
   122                 waker(job_todo)
   117                 waker(do_pending_job)
   123                 self.feed.wait()
   118                 self.done.wait_for(lambda: self.job.success is not None)
       
   119                 self.job = None
       
   120                 self.free.notify()
   124 
   121 
   125             self.mutex.release()
   122             self.mutex.release()
       
   123 
   126         self.own_thread = Thread(target = wakerfeedingloop)
   124         self.own_thread = Thread(target = wakerfeedingloop)
   127         self.own_thread.start()
   125         self.own_thread.start()
   128 
   126 
   129     def stop(self):
   127     def stop(self):
   130         """
   128         """
   133         self.mutex.acquire()
   131         self.mutex.acquire()
   134         self._finish = True
   132         self._finish = True
   135         self.enabled = False
   133         self.enabled = False
   136         self.job = None
   134         self.job = None
   137         self.todo.notify()
   135         self.todo.notify()
   138         self.done.notify()
   136         self.done.notify_all()
   139         self.mutex.release()
   137         self.mutex.release()
   140         self.own_thread.join()
   138         self.own_thread.join()
   141 
   139 
   142     def call(self, *args, **kwargs):
   140     def call(self, *args, **kwargs):
   143         """
   141         """
   147         blocking until job done
   145         blocking until job done
   148         """
   146         """
   149 
   147 
   150         _job = job(*args, **kwargs)
   148         _job = job(*args, **kwargs)
   151 
   149 
   152         if self._threadID == _thread.get_ident():
   150         if self._threadID == get_ident():
   153             # if caller is worker thread execute immediately
   151             # if caller is worker thread execute immediately
   154             _job.do()
   152             _job.do()
   155         else:
   153         else:
   156             # otherwise notify and wait for completion
   154             # otherwise notify and wait for completion
   157             self.mutex.acquire()
   155             self.mutex.acquire()
   158             if not self.enabled:
   156             if not self.enabled:
   159                 self.mutex.release()
   157                 self.mutex.release()
   160                 raise EOFError("Worker is disabled")
   158                 raise EOFError("Worker is disabled")
   161 
   159 
   162             while self.job is not None:
   160             self.free.wait_for(lambda: self.job is None)
   163                 self.free.wait()
       
   164 
   161 
   165             self.job = _job
   162             self.job = _job
   166             self.todo.notify()
   163             self.todo.notify()
   167             self.done.wait()
   164             self.done.wait_for(lambda: _job.success is not None)
   168             self.job = None
       
   169             self.free.notify()
   165             self.free.notify()
   170             self.mutex.release()
   166             self.mutex.release()
   171 
   167 
   172         if _job.success is None:
   168         if _job.success is None:
   173             raise EOFError("Worker job was interrupted")
   169             raise EOFError("Worker job was interrupted")