Edouard@2270: #!/usr/bin/env python
Edouard@2270: # -*- coding: utf-8 -*-
Edouard@2270: 
Edouard@2270: # This file is part of Beremiz runtime.
Edouard@2270: #
Edouard@2270: # Copyright (C) 2018: Edouard TISSERANT
Edouard@2270: #
Edouard@2270: # See COPYING.Runtime file for copyrights details.
Edouard@2270: 
Edouard@2270: from __future__ import absolute_import
Edouard@2307: import sys
Edouard@2270: import thread
Edouard@2270: from threading import Lock, Condition
Edouard@2270: 
edouard@2309: 
Edouard@2270: class job(object):
Edouard@2270:     """
Edouard@2270:     job to be executed by a worker
Edouard@2270:     """
Edouard@2270:     def __init__(self, call, *args, **kwargs):
Edouard@2270:         self.job = (call, args, kwargs)
Edouard@2270:         self.result = None
Edouard@2270:         self.success = False
Edouard@2270:         self.exc_info = None
Edouard@2270: 
Edouard@2270:     def do(self):
Edouard@2270:         """
Edouard@2270:         do the job by executing the call, and deal with exceptions
Edouard@2270:         """
Edouard@2270:         try:
Edouard@2270:             call, args, kwargs = self.job
Edouard@2270:             self.result = call(*args, **kwargs)
Edouard@2270:             self.success = True
Edouard@2270:         except Exception:
Edouard@2270:             self.success = False
Edouard@2270:             self.exc_info = sys.exc_info()
Edouard@2270: 
Edouard@2270: 
Edouard@2270: class worker(object):
Edouard@2270:     """
Edouard@2270:     serialize main thread load/unload of PLC shared objects
Edouard@2270:     """
Edouard@2270:     def __init__(self):
Edouard@2270:         # Only one job at a time
Edouard@2270:         self._finish = False
Edouard@2270:         self._threadID = None
Edouard@2270:         self.mutex = Lock()
Edouard@2270:         self.todo = Condition(self.mutex)
Edouard@2270:         self.done = Condition(self.mutex)
Edouard@2270:         self.free = Condition(self.mutex)
Edouard@2270:         self.job = None
Edouard@2270: 
Edouard@2270:     def runloop(self, *args, **kwargs):
Edouard@2270:         """
Edouard@2270:         meant to be called by worker thread (blocking)
Edouard@2270:         """
Edouard@2270:         self._threadID = thread.get_ident()
Edouard@2270:         if args or kwargs:
Edouard@2307:             _job = job(*args, **kwargs)
Edouard@2307:             _job.do()
Edouard@2307:             if _job.success:
Edouard@2307:                 # result is ignored
Edouard@2307:                 pass
Edouard@2307:             else:
Edouard@2307:                 raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2]
Edouard@2270:         self.mutex.acquire()
Edouard@2270:         while not self._finish:
Edouard@2270:             self.todo.wait()
Edouard@2270:             if self.job is not None:
Edouard@2270:                 self.job.do()
Edouard@2270:                 self.done.notify()
Edouard@2270:             else:
Edouard@2270:                 self.free.notify()
Edouard@2270:         self.mutex.release()
Edouard@2270: 
Edouard@2270:     def call(self, *args, **kwargs):
Edouard@2270:         """
Edouard@2270:         creates a job, execute it in worker thread, and deliver result.
Edouard@2270:         if job execution raise exception, re-raise same exception
Edouard@2270:         meant to be called by non-worker threads, but this is accepted.
Edouard@2270:         blocking until job done
Edouard@2270:         """
Edouard@2270: 
Edouard@2270:         _job = job(*args, **kwargs)
Edouard@2270: 
Edouard@2271:         if self._threadID == thread.get_ident():
Edouard@2270:             # if caller is worker thread execute immediately
Edouard@2270:             _job.do()
Edouard@2270:         else:
Edouard@2270:             # otherwise notify and wait for completion
Edouard@2270:             self.mutex.acquire()
Edouard@2270: 
Edouard@2270:             while self.job is not None:
Edouard@2270:                 self.free.wait()
Edouard@2270: 
Edouard@2270:             self.job = _job
Edouard@2270:             self.todo.notify()
Edouard@2270:             self.done.wait()
Edouard@2270:             _job = self.job
Edouard@2270:             self.job = None
Edouard@2270:             self.mutex.release()
Edouard@2270: 
Edouard@2270:         if _job.success:
Edouard@2270:             return _job.result
Edouard@2270:         else:
Edouard@2270:             raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2]
Edouard@2270: 
Edouard@2270:     def quit(self):
Edouard@2270:         """
Edouard@2270:         unblocks main thread, and terminate execution of runloop()
Edouard@2270:         """
Edouard@2270:         # mark queue
Edouard@2270:         self._finish = True
Edouard@2270:         self.mutex.acquire()
Edouard@2270:         self.job = None
Edouard@2270:         self.todo.notify()
Edouard@2270:         self.mutex.release()