controls/DebugVariablePanel/DebugVariableItem.py
author Edouard Tisserant
Wed, 01 Mar 2023 10:54:54 +0100
changeset 3740 ac0e6de439b5
parent 3598 13677d093946
child 3750 f62625418bff
permissions -rw-r--r--
Linux runtime: overrun detection for real-time timers and for plc execution.

If the real-time timer wakes up the PLC thread too late (10% over period), then
a warning is logged.

If PLC code (IO retrieve, execution, IO publish) takes longer than the requested
PLC execution cycle, then a warning is logged, and CPU hogging is mitigated
by delaying the next PLC execution by a few more cycles until there is at least
1ms of idle time.
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of Beremiz, a Integrated Development Environment for
# programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
#
# Copyright (C) 2012: Edouard TISSERANT and Laurent BESSARD
#
# See COPYING file for copyrights details.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from __future__ import absolute_import
from datetime import timedelta
import binascii
import numpy as np
from graphics.DebugDataConsumer import DebugDataConsumer, TYPE_TRANSLATOR
from controls.DebugVariablePanel.RingBuffer import RingBuffer

# -------------------------------------------------------------------------------
#              Constants used to compute the CRC of string variables
# -------------------------------------------------------------------------------

# Width, in bits, of the checksum kept as numeric value of string variables
STRING_CRC_SIZE = 8
# Bit mask keeping the STRING_CRC_SIZE low-order bits of a CRC32 result
# (applied to binascii.crc32() output in DebugVariableItem.NewValues)
STRING_CRC_MASK = 2 ** STRING_CRC_SIZE - 1

# -------------------------------------------------------------------------------
#                          Debug Variable Item Class
# -------------------------------------------------------------------------------


class DebugVariableItem(DebugDataConsumer):
    """
    Class that implements an element that consumes debug values for PLC variable and
    stores received values for displaying them in graphic panel or table
    """

    def __init__(self, parent, variable, store_data=False):
        """
        Constructor
        @param parent: Reference to debug variable panel
        @param variable: Path of variable to debug
        """
        DebugDataConsumer.__init__(self)

        self.Parent = parent
        self.Variable = variable
        self.StoreData = store_data

        # Get Variable data type
        self.RefreshVariableType()

    def __del__(self):
        """
        Destructor
        """
        # Reset reference to debug variable panel
        self.Parent = None

    def SetVariable(self, variable):
        """
        Set path of variable
        @param variable: Path of variable to debug
        """
        if self.Parent is not None and self.Variable != variable:
            # Store variable path
            self.Variable = variable
            # Get Variable data type
            self.RefreshVariableType()

            # Refresh debug variable panel
            self.Parent.RefreshView()

    def GetVariable(self, mask=None):
        """
        Return path of variable
        @param mask: Mask to apply to variable path [var_name, '*',...]
        @return: String containing masked variable path
        """
        # Apply mask to variable name
        if mask is not None:
            # '#' correspond to parts that are different between all items

            # Extract variable path parts
            parts = self.Variable.split('.')
            # Adjust mask size to size of variable path
            mask = mask + ['*'] * max(0, len(parts) - len(mask))

            # Store previous mask
            last = None
            # Init masked variable path
            variable = ""

            for m, p in zip(mask, parts):
                # Part is not masked, add part prefixed with '.' is previous
                # wasn't masked
                if m == '*':
                    variable += ('.' if last == '*' else '') + p

                # Part is mask, add '..' if first or previous wasn't masked
                elif last is None or last == '*':
                    variable += '..'

                last = m

            return variable

        return self.Variable

    def RefreshVariableType(self):
        """
        Get and store variable data type
        """
        self.VariableType = self.Parent.GetDataType(self.Variable)
        # Reset data stored
        self.ResetData()

    def GetVariableType(self):
        """
        Return variable data type
        @return: Variable data type
        """
        return self.VariableType

    def GetData(self, start_tick=None, end_tick=None):
        """
        Return data stored contained in given range
        @param start_tick: Start tick of given range (default None, first data)
        @param end_tick: end tick of given range (default None, last data)
        @return: Data as numpy.array([(tick, value, forced),...])
        """
        # Return immediately if data empty or none
        if self.Data is None or self.Data.count == 0:
            return None

        # Find nearest data outside given range indexes
        start_idx = (self.GetNearestData(start_tick, -1)
                     if start_tick is not None
                     else 0)
        end_idx = (self.GetNearestData(end_tick, 1)
                   if end_tick is not None
                   else self.Data.count)

        # Return data between indexes
        return self.Data.view[start_idx:end_idx]

    def GetRawValue(self, index):
        """
        Return raw value at given index for string variables
        @param index: Variable value index
        @return: Variable data type
        """
        if self.VariableType in ["STRING", "WSTRING"] and index < len(self.RawData):
            return self.RawData[index][0]
        return ""

    def GetValueRange(self):
        """
        Return variable value range
        @return: (minimum_value, maximum_value)
        """
        return self.MinValue, self.MaxValue

    def GetDataAndValueRange(self, start_tick, end_tick, full_range=True):
        """
        Return variable data and value range for a given tick range
        @param start_tick: Start tick of given range (default None, first data)
        @param end_tick: end tick of given range (default None, last data)
        @param full_range: Value range is calculated on whole data (False: only
        calculated on data in given range)
        @return: (numpy.array([(tick, value, forced),...]),
                  min_value, max_value)
        """
        # Get data in given tick range
        data = self.GetData(start_tick, end_tick)

        if data is None:
            return None, None, None

        # Value range is calculated on whole data
        if full_range:
            return data, self.MinValue, self.MaxValue

        # Check that data in given range is not empty
        values = data[:, 1]
        if len(values) > 0:
            # Return value range for data in given tick range
            return (data,
                    data[np.argmin(values), 1],
                    data[np.argmax(values), 1])

        # Return default values
        return data, None, None

    def ResetData(self):
        """
        Reset data stored when store data option enabled
        """
        if self.StoreData and self.IsNumVariable():
            # Init table storing data
            self.Data = RingBuffer(3)

            # Init table storing raw data if variable is strin
            self.RawData = ([]
                            if self.VariableType in ["STRING", "WSTRING"]
                            else None)

            # Init Value range variables
            self.MinValue = None
            self.MaxValue = None

        else:
            self.Data = None
            self.MinValue = None
            self.MaxValue = None
        # Init variable value
        self.Value = ""

    def IsNumVariable(self):
        """
        Return if variable data type is numeric. String variables are
        considered as numeric (string CRC). Time variables are considered
        as number of seconds
        @return: True if data type is numeric
        """
        return (self.Parent.IsNumType(self.VariableType) or
                self.VariableType in ["STRING", "WSTRING", "TIME", "TOD", "DT", "DATE"])

    def NewValues(self, ticks, values):
        """
        Function called by debug thread when new debug values are available
        @param ticks: PLC ticks when values were captured, one per entry
        of values
        @param values: Captured samples, each one a (value, forced) pair
        where forced is the forced flag of the sample

        The last sample is forwarded to DebugDataConsumer.NewValues. When
        a data table exists, every sample is also converted to a row
        [tick, numeric value, extra value] and appended to it.
        """
        # NOTE(review): ticks[-1] / values[-1] assume at least one sample
        # is given - confirm callers never pass empty sequences
        DebugDataConsumer.NewValues(self, ticks[-1], values[-1], raw=None)

        if self.Data is not None:

            if self.VariableType in ["STRING", "WSTRING"]:
                # Resume raw data bookkeeping from the last stored entry
                last_raw_data = (self.RawData[-1]
                                 if len(self.RawData) > 0 else None)
                last_raw_data_idx = len(self.RawData) - 1

            data_values = []
            for tick, (value, forced) in zip(ticks, values):
                # Translate forced flag to float for storing in Data table
                forced_value = float(forced)

                if self.VariableType in ["STRING", "WSTRING"]:
                    # String data value is CRC, reduced to its
                    # STRING_CRC_SIZE low-order bits
                    # NOTE(review): on Python 3 binascii.crc32 requires a
                    # bytes-like object - confirm the type of value here
                    num_value = (binascii.crc32(value) & STRING_CRC_MASK)
                elif self.VariableType in ["TIME", "TOD", "DT", "DATE"]:
                    # Numeric value of time type variables
                    # is represented in seconds
                    num_value = float(value.total_seconds())
                else:
                    num_value = float(value)

                # Update variable range values
                self.MinValue = (min(self.MinValue, num_value)
                                 if self.MinValue is not None
                                 else num_value)
                self.MaxValue = (max(self.MaxValue, num_value)
                                 if self.MaxValue is not None
                                 else num_value)

                # In the case of string variables, we store raw string value and
                # forced flag in raw data table. Only changes in these two values
                # are stored. Index to the corresponding raw value is stored in
                # data third column
                # NOTE(review): nothing in this method removes entries from
                # RawData - check whether it can grow without bound
                if self.VariableType in ["STRING", "WSTRING"]:
                    raw_data = (value, forced_value)
                    if len(self.RawData) == 0 or last_raw_data != raw_data:
                        last_raw_data_idx += 1
                        last_raw_data = raw_data
                        self.RawData.append(raw_data)
                    extra_value = last_raw_data_idx

                # In other case, data third column is forced flag
                else:
                    extra_value = forced_value

                data_values.append(
                    [float(tick), num_value, extra_value])

            # Add New data to stored data table
            self.Data.append(data_values)

            # Signal to debug variable panel to refresh
            self.Parent.HasNewData = True

    def SetForced(self, forced):
        """
        Update Forced flag
        @param forced: New forced flag
        """
        # Store forced flag
        if self.Forced != forced:
            self.Forced = forced

            # Signal to debug variable panel to refresh
            self.Parent.HasNewData = True

    def SetValue(self, value):
        """
        Update value.
        @param value: New value
        """
        # Remove quote and double quote surrounding string value to get raw value
        if self.VariableType == "STRING" and value.startswith("'") and value.endswith("'") or \
           self.VariableType == "WSTRING" and value.startswith('"') and value.endswith('"'):
            value = value[1:-1]

        # Store variable value
        if self.Value != value:
            self.Value = value

            # Signal to debug variable panel to refresh
            self.Parent.HasNewData = True

    def GetValue(self, tick=None, raw=False):
        """
        Return current value or value and forced flag for tick given
        @return: Current value or value and forced flag
        """
        # If tick given and stored data option enabled
        if tick is not None and self.Data is not None:

            # Return current value and forced flag if data empty
            if self.Data.count == 0:
                return self.Value, self.IsForced()

            # Get index of nearest data from tick given
            idx = self.GetNearestData(tick, 0)

            # Get value and forced flag at given index
            value, forced = \
                self.RawData[int(self.Data.view[idx, 2])] \
                if self.VariableType in ["STRING", "WSTRING"] \
                else self.Data.view[idx, 1:3]

            if self.VariableType in ["TIME", "TOD", "DT", "DATE"]:
                value = timedelta(seconds=value)

            # Get raw value if asked
            if not raw:
                value = TYPE_TRANSLATOR.get(
                    self.VariableType, str)(value)

            return value, forced

        # Return raw value if asked
        if not raw and self.VariableType in ["STRING", "WSTRING"]:
            return TYPE_TRANSLATOR.get(
                self.VariableType, str)(self.Value)
        return self.Value

    def GetNearestData(self, tick, adjust):
        """
        Return index of nearest data from tick given
        @param tick: Tick where find nearest data
        @param adjust: Constraint for data position from tick
                       -1: older than tick
                       1:  newer than tick
                       0:  doesn't matter
        @return: Index of nearest data
        """
        # Return immediately if data is empty
        if self.Data is None:
            return None

        # Extract data ticks
        ticks = self.Data.view[:, 0]

        # Get nearest data from tick
        idx = min(np.searchsorted(ticks, tick), self.Data.count - 1)

        # Adjust data index according to constraint
        if adjust < 0 and ticks[idx] > tick and idx > 0 or \
           adjust > 0 and ticks[idx] < tick and idx < len(ticks):
            idx += adjust

        return idx