etherlab/runtime_etherlab.py
author Mario de Sousa <msousa@fe.up.pt>
Thu, 28 May 2020 10:54:48 +0100
changeset 2647 990004083eb8
parent 2422 45aa510d7a2a
child 2643 b98d9e08231f
permissions -rw-r--r--
Modbus plugin: Add "exec. req. flag" and "write on change" features
from __future__ import absolute_import
import os
import signal
import subprocess
import ctypes
from threading import Thread
import time
import re

import runtime.PLCObject as PLCObject
from runtime.loglevels import LogLevelsDict

# NOTE(review): PLCBinary is neither imported nor defined in this file;
# presumably it is injected into this module's namespace by whatever loads
# it -- TODO confirm.
# SDOAnswered is invoked by SDOThreadProc once an SDO command has finished.
# It is declared as taking no arguments and returning nothing.
SDOAnswered = PLCBinary.SDOAnswered
SDOAnswered.restype = None
SDOAnswered.argtypes = []

# Thread object of the most recently started SDO transfer (None when unset).
SDOThread = None
# subprocess.Popen object of the most recently launched "ethercat" command.
SDOProc = None
# Outcome of the last SDO transfer, as stored by SDOThreadProc and
# returned by GetResult().
Result = None


def SDOThreadProc(*params):
    """
    Run one "ethercat upload" / "ethercat download" shell command and
    publish its outcome.

    params[0] selects the operation ("upload", anything else is treated as
    a download). The remaining items are substituted, in order, into the
    command template: the "-p" value, the "-t" type name, the index, the
    subindex and, for downloads only, the value to write.

    The outcome is stored in the module-level Result:
      - upload: the parsed value (float, raw output, or int depending on
        the type name), or None when the command failed or its output
        could not be parsed;
      - download: True when the command exited with status 0, else False.

    SDOAnswered() is called once Result has been stored. A WARNING is
    logged when the command failed or its output was not parsable.
    """
    global Result, SDOProc
    is_upload = params[0] == "upload"
    if is_upload:
        cmdfmt = "ethercat upload -p %d -t %s 0x%.4x 0x%.2x"
    else:
        cmdfmt = "ethercat download -p %d -t %s 0x%.4x 0x%.2x %s"

    command = cmdfmt % params[1:]
    SDOProc = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    # communicate() drains stdout while waiting for the child to exit.
    # Calling wait() first can deadlock once the child fills the pipe
    # buffer, because nothing would be reading the other end.
    output = SDOProc.communicate()[0]
    res = SDOProc.returncode

    parse_failed = False
    if is_upload:
        Result = None
        if res == 0:
            try:
                if params[2] in ["float", "double"]:
                    Result = float(output)
                elif params[2] in ["string", "octet_string", "unicode_string"]:
                    Result = output
                else:
                    # Expected output: "<hex value> <decimal value>"; the
                    # result is accepted only if both agree.
                    hex_value, dec_value = output.split()
                    if int(hex_value, 16) == int(dec_value):
                        Result = int(dec_value)
            except ValueError:
                # Unexpected output format: report "no result" instead of
                # letting the thread die before SDOAnswered() is called.
                Result = None
                parse_failed = True
    else:
        Result = res == 0

    SDOAnswered()
    if res != 0 or parse_failed:
        PLCObject.LogMessage(
            LogLevelsDict["WARNING"],
            "%s : %s" % (command, output))


def EthercatSDOUpload(pos, index, subindex, var_type):
    """
    Start a background thread running SDOThreadProc in "upload" mode.

    The thread object is kept in the module-level SDOThread; this function
    returns immediately without waiting for the transfer to finish.
    """
    global SDOThread
    upload_args = ("upload", pos, var_type, index, subindex)
    worker = Thread(target=SDOThreadProc, args=upload_args)
    SDOThread = worker
    worker.start()


def EthercatSDODownload(pos, index, subindex, var_type, value):
    """
    Start a background thread running SDOThreadProc in "download" mode.

    The thread object is kept in the module-level SDOThread; this function
    returns immediately without waiting for the transfer to finish.
    """
    global SDOThread
    download_args = ("download", pos, var_type, index, subindex, value)
    worker = Thread(target=SDOThreadProc, args=download_args)
    SDOThread = worker
    worker.start()


def GetResult():
    """
    Return the module-level Result, i.e. whatever the last SDOThreadProc
    run stored there (None if nothing has been stored yet).
    """
    return Result


# Thread object running KMSGPollThreadProc (None when not started).
KMSGPollThread = None
# Set to True to make the KMSGPollThreadProc loop exit.
StopKMSGThread = False


def KMSGPollThreadProc():
    """
    Logs Kernel messages starting with EtherCAT
    Uses GLibc wrapper to Linux syscall "klogctl"
    Last 4 KB are polled, and lines compared to last
    captured line to detect new lines

    Loops until the module-level StopKMSGThread flag is true, sleeping
    0.5 s between polls. Kernel level "4" is logged as WARNING, "3" as
    CRITICAL and every other level as DEBUG.
    """
    libc = ctypes.CDLL("libc.so.6")
    klog = libc.klogctl
    klog.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
    klog.restype = ctypes.c_int
    s = ctypes.create_string_buffer(4*1024)

    # Loop invariants: build them once instead of on every poll.
    ethercat_line = re.compile(
        r'<(\d)>\[\s*\d*\.\d*\]\s*(EtherCAT\s*.*)$',
        re.MULTILINE)
    level_names = {
        "4": "WARNING",
        "3": "CRITICAL"}

    last = None
    while not StopKMSGThread:
        # Type 3 is SYSLOG_ACTION_READ_ALL in syslog(2): copy the most
        # recent kernel log bytes into the buffer without clearing them.
        bytes_to_read = klog(3, s, len(s)-1)
        if bytes_to_read > 0:
            # Drop the final byte of what was read.
            log = s.value[:bytes_to_read-1]
        else:
            # klogctl returns -1 on error (and may return 0). The buffer
            # then still holds the previous poll's data, so slicing it
            # would re-log old lines: skip this cycle instead.
            log = None
        if log and last:
            # Keep only what follows the last line seen on a previous poll.
            log = log.rpartition(last)[2]
        if log:
            last = log.rpartition('\n')[2]
            for lvl, msg in ethercat_line.findall(log):
                PLCObject.LogMessage(
                    LogLevelsDict[level_names.get(lvl, "DEBUG")],
                    msg)
        time.sleep(0.5)


def _runtime_etherlab_init():
    """
    Clear the stop flag and start the kernel-message polling thread.

    The new thread runs KMSGPollThreadProc and is stored in the
    module-level KMSGPollThread.
    """
    global KMSGPollThread, StopKMSGThread
    StopKMSGThread = False
    poller = Thread(target=KMSGPollThreadProc)
    KMSGPollThread = poller
    poller.start()


def _runtime_etherlab_cleanup():
    """
    Stop background activity started by this module.

    Sends SIGTERM to the last "ethercat" child process if it is still
    running, asks the kernel-message polling loop to exit by setting
    StopKMSGThread, and drops the references to both threads. The threads
    are not joined.
    """
    global KMSGPollThread, StopKMSGThread, SDOThread
    proc = SDOProc
    # Signal only a child that has not been reaped yet: once returncode is
    # set the pid may already belong to an unrelated process.
    if proc is not None and proc.returncode is None:
        try:
            os.kill(proc.pid, signal.SIGTERM)
        except OSError:
            # Best effort: the child exited in the meantime.
            pass
    SDOThread = None
    StopKMSGThread = True
    KMSGPollThread = None