runtime/Worker.py
changeset 2270 d9175daf6522
child 2271 985973ed701b
equal deleted inserted replaced
2269:2e38b5ec4753 2270:d9175daf6522
       
     1 #!/usr/bin/env python
       
     2 # -*- coding: utf-8 -*-
       
     3 
       
     4 # This file is part of Beremiz runtime.
       
     5 #
       
     6 # Copyright (C) 2018: Edouard TISSERANT
       
     7 #
       
     8 # See COPYING.Runtime file for copyrights details.
       
     9 
       
    10 from __future__ import absolute_import
       
    11 import thread
       
    12 from threading import Lock, Condition
       
    13 
       
    14 class job(object):
       
    15     """
       
    16     job to be executed by a worker
       
    17     """
       
    18     def __init__(self, call, *args, **kwargs):
       
    19         self.job = (call, args, kwargs)
       
    20         self.result = None
       
    21         self.success = False
       
    22         self.exc_info = None
       
    23 
       
    24     def do(self):
       
    25         """
       
    26         do the job by executing the call, and deal with exceptions
       
    27         """
       
    28         try:
       
    29             call, args, kwargs = self.job
       
    30             self.result = call(*args, **kwargs)
       
    31             self.success = True
       
    32         except Exception:
       
    33             self.success = False
       
    34             self.exc_info = sys.exc_info()
       
    35 
       
    36 
       
    37 class worker(object):
       
    38     """
       
    39     serialize main thread load/unload of PLC shared objects
       
    40     """
       
    41     def __init__(self):
       
    42         # Only one job at a time
       
    43         self._finish = False
       
    44         self._threadID = None
       
    45         self.mutex = Lock()
       
    46         self.todo = Condition(self.mutex)
       
    47         self.done = Condition(self.mutex)
       
    48         self.free = Condition(self.mutex)
       
    49         self.job = None
       
    50 
       
    51     def runloop(self, *args, **kwargs):
       
    52         """
       
    53         meant to be called by worker thread (blocking)
       
    54         """
       
    55         self._threadID = thread.get_ident()
       
    56         if args or kwargs:
       
    57             job(*args, **kwargs).do()
       
    58             # result is ignored
       
    59         self.mutex.acquire()
       
    60         while not self._finish:
       
    61             self.todo.wait()
       
    62             if self.job is not None:
       
    63                 self.job.do()
       
    64                 self.done.notify()
       
    65             else:
       
    66                 self.free.notify()
       
    67         self.mutex.release()
       
    68 
       
    69     def call(self, *args, **kwargs):
       
    70         """
       
    71         creates a job, execute it in worker thread, and deliver result.
       
    72         if job execution raise exception, re-raise same exception
       
    73         meant to be called by non-worker threads, but this is accepted.
       
    74         blocking until job done
       
    75         """
       
    76 
       
    77         _job = job(*args, **kwargs)
       
    78 
       
    79         if self._threadID == thread.get_ident() or self._threadID is None:
       
    80             # if caller is worker thread execute immediately
       
    81             _job.do()
       
    82         else:
       
    83             # otherwise notify and wait for completion
       
    84             self.mutex.acquire()
       
    85 
       
    86             while self.job is not None:
       
    87                 self.free.wait()
       
    88 
       
    89             self.job = _job
       
    90             self.todo.notify()
       
    91             self.done.wait()
       
    92             _job = self.job
       
    93             self.job = None
       
    94             self.mutex.release()
       
    95 
       
    96         if _job.success:
       
    97             return _job.result
       
    98         else:
       
    99             raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2]
       
   100 
       
   101     def quit(self):
       
   102         """
       
   103         unblocks main thread, and terminate execution of runloop()
       
   104         """
       
   105         # mark queue
       
   106         self._finish = True
       
   107         self.mutex.acquire()
       
   108         self.job = None
       
   109         self.todo.notify()
       
   110         self.mutex.release()