author Andrey Skvortsov <>
Wed, 13 Mar 2019 11:47:03 +0300
changeset 2537 eb4a4cc41914
parent 2422 45aa510d7a2a
child 2643 b98d9e08231f
permissions -rw-r--r--
Fix various pylint and pep8 errors

Check basic code-style problems for PEP-8
pep8 version: 2.4.0
./connectors/PYRO/ E261 at least two spaces before inline comment
./connectors/ E128 continuation line under-indented for visual indent
./controls/ E127 continuation line over-indented for visual indent
./controls/ E127 continuation line over-indented for visual indent

Check for problems using pylint ...
No config file found, using default configuration
pylint 1.9.4,
astroid 1.6.5
Python 2.7.16rc1 (default, Feb 18 2019, 11:05:09)
[GCC 8.2.0]
Use multiple threads for pylint
Using config file /home/developer/WorkData/PLC/beremiz/beremiz/.pylint
************* Module connectors.PYRO_dialog
connectors/ [W0611(unused-import), ] Unused import wx
************* Module connectors
connectors/ [W1652(deprecated-types-field), ] Accessing a deprecated fields on the types module
connectors/ [C0411(wrong-import-order), ] standard import "from types import ClassType" should be placed before "from connectors.ConnectorBase import ConnectorBase"
************* Module connectors.PYRO.PSK_Adapter
connectors/PYRO/ [C0411(wrong-import-order), ] standard import "import ssl" should be placed before "import sslpsk"
************* Module connectors.SchemeEditor
connectors/ [C0330(bad-continuation), ] Wrong continued indentation (add 1 space).
connectors/ [W0631(undefined-loop-variable), SchemeEditor.__init__] Using possibly undefined loop variable 'tag'
************* Module runtime.WampClient
runtime/ [W1612(unicode-builtin), WampSession.onJoin] unicode built-in referenced
runtime/ [W1612(unicode-builtin), WampSession.publishWithOwnID] unicode built-in referenced
runtime/ [W1612(unicode-builtin), PublishEvent] unicode built-in referenced
runtime/ [W1612(unicode-builtin), PublishEventWithOwnID] unicode built-in referenced
runtime/ [W0611(unused-import), ] Unused str imported from builtins as text
************* Module runtime.PLCObject
runtime/ [W1648(bad-python3-import), ] Module moved in Python 3
runtime/ [C0411(wrong-import-order), ] standard import "import md5" should be placed before "from six.moves import xrange"
runtime/ [C0411(wrong-import-order), ] standard import "from tempfile import mkstemp" should be placed before "from six.moves import xrange"
runtime/ [C0411(wrong-import-order), ] standard import "import shutil" should be placed before "from six.moves import xrange"
runtime/ [C0411(wrong-import-order), ] standard import "from functools import wraps, partial" should be placed before "from six.moves import xrange"
************* Module runtime.Worker
runtime/ [W1648(bad-python3-import), ] Module moved in Python 3
************* Module runtime.spawn_subprocess
runtime/ [C0325(superfluous-parens), ] Unnecessary parens after 'print' keyword
runtime/ [C0325(superfluous-parens), ] Unnecessary parens after 'print' keyword
runtime/ [E1601(print-statement), ] print statement used
runtime/ [E1601(print-statement), ] print statement used
************* Module controls.IDBrowser
controls/ [C0330(bad-continuation), ] Wrong continued indentation (remove 5 spaces).
if self.isManager
| ^
controls/ [C0330(bad-continuation), ] Wrong continued indentation (remove 5 spaces).
| ^
************* Module Beremiz_service [W0611(unused-import), ] Unused import __builtin__
from __future__ import absolute_import
import os
import signal
import subprocess
import ctypes
from threading import Thread
import time
import re

import runtime.PLCObject as PLCObject
from runtime.loglevels import LogLevelsDict

# Entry point taken from the PLC binary, declared as taking no arguments
# and returning nothing.
# NOTE(review): PLCBinary is not defined or imported in this file;
# presumably it is injected into this module's namespace by whatever loads
# it - confirm against the loader.
SDOAnswered = PLCBinary.SDOAnswered
SDOAnswered.restype = None
SDOAnswered.argtypes = []

# Module-level state shared by the SDO helpers below:
#   SDOThread - Thread created by EthercatSDOUpload / EthercatSDODownload
#   SDOProc   - subprocess.Popen object of the last "ethercat" command
#   Result    - outcome of the last transfer, returned by GetResult()
SDOThread = None
SDOProc = None
Result = None

def SDOThreadProc(*params):
    """
    Run one "ethercat upload" or "ethercat download" command and store
    its outcome in the module-level Result.

    params[0] is "upload" or "download". The remaining items are, in
    order: slave position, SDO type name, index, subindex and, for
    downloads only, the value to write. They are substituted into the
    command line as given.

    After the command finishes:
    - upload: Result is the parsed value (float, raw output string or int),
      or None when the command failed or the hexadecimal and decimal
      representations printed by the tool disagree;
    - download: Result is True when the command exited with status 0,
      False otherwise.
    """
    global Result, SDOProc
    is_upload = params[0] == "upload"
    if is_upload:
        cmdfmt = "ethercat upload -p %d -t %s 0x%.4x 0x%.2x"
    else:
        cmdfmt = "ethercat download -p %d -t %s 0x%.4x 0x%.2x %s"

    command = cmdfmt % params[1:]
    # NOTE(review): the command is built by string formatting and run
    # through the shell; params must come from a trusted caller.
    SDOProc = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    # communicate() drains stdout while waiting for the child, so a large
    # output cannot block the child on a full pipe (calling wait() first
    # can deadlock when stdout is a PIPE).
    output = SDOProc.communicate()[0]
    res = SDOProc.returncode

    if is_upload:
        Result = None
        if res == 0:
            if params[2] in ["float", "double"]:
                Result = float(output)
            elif params[2] in ["string", "octet_string", "unicode_string"]:
                Result = output
            else:
                # Other types are printed as "<hex> <decimal>"; accept the
                # value only when both representations agree.
                hex_value, dec_value = output.split()
                if int(hex_value, 16) == int(dec_value):
                    Result = int(dec_value)
    else:
        Result = res == 0

    # NOTE(review): SDOAnswered is configured at module level as a
    # no-argument call into the PLC binary and is assumed to signal that
    # Result is now available - confirm.
    SDOAnswered()
    if res != 0:
        # NOTE(review): the callee of this log statement was lost from the
        # source; PLCObject.LogMessage(level, message) is assumed - confirm.
        PLCObject.LogMessage(
            LogLevelsDict["WARNING"],
            "%s : %s" % (command, output))

def EthercatSDOUpload(pos, index, subindex, var_type):
    """
    Start a background SDO upload (read) from the slave at position pos.

    The transfer runs SDOThreadProc in a new thread kept in the
    module-level SDOThread; the outcome is retrieved with GetResult().
    """
    global SDOThread
    SDOThread = Thread(target=SDOThreadProc, args=["upload", pos, var_type, index, subindex])
    # Without start() the thread object is only created and the transfer
    # never runs.
    SDOThread.start()

def EthercatSDODownload(pos, index, subindex, var_type, value):
    """
    Start a background SDO download (write) of value to the slave at
    position pos.

    The transfer runs SDOThreadProc in a new thread kept in the
    module-level SDOThread; the outcome is retrieved with GetResult().
    """
    global SDOThread
    SDOThread = Thread(target=SDOThreadProc, args=["download", pos, var_type, index, subindex, value])
    # Without start() the thread object is only created and the transfer
    # never runs.
    SDOThread.start()

def GetResult():
    """Return the module-level Result as last set by SDOThreadProc (initially None)."""
    return Result

# State of the kernel-message poller:
#   KMSGPollThread - Thread running KMSGPollThreadProc, or None
#   StopKMSGThread - set to True to make the polling loop exit
KMSGPollThread = None
StopKMSGThread = False

def KMSGPollThreadProc():
    """
    Logs Kernel messages starting with EtherCAT
    Uses GLibc wrapper to Linux syscall "klogctl"
    Last 4 KB are polled, and lines compared to last
    captured line to detect new lines

    Runs until the module-level StopKMSGThread becomes true.
    """
    libc = ctypes.CDLL("")
    klog = libc.klogctl
    klog.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
    klog.restype = ctypes.c_int
    s = ctypes.create_string_buffer(4*1024)
    # Kernel level digit -> key into LogLevelsDict; anything else is DEBUG.
    level_names = {
        "4": "WARNING",
        "3": "CRITICAL"}
    last = None
    while not StopKMSGThread:
        # klogctl command 3 reads the most recent messages from the kernel
        # ring buffer without consuming them.
        bytes_to_read = klog(3, s, len(s)-1)
        # A non-positive return means an error or no data; do not use it
        # as a slice bound over stale buffer contents.
        log = s.value[:bytes_to_read-1] if bytes_to_read > 0 else None
        if log and last:
            # Keep only what follows the last line already seen.
            log = log.rpartition(last)[2]
        if log:
            last = log.rpartition('\n')[2]
            # NOTE(review): the pattern was lost from the source. This one
            # is reconstructed from the kernel log line layout
            # "<level>[ timestamp] text" and the two-group unpacking below -
            # confirm against the upstream file.
            for lvl, msg in re.findall(
                    r'<(\d)>\[\s*\d*\.\d*\]\s*(EtherCAT\s*.*)$',
                    log, re.MULTILINE):
                # NOTE(review): the log callee was lost from the source;
                # PLCObject.LogMessage(level, message) is assumed - confirm.
                PLCObject.LogMessage(
                    LogLevelsDict[level_names.get(lvl, "DEBUG")],
                    msg)
        # Pause between polls instead of spinning on the syscall.
        time.sleep(0.5)

def _runtime_etherlab_init():
    """Clear the stop flag and start the kernel-message polling thread."""
    global KMSGPollThread, StopKMSGThread
    StopKMSGThread = False
    KMSGPollThread = Thread(target=KMSGPollThreadProc)
    # Without start() the thread object is only created and nothing is
    # ever polled.
    KMSGPollThread.start()

def _runtime_etherlab_cleanup():
    """
    Terminate any "ethercat" command still running and ask the
    kernel-message polling loop to stop.

    Killing the command is best effort: a process that was never started
    or has already exited is not an error.
    """
    global KMSGPollThread, StopKMSGThread, SDOThread
    if SDOProc is not None:
        try:
            os.kill(SDOProc.pid, signal.SIGTERM)
        except OSError:
            # The process is already gone or cannot be signalled; there is
            # nothing left to clean up.
            pass
    SDOThread = None
    # KMSGPollThreadProc checks this flag on every loop iteration.
    StopKMSGThread = True
    KMSGPollThread = None