runtime/Worker.py
author Edouard Tisserant <edouard.tisserant@gmail.com>
Tue, 15 Nov 2022 20:43:39 +0100
branchwxPython4
changeset 3677 6d9040e07c32
parent 3642 cd3d15e8ef42
child 3750 f62625418bff
permissions -rw-r--r--
OPC-UA: only support the encryption policy selected in config.

By default open62541 client accepts all supported policies, but in makes problem
when negociating with some servers while most clients seems to only support
one policy at a time.
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of Beremiz runtime.
#
# Copyright (C) 2018: Edouard TISSERANT
#
# See COPYING.Runtime file for copyrights details.

from __future__ import absolute_import
import sys
from threading import Lock, Condition, Thread

import six
from six.moves import _thread


class job(object):
    """
    job to be executed by a worker
    """
    def __init__(self, call, *args, **kwargs):
        self.job = (call, args, kwargs)
        self.result = None
        self.success = None
        self.exc_info = None

    def do(self):
        """
        do the job by executing the call, and deal with exceptions
        """
        try:
            call, args, kwargs = self.job
            self.result = call(*args, **kwargs)
            self.success = True
        except Exception:
            self.success = False
            self.exc_info = sys.exc_info()


class worker(object):
    """
    serialize main thread load/unload of PLC shared objects
    """
    def __init__(self):
        # Only one job at a time
        self._finish = False
        self._threadID = None
        self.mutex = Lock()
        self.todo = Condition(self.mutex)
        self.done = Condition(self.mutex)
        self.free = Condition(self.mutex)
        self.job = None
        self.enabled = False
        self.stopper = None
        self.own_thread = None

    def reraise(self, job):
        """
        reraise exception happend in a job
        @param job: job where original exception happend
        """
        exc_type = job.exc_info[0]
        exc_value = job.exc_info[1]
        exc_traceback = job.exc_info[2]
        six.reraise(exc_type, exc_value, exc_traceback)

    def runloop(self, *args, **kwargs):
        """
        meant to be called by worker thread (blocking)
        """
        self._threadID = _thread.get_ident()
        self.mutex.acquire()
        self.enabled = True
        if args or kwargs:
            _job = job(*args, **kwargs)
            _job.do()
            # _job.success can't be None after do()
            if not _job.success:
                self.reraise(_job)

        while not self._finish:
            self.todo.wait()
            if self.job is not None:
                self.job.do()
                self.done.notify()
            else:
                break

        self.mutex.release()

    def interleave(self, waker, stopper, *args, **kwargs):
        """
        as for twisted reactor's interleave, it passes all jobs to waker func
        additionaly, it creates a new thread to wait for new job.
        """
        self.feed = Condition(self.mutex)
        self._threadID = _thread.get_ident()
        self.stopper = stopper

        def wakerfeedingloop():
            self.mutex.acquire()
            self.enabled = True
            if args or kwargs:
                def first_job_todo():
                    _job = job(*args, **kwargs)
                    _job.do()
                    if not _job.success:
                        self.reraise(_job)
                    self.mutex.acquire()
                    self.feed.notify()
                    self.mutex.release()
                waker(first_job_todo)
                self.feed.wait()

            while not self._finish:
                self.todo.wait()
                def job_todo():
                    self.mutex.acquire()
                    if self.job is not None:
                        self.job.do()
                        self.feed.notify()
                        self.done.notify()
                    self.mutex.release()
                if self._finish:
                    break
                waker(job_todo)
                self.feed.wait()

            self.mutex.release()
        self.own_thread = Thread(target = wakerfeedingloop)
        self.own_thread.start()

    def stop(self):
        """
        !interleave
        """
        self.mutex.acquire()
        self._finish = True
        self.enabled = False
        self.job = None
        self.todo.notify()
        self.done.notify()
        self.mutex.release()
        self.own_thread.join()

    def call(self, *args, **kwargs):
        """
        creates a job, execute it in worker thread, and deliver result.
        if job execution raise exception, re-raise same exception
        meant to be called by non-worker threads, but this is accepted.
        blocking until job done
        """

        _job = job(*args, **kwargs)

        if self._threadID == _thread.get_ident():
            # if caller is worker thread execute immediately
            _job.do()
        else:
            # otherwise notify and wait for completion
            self.mutex.acquire()
            if not self.enabled:
                self.mutex.release()
                raise EOFError("Worker is disabled")

            while self.job is not None:
                self.free.wait()

            self.job = _job
            self.todo.notify()
            self.done.wait()
            self.job = None
            self.free.notify()
            self.mutex.release()

        if _job.success is None:
            raise EOFError("Worker job was interrupted")

        if _job.success:
            return _job.result
        else:
            self.reraise(_job)

    def quit(self):
        """
        unblocks main thread, and terminate execution of runloop()
        """
        # mark queue
        self._finish = True
        self.mutex.acquire()
        self.enabled = False
        self.job = None
        self.todo.notify()
        self.done.notify()
        self.mutex.release()

    def finish(self):
        if self.own_thread is None:
            self.quit()
        if self.stopper is not None:
            self.stopper()