Edouard@2270: #!/usr/bin/env python Edouard@2270: # -*- coding: utf-8 -*- Edouard@2270: Edouard@2270: # This file is part of Beremiz runtime. Edouard@2270: # Edouard@2270: # Copyright (C) 2018: Edouard TISSERANT Edouard@2270: # Edouard@2270: # See COPYING.Runtime file for copyrights details. Edouard@2270: Edouard@2270: from __future__ import absolute_import Edouard@2307: import sys Edouard@2270: from threading import Lock, Condition edouard@2492: import six andrej@2537: from six.moves import _thread Edouard@2270: edouard@2309: Edouard@2270: class job(object): Edouard@2270: """ Edouard@2270: job to be executed by a worker Edouard@2270: """ Edouard@2270: def __init__(self, call, *args, **kwargs): Edouard@2270: self.job = (call, args, kwargs) Edouard@2270: self.result = None Edouard@2604: self.success = None Edouard@2270: self.exc_info = None Edouard@2270: Edouard@2270: def do(self): Edouard@2270: """ Edouard@2270: do the job by executing the call, and deal with exceptions Edouard@2270: """ Edouard@2270: try: Edouard@2270: call, args, kwargs = self.job Edouard@2270: self.result = call(*args, **kwargs) Edouard@2270: self.success = True Edouard@2270: except Exception: Edouard@2270: self.success = False Edouard@2270: self.exc_info = sys.exc_info() Edouard@2270: Edouard@2270: Edouard@2270: class worker(object): Edouard@2270: """ Edouard@2270: serialize main thread load/unload of PLC shared objects Edouard@2270: """ Edouard@2270: def __init__(self): Edouard@2270: # Only one job at a time Edouard@2270: self._finish = False Edouard@2270: self._threadID = None Edouard@2270: self.mutex = Lock() Edouard@2270: self.todo = Condition(self.mutex) Edouard@2270: self.done = Condition(self.mutex) Edouard@2270: self.free = Condition(self.mutex) Edouard@2270: self.job = None Edouard@2611: self.enabled = False Edouard@2270: andrej@2536: def reraise(self, job): andrej@2536: """ andrej@2536: reraise exception happend in a job andrej@2536: @param job: job where original exception happend andrej@2536: """ andrej@2536: exc_type = job.exc_info[0] andrej@2536: exc_value = job.exc_info[1] andrej@2536: exc_traceback = job.exc_info[2] andrej@2536: six.reraise(exc_type, exc_value, exc_traceback) andrej@2536: Edouard@2270: def runloop(self, *args, **kwargs): Edouard@2270: """ Edouard@2270: meant to be called by worker thread (blocking) Edouard@2270: """ andrej@2537: self._threadID = _thread.get_ident() Edouard@2467: self.mutex.acquire() Edouard@2604: self.enabled = True Edouard@2270: if args or kwargs: Edouard@2307: _job = job(*args, **kwargs) Edouard@2307: _job.do() Edouard@2604: # _job.success can't be None after do() andrej@2536: if not _job.success: andrej@2536: self.reraise(_job) Edouard@2486: Edouard@2270: while not self._finish: Edouard@2270: self.todo.wait() Edouard@2270: if self.job is not None: Edouard@2270: self.job.do() Edouard@2270: self.done.notify() Edouard@2270: else: Edouard@2486: break Edouard@2486: Edouard@2270: self.mutex.release() Edouard@2270: Edouard@2270: def call(self, *args, **kwargs): Edouard@2270: """ Edouard@2270: creates a job, execute it in worker thread, and deliver result. Edouard@2270: if job execution raise exception, re-raise same exception Edouard@2270: meant to be called by non-worker threads, but this is accepted. Edouard@2270: blocking until job done Edouard@2270: """ Edouard@2270: Edouard@2270: _job = job(*args, **kwargs) Edouard@2270: andrej@2537: if self._threadID == _thread.get_ident(): Edouard@2270: # if caller is worker thread execute immediately Edouard@2270: _job.do() Edouard@2270: else: Edouard@2270: # otherwise notify and wait for completion Edouard@2270: self.mutex.acquire() Edouard@2604: if not self.enabled: Edouard@2604: self.mutex.release() Edouard@2604: raise EOFError("Worker is disabled") Edouard@2270: Edouard@2270: while self.job is not None: Edouard@2270: self.free.wait() Edouard@2270: Edouard@2270: self.job = _job Edouard@2270: self.todo.notify() Edouard@2270: self.done.wait() Edouard@2270: self.job = None Edouard@2486: self.free.notify() Edouard@2270: self.mutex.release() Edouard@2270: Edouard@2604: if _job.success is None: Edouard@2604: raise EOFError("Worker job was interrupted") Edouard@2604: Edouard@2270: if _job.success: Edouard@2270: return _job.result Edouard@2270: else: andrej@2536: self.reraise(_job) Edouard@2270: Edouard@2270: def quit(self): Edouard@2270: """ Edouard@2270: unblocks main thread, and terminate execution of runloop() Edouard@2270: """ Edouard@2270: # mark queue Edouard@2270: self._finish = True Edouard@2270: self.mutex.acquire() Edouard@2604: self.enabled = False Edouard@2270: self.job = None Edouard@2270: self.todo.notify() Edouard@2604: self.done.notify() Edouard@2270: self.mutex.release()