Edouard@2270: #!/usr/bin/env python
Edouard@2270: # -*- coding: utf-8 -*-
Edouard@2270: 
Edouard@2270: # This file is part of Beremiz runtime.
Edouard@2270: #
Edouard@2270: # Copyright (C) 2018: Edouard TISSERANT
Edouard@2270: #
Edouard@2270: # See COPYING.Runtime file for copyrights details.
Edouard@2270: 
kinsamanka@3750: 
edouard@3846: from threading import Lock, Condition, Thread, get_ident
Edouard@2270: 
edouard@2309: 
Edouard@2270: class job(object):
Edouard@2270:     """
Edouard@2270:     job to be executed by a worker
Edouard@2270:     """
Edouard@2270:     def __init__(self, call, *args, **kwargs):
Edouard@2270:         self.job = (call, args, kwargs)
Edouard@2270:         self.result = None
Edouard@2604:         self.success = None
Edouard@2270:         self.exc_info = None
Edouard@2270: 
Edouard@2270:     def do(self):
Edouard@2270:         """
Edouard@2270:         do the job by executing the call, and deal with exceptions
Edouard@2270:         """
Edouard@2270:         try:
Edouard@2270:             call, args, kwargs = self.job
Edouard@2270:             self.result = call(*args, **kwargs)
Edouard@2270:             self.success = True
kinsamanka@3753:         except Exception as e:
Edouard@2270:             self.success = False
kinsamanka@3753:             self.exc_info = e
Edouard@2270: 
Edouard@2270: 
Edouard@2270: class worker(object):
Edouard@2270:     """
Edouard@2270:     serialize main thread load/unload of PLC shared objects
Edouard@2270:     """
Edouard@2270:     def __init__(self):
Edouard@2270:         # Only one job at a time
Edouard@2270:         self._finish = False
Edouard@2270:         self._threadID = None
Edouard@2270:         self.mutex = Lock()
Edouard@2270:         self.todo = Condition(self.mutex)
Edouard@2270:         self.done = Condition(self.mutex)
Edouard@2270:         self.free = Condition(self.mutex)
Edouard@2270:         self.job = None
Edouard@2611:         self.enabled = False
edouard@3642:         self.stopper = None
edouard@3642:         self.own_thread = None
Edouard@2270: 
andrej@2536:     def reraise(self, job):
andrej@2536:         """
andrej@2536:         reraise exception happend in a job
andrej@2536:         @param job: job where original exception happend
andrej@2536:         """
kinsamanka@3753:         raise job.exc_info
andrej@2536: 
Edouard@2270:     def runloop(self, *args, **kwargs):
Edouard@2270:         """
Edouard@2270:         meant to be called by worker thread (blocking)
Edouard@2270:         """
edouard@3846:         self._threadID = get_ident()
Edouard@2467:         self.mutex.acquire()
Edouard@2604:         self.enabled = True
Edouard@2270:         if args or kwargs:
edouard@3851:             self.job = job(*args, **kwargs)
edouard@3851:             self.job.do()
edouard@3851:             # fail if first job fails
edouard@3851:             if not self.job.success:
edouard@3851:                 self.reraise(self.job)
edouard@3851:             self.job = None
edouard@3851: 
edouard@3851:         self.free.notify()
Edouard@2486: 
Edouard@2270:         while not self._finish:
edouard@3849:             self.todo.wait_for(lambda: self.job is not None)
edouard@3846:             self.job.do()
edouard@3846:             self.done.notify()
edouard@3851:             self.job = None
edouard@3851:             self.free.notify()
edouard@3846:             
Edouard@2270:         self.mutex.release()
Edouard@2270: 
edouard@3642:     def interleave(self, waker, stopper, *args, **kwargs):
edouard@3584:         """
edouard@3584:         as for twisted reactor's interleave, it passes all jobs to waker func
edouard@3584:         additionaly, it creates a new thread to wait for new job.
edouard@3584:         """
edouard@3584:         self.feed = Condition(self.mutex)
edouard@3846:         self._threadID = get_ident()
edouard@3642:         self.stopper = stopper
edouard@3584: 
edouard@3846:         def do_pending_job():
edouard@3846:             self.mutex.acquire()
edouard@3851:             self.job.do()
edouard@3851:             self.done.notify_all()
edouard@3846:             self.mutex.release()
edouard@3846: 
edouard@3584:         def wakerfeedingloop():
edouard@3584:             self.mutex.acquire()
edouard@3584:             self.enabled = True
edouard@3846: 
edouard@3846:             # Handle first job
edouard@3584:             if args or kwargs:
edouard@3846:                 self.job = job(*args, **kwargs)
edouard@3846:                 waker(do_pending_job)
edouard@3846:                 self.done.wait_for(lambda: self.job.success is not None)
edouard@3851:                 # fail if first job fails
edouard@3846:                 if not self.job.success:
edouard@3851:                     self.reraise(self.job)
edouard@3846:                 self.job = None
edouard@3846: 
edouard@3846:             self.free.notify()
edouard@3584: 
edouard@3584:             while not self._finish:
edouard@3846:                 self.todo.wait_for(lambda: self.job is not None)
edouard@3642:                 if self._finish:
edouard@3642:                     break
edouard@3846:                 waker(do_pending_job)
edouard@3846:                 self.done.wait_for(lambda: self.job.success is not None)
edouard@3846:                 self.job = None
edouard@3846:                 self.free.notify()
edouard@3584: 
edouard@3584:             self.mutex.release()
edouard@3846: 
edouard@3585:         self.own_thread = Thread(target = wakerfeedingloop)
edouard@3585:         self.own_thread.start()
edouard@3584: 
edouard@3585:     def stop(self):
edouard@3584:         """
edouard@3584:         !interleave
edouard@3584:         """
edouard@3584:         self.mutex.acquire()
edouard@3584:         self._finish = True
edouard@3584:         self.enabled = False
edouard@3584:         self.job = None
edouard@3584:         self.todo.notify()
edouard@3846:         self.done.notify_all()
edouard@3584:         self.mutex.release()
edouard@3584:         self.own_thread.join()
edouard@3584: 
Edouard@2270:     def call(self, *args, **kwargs):
Edouard@2270:         """
Edouard@2270:         creates a job, execute it in worker thread, and deliver result.
Edouard@2270:         if job execution raise exception, re-raise same exception
Edouard@2270:         meant to be called by non-worker threads, but this is accepted.
Edouard@2270:         blocking until job done
Edouard@2270:         """
Edouard@2270: 
Edouard@2270:         _job = job(*args, **kwargs)
Edouard@2270: 
edouard@3846:         if self._threadID == get_ident():
Edouard@2270:             # if caller is worker thread execute immediately
Edouard@2270:             _job.do()
Edouard@2270:         else:
Edouard@2270:             # otherwise notify and wait for completion
Edouard@2270:             self.mutex.acquire()
Edouard@2604:             if not self.enabled:
Edouard@2604:                 self.mutex.release()
Edouard@2604:                 raise EOFError("Worker is disabled")
Edouard@2270: 
edouard@3846:             self.free.wait_for(lambda: self.job is None)
Edouard@2270: 
Edouard@2270:             self.job = _job
Edouard@2270:             self.todo.notify()
edouard@3846:             self.done.wait_for(lambda: _job.success is not None)
Edouard@2486:             self.free.notify()
Edouard@2270:             self.mutex.release()
Edouard@2270: 
Edouard@2604:         if _job.success is None:
Edouard@2604:             raise EOFError("Worker job was interrupted")
Edouard@2604: 
Edouard@2270:         if _job.success:
Edouard@2270:             return _job.result
Edouard@2270:         else:
andrej@2536:             self.reraise(_job)
Edouard@2270: 
Edouard@2270:     def quit(self):
Edouard@2270:         """
Edouard@2270:         unblocks main thread, and terminate execution of runloop()
Edouard@2270:         """
Edouard@2270:         # mark queue
Edouard@2270:         self._finish = True
Edouard@2270:         self.mutex.acquire()
Edouard@2604:         self.enabled = False
Edouard@2270:         self.job = None
Edouard@2270:         self.todo.notify()
Edouard@2604:         self.done.notify()
Edouard@2270:         self.mutex.release()
edouard@3642: 
edouard@3642:     def finish(self):
edouard@3642:         if self.own_thread is None:
edouard@3642:             self.quit()
edouard@3642:         if self.stopper is not None:
edouard@3642:             self.stopper()