Edouard@2270: #!/usr/bin/env python
Edouard@2270: # -*- coding: utf-8 -*-
Edouard@2270: 
Edouard@2270: # This file is part of Beremiz runtime.
Edouard@2270: #
Edouard@2270: # Copyright (C) 2018: Edouard TISSERANT
Edouard@2270: #
Edouard@2270: # See COPYING.Runtime file for copyrights details.
Edouard@2270: 
Edouard@2270: from __future__ import absolute_import
Edouard@2307: import sys
Edouard@2270: from threading import Lock, Condition
edouard@2492: import six
andrej@2537: from six.moves import _thread
Edouard@2270: 
edouard@2309: 
class job(object):
    """
    A deferred call: a callable bundled with its arguments, plus slots
    recording how its execution went.

    Attributes:
      job      -- (callable, positional args tuple, keyword args dict)
      result   -- value returned by the callable, None until it succeeds
      success  -- None until do() ran, then True (returned) or False (raised)
      exc_info -- sys.exc_info() triple captured when the callable raised
    """
    def __init__(self, call, *args, **kwargs):
        self.exc_info = None
        self.success = None
        self.result = None
        self.job = (call, args, kwargs)

    def do(self):
        """
        Run the stored call once.

        Exceptions derived from Exception never propagate from here: they
        are captured in exc_info and signalled through success = False.
        """
        try:
            func, positional, named = self.job
            outcome = func(*positional, **named)
        except Exception:
            self.exc_info = sys.exc_info()
            self.success = False
        else:
            self.result = outcome
            self.success = True
Edouard@2270: 
Edouard@2270: 
class worker(object):
    """
    serialize main thread load/unload of PLC shared objects

    One thread runs runloop() and executes jobs one at a time; any other
    thread submits work through call() and blocks until it has been done.
    All shared state is guarded by a single mutex, with three conditions
    bound to it:
      todo -- a job has been posted (or quit() was requested)
      done -- the posted job has been executed (or abandoned)
      free -- the single job slot may be taken by the next caller
    """
    def __init__(self):
        # Only one job at a time
        self._finish = False
        self._threadID = None
        self.mutex = Lock()
        self.todo = Condition(self.mutex)
        self.done = Condition(self.mutex)
        self.free = Condition(self.mutex)
        self.job = None
        self.enabled = False

    def reraise(self, job):
        """
        reraise exception happened in a job
        @param job: job where original exception happened
        """
        exc_type, exc_value, exc_traceback = job.exc_info
        six.reraise(exc_type, exc_value, exc_traceback)

    def runloop(self, *args, **kwargs):
        """
        meant to be called by worker thread (blocking)

        If arguments are given they describe an initial job (callable
        followed by its arguments) executed before serving call()
        requests. The mutex is held while it runs, so call() from other
        threads blocks until it has finished. If that initial job raises,
        the exception is re-raised here.

        Whatever the way this method is left (quit(), or an exception),
        the worker is marked disabled, every thread blocked in call() is
        woken up and the mutex is released.
        """
        self._threadID = _thread.get_ident()
        self.mutex.acquire()
        try:
            self.enabled = True
            if args or kwargs:
                _job = job(*args, **kwargs)
                _job.do()
                # _job.success can't be None after do()
                if not _job.success:
                    self.reraise(_job)

            while not self._finish:
                self.todo.wait()
                if self.job is not None:
                    self.job.do()
                    self.done.notify()
                else:
                    # woken up without a job: quit() was requested
                    break
        finally:
            # Never leave with the mutex held or the worker advertised as
            # enabled: callers would otherwise block forever.
            self.enabled = False
            self.done.notify_all()
            self.free.notify_all()
            self.mutex.release()

    def call(self, *args, **kwargs):
        """
        creates a job, execute it in worker thread, and deliver result.
        if job execution raise exception, re-raise same exception
        meant to be called by non-worker threads, but this is accepted.
        blocking until job done

        @param args: callable followed by its positional arguments
        @param kwargs: keyword arguments passed to the callable
        @return: value returned by the callable
        @raise EOFError: worker is not running, or it stopped before the
                         job could be executed
        """

        _job = job(*args, **kwargs)

        if self._threadID == _thread.get_ident():
            # if caller is worker thread execute immediately
            _job.do()
        else:
            # otherwise notify and wait for completion
            with self.mutex:
                # Wait for the job slot. 'enabled' is checked again after
                # each wake-up: the worker may have stopped meanwhile, and
                # a job posted then would never be executed.
                while self.enabled and self.job is not None:
                    self.free.wait()

                if not self.enabled:
                    raise EOFError("Worker is disabled")

                self.job = _job
                self.todo.notify()
                self.done.wait()
                self.job = None
                self.free.notify()

        if _job.success is None:
            # woken up although the job never ran
            raise EOFError("Worker job was interrupted")

        if _job.success:
            return _job.result
        else:
            self.reraise(_job)

    def quit(self):
        """
        unblocks main thread, and terminate execution of runloop()

        Any pending job is dropped; threads blocked in call() are woken
        up and get EOFError.
        """
        # mark queue
        self._finish = True
        with self.mutex:
            self.enabled = False
            self.job = None
            self.todo.notify()
            self.done.notify()
            # callers still queued for the job slot must not post to a
            # worker that is going away
            self.free.notify_all()