runtime/Worker.py
changeset 2550 f2af2a655868
parent 2537 eb4a4cc41914
child 2604 c8a25a3a7f8b
equal deleted inserted replaced
2530:02d09fc6eb90 2550:f2af2a655868
     7 #
     7 #
     8 # See COPYING.Runtime file for copyrights details.
     8 # See COPYING.Runtime file for copyrights details.
     9 
     9 
    10 from __future__ import absolute_import
    10 from __future__ import absolute_import
    11 import sys
    11 import sys
    12 import thread
       
    13 from threading import Lock, Condition
    12 from threading import Lock, Condition
    14 import six
    13 import six
       
    14 from six.moves import _thread
    15 
    15 
    16 
    16 
    17 class job(object):
    17 class job(object):
    18     """
    18     """
    19     job to be executed by a worker
    19     job to be executed by a worker
    49         self.todo = Condition(self.mutex)
    49         self.todo = Condition(self.mutex)
    50         self.done = Condition(self.mutex)
    50         self.done = Condition(self.mutex)
    51         self.free = Condition(self.mutex)
    51         self.free = Condition(self.mutex)
    52         self.job = None
    52         self.job = None
    53 
    53 
       
    54     def reraise(self, job):
       
    55         """
       
     56         reraise the exception that happened in a job
       
     57         @param job: job where the original exception happened
       
    58         """
       
    59         exc_type = job.exc_info[0]
       
    60         exc_value = job.exc_info[1]
       
    61         exc_traceback = job.exc_info[2]
       
    62         six.reraise(exc_type, exc_value, exc_traceback)
       
    63 
    54     def runloop(self, *args, **kwargs):
    64     def runloop(self, *args, **kwargs):
    55         """
    65         """
    56         meant to be called by worker thread (blocking)
    66         meant to be called by worker thread (blocking)
    57         """
    67         """
    58         self._threadID = thread.get_ident()
    68         self._threadID = _thread.get_ident()
    59         self.mutex.acquire()
    69         self.mutex.acquire()
    60         if args or kwargs:
    70         if args or kwargs:
    61             _job = job(*args, **kwargs)
    71             _job = job(*args, **kwargs)
    62             _job.do()
    72             _job.do()
    63             if _job.success:
    73             if not _job.success:
    64                 # result is ignored
    74                 self.reraise(_job)
    65                 pass
       
    66             else:
       
    67                 raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2]
       
    68 
    75 
    69         while not self._finish:
    76         while not self._finish:
    70             self.todo.wait()
    77             self.todo.wait()
    71             if self.job is not None:
    78             if self.job is not None:
    72                 self.job.do()
    79                 self.job.do()
    84         blocking until job done
    91         blocking until job done
    85         """
    92         """
    86 
    93 
    87         _job = job(*args, **kwargs)
    94         _job = job(*args, **kwargs)
    88 
    95 
    89         if self._threadID == thread.get_ident():
    96         if self._threadID == _thread.get_ident():
    90             # if caller is worker thread execute immediately
    97             # if caller is worker thread execute immediately
    91             _job.do()
    98             _job.do()
    92         else:
    99         else:
    93             # otherwise notify and wait for completion
   100             # otherwise notify and wait for completion
    94             self.mutex.acquire()
   101             self.mutex.acquire()
   104             self.mutex.release()
   111             self.mutex.release()
   105 
   112 
   106         if _job.success:
   113         if _job.success:
   107             return _job.result
   114             return _job.result
   108         else:
   115         else:
   109             exc_type = _job.exc_info[0]
   116             self.reraise(_job)
   110             exc_value = _job.exc_info[1]
       
   111             exc_traceback = _job.exc_info[2]
       
   112             six.reraise(exc_type, exc_value, exc_traceback)
       
   113 
   117 
   114     def quit(self):
   118     def quit(self):
   115         """
   119         """
    116         unblocks main thread and terminates execution of runloop()
    120         unblocks main thread and terminates execution of runloop()
   117         """
   121         """