#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of Beremiz runtime.
#
# Copyright (C) 2018: Edouard TISSERANT
#
# See COPYING.Runtime file for copyrights details.

from __future__ import absolute_import
import sys
import six
# same module as the Python 2 "thread" module, also importable on Python 3
from six.moves import _thread as thread
from threading import Lock, Condition


class job(object):
    """
    job to be executed by a worker

    Bundles a callable with its arguments and records the outcome:
     - result   : value returned by the callable (None until it succeeded)
     - success  : True only once the callable returned without raising
     - exc_info : sys.exc_info() triple captured when the callable raised
    """
    def __init__(self, call, *args, **kwargs):
        self.job = (call, args, kwargs)
        self.result = None
        self.success = False
        self.exc_info = None

    def do(self):
        """
        do the job by executing the call, and deal with exceptions

        Exceptions deriving from Exception are not propagated: they are
        stored in exc_info and success is left False, so that the thread
        which requested the job can re-raise them later.
        """
        call, args, kwargs = self.job
        try:
            self.result = call(*args, **kwargs)
        except Exception:
            self.success = False
            self.exc_info = sys.exc_info()
        else:
            self.success = True


class worker(object):
    """
    serialize main thread load/unload of PLC shared objects

    One thread runs runloop() and executes jobs. Other threads submit
    jobs with call() and block until their job has been executed.
    All shared state is protected by a single mutex, on which three
    conditions are built:
     - todo : a job has been posted (or quit() was requested)
     - done : the posted job has been executed
     - free : the job slot is empty again, next caller may post
    """
    def __init__(self):
        # Only one job at a time
        self._finish = False
        self._threadID = None
        # True while self.job has been posted by call() and not yet executed
        self._pending = False
        self.mutex = Lock()
        self.todo = Condition(self.mutex)
        self.done = Condition(self.mutex)
        self.free = Condition(self.mutex)
        self.job = None

    def runloop(self, *args, **kwargs):
        """
        meant to be called by worker thread (blocking)

        If arguments are given, they describe a first job (callable
        followed by its arguments) executed before entering the loop.
        Its result is ignored, but an exception it raises is re-raised
        here and the loop is then never entered.

        Returns once quit() has been called.
        """
        self._threadID = thread.get_ident()
        if args or kwargs:
            _job = job(*args, **kwargs)
            _job.do()
            # result is ignored, only failure matters
            if not _job.success:
                six.reraise(*_job.exc_info)

        # "with" releases the mutex even if something escapes from a job
        with self.mutex:
            while not self._finish:
                if not self._pending:
                    # nothing posted yet : sleep until call() or quit()
                    self.todo.wait()
                # A job posted before this thread started waiting has no
                # notification left to deliver; the flag still reveals it.
                if self._pending:
                    self._pending = False
                    self.job.do()
                    self.done.notify()
                elif self.job is None:
                    self.free.notify()

    def call(self, *args, **kwargs):
        """
        creates a job, execute it in worker thread, and deliver result.
        if job execution raise exception, re-raise same exception
        meant to be called by non-worker threads, but this is accepted.
        blocking until job done

        First positional argument is the callable, remaining positional
        and keyword arguments are passed to it.
        """
        _job = job(*args, **kwargs)

        if self._threadID == thread.get_ident():
            # if caller is worker thread execute immediately
            _job.do()
        else:
            # otherwise notify and wait for completion
            with self.mutex:
                # wait for the single job slot to be empty
                while self.job is not None:
                    self.free.wait()

                self.job = _job
                self._pending = True
                self.todo.notify()
                self.done.wait()
                self.job = None
                # let the next caller blocked in free.wait() post its job
                self.free.notify()

        if _job.success:
            return _job.result
        six.reraise(*_job.exc_info)

    def quit(self):
        """
        unblocks main thread, and terminate execution of runloop()

        NOTE(review): a caller still blocked in call() when quit() is
        invoked is not released by this method.
        """
        with self.mutex:
            # mark queue as finished and drop any job not executed yet
            self._finish = True
            self.job = None
            self._pending = False
            self.todo.notify()