etherlab/etherlab.py
author Laurent Bessard
Fri, 27 Sep 2013 17:49:40 +0200
changeset 2160 75349c51a34b
parent 2159 93797d4303a3
child 2162 43ab74687f45
permissions -rw-r--r--
Added support for loading XML file even if not following XSD schema (but still following XML syntax), warning user of errors in XML file
import os, shutil
from lxml import etree

import wx
import csv

from xmlclass import *

from ConfigTreeNode import ConfigTreeNode, XSDSchemaErrorMessage
from PLCControler import UndoBuffer, LOCATION_CONFNODE, LOCATION_MODULE, LOCATION_GROUP, LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY

from EthercatSlave import ExtractHexDecValue, ExtractName
from EthercatMaster import _EthercatCTN
from ConfigEditor import LibraryEditor, ETHERCAT_VENDOR, ETHERCAT_GROUP, ETHERCAT_DEVICE

# Absolute directory holding this module, with symbolic links resolved.
ScriptDirectory = os.path.dirname(os.path.realpath(__file__))

#--------------------------------------------------
#                 Ethercat ConfNode
#--------------------------------------------------

# Parser built from the EtherCATInfo.xsd schema located next to this module.
EtherCATInfoParser = GenerateParserFromXSD(
    os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd"))


def EtherCATInfo_XPath(xpath):
    """Compile the given XPath expression with lxml and return the evaluator."""
    return etree.XPath(xpath)

def extract_param(el):
    """Convert an entry description element into its stored value.

    The conversion depends on the element tag:
      - "Index": the text is parsed as an integer and rendered as "#xNNNN"
        (at least four upper-case hexadecimal digits);
      - "BitSize": the text is parsed as an integer, 0 when there is no text;
      - "PDOMapping": the text is upper-cased, "" when there is no text;
      - any other tag: the text itself, "" when there is no text.

    @param el: object exposing `tag` and `text` attributes
    @return: converted value (str or int depending on the tag)
    """
    tag, text = el.tag, el.text

    # "Index" is the only tag that is converted without a missing-text guard.
    if tag == "Index":
        return "#x%4.4X" % int(text)

    # Missing text: numeric zero for sizes, empty string for everything else.
    if text is None:
        return 0 if tag == "BitSize" else ""

    if tag == "BitSize":
        return int(text)
    if tag == "PDOMapping":
        return text.upper()
    return text

def extract_pdo_infos(pdo_infos):
    """Collect the converted values of every child of a PDO element.

    Each key is the parent tag, a space, then the child tag; each value is
    the child converted by extract_param().

    @param pdo_infos: iterable element exposing a `tag` attribute
    @return: dictionary of converted child values
    """
    collected = {}
    for child in pdo_infos:
        collected[pdo_infos.tag + " " + child.tag] = extract_param(child)
    return collected

def HexDecValue(ctxt, values):
    """XSLT extension function: string form of the first value once decoded.

    @param ctxt: evaluation context (unused)
    @param values: sequence whose first item is passed to ExtractHexDecValue
    @return: str() of the value returned by ExtractHexDecValue
    """
    first_value = values[0]
    return str(ExtractHexDecValue(first_value))

def EntryName(ctxt, values):
    """XSLT extension function: resolve an entry name through ExtractName.

    Elements tagged "Default" provide the fallback text (the last one wins);
    every other element is forwarded to ExtractName as a name candidate.

    @param ctxt: evaluation context (unused)
    @param values: iterable of elements exposing `tag` and `text`
    @return: whatever ExtractName returns for the candidates and fallback
    """
    fallback = None
    candidates = []
    for node in values:
        if node.tag != "Default":
            candidates.append(node)
            continue
        fallback = node.text
    return ExtractName(candidates, fallback)

class AddEntry(etree.XSLTExtension):
    """XSLT extension element recording entries into a shared dictionary.

    The dictionary given at construction is keyed by (index, subindex)
    tuples and is filled in place each time the element is executed.
    """

    def __init__(self, entries):
        """
        @param entries: dictionary that receives the collected entries
        """
        etree.XSLTExtension.__init__(self)
        self.Entries = entries

    def execute(self, context, self_node, input_node, output_parent):
        """Process the element children and merge the result in self.Entries.

        The children are rendered into a temporary 'entry_infos' element.
        Its "Index" and "SubIndex" children give the entry key; entries keyed
        (0, 0) are ignored. A key not seen before stores every non-"PDO"
        child value. A key already present is only extended with the values
        found under a "PDO" child, if any.
        """
        infos = etree.Element('entry_infos')
        self.process_children(context, infos)

        index = int(infos.find("Index").text)
        subindex = int(infos.find("SubIndex").text)

        # Always computed, as the conversion may raise on malformed values.
        new_entry_infos = {}
        for el in infos:
            if el.tag != "PDO":
                new_entry_infos[el.tag] = extract_param(el)

        key = (index, subindex)
        if key == (0, 0):
            return

        entry_infos = self.Entries.get(key)
        if entry_infos is None:
            self.Entries[key] = new_entry_infos
            return

        PDO_infos = infos.find("PDO")
        if PDO_infos is not None:
            entry_infos.update(extract_pdo_infos(PDO_infos))

# Stylesheet document parsed once at import time from the module directory.
entries_list_xslt = etree.parse(os.path.join(ScriptDirectory, "entries_list.xslt"))

# Attach helper methods to the class the parser exposes for "DeviceType"
# elements (nothing is attached when the parser returns a false value).
cls = EtherCATInfoParser.GetElementClass("DeviceType")
if cls:

    profile_numbers_xpath = EtherCATInfo_XPath("Profile/ProfileNo")

    def GetProfileNumbers(self):
        """Return the text of every Profile/ProfileNo node of the device."""
        numbers = []
        for number in profile_numbers_xpath(self):
            numbers.append(number.text)
        return numbers
    cls.GetProfileNumbers = GetProfileNumbers

    def getCoE(self):
        """Return the CoE of the device mailbox, None without a mailbox."""
        mailbox = self.getMailbox()
        return None if mailbox is None else mailbox.getCoE()
    cls.getCoE = getCoE

    def GetEntriesList(self, limits=None):
        """Run the entries list stylesheet on the device.

        @param limits: optional (min_index, max_index) pair passed to the
        stylesheet; defaults to [0x0000, 0xFFFF]
        @return: dictionary filled by the AddEntry extension element
        """
        entries = {}

        extensions = {
            ("entries_list_ns", "add_entry"): AddEntry(entries),
            ("entries_list_ns", "HexDecValue"): HexDecValue,
            ("entries_list_ns", "EntryName"): EntryName}
        transform = etree.XSLT(entries_list_xslt, extensions=extensions)

        if limits is None:
            limits = [0x0000, 0xFFFF]
        bounds = [etree.XSLT.strparam(str(bound)) for bound in limits]
        params = dict(zip(["min_index", "max_index"], bounds))

        # The transformation result is discarded: only the side effect of
        # the add_entry extension on `entries` matters.
        transform(self, **params)

        return entries
    cls.GetEntriesList = GetEntriesList

    def GetSyncManagers(self):
        """Return one dictionary per sync manager of the device.

        Attributes that are None are replaced by an empty string.
        """
        sync_managers = []
        for sync_manager in self.getSm():
            fields = [("Name", sync_manager.getcontent()),
                      ("Start Address", sync_manager.getStartAddress()),
                      ("Default Size", sync_manager.getDefaultSize()),
                      ("Control Byte", sync_manager.getControlByte()),
                      ("Enable", sync_manager.getEnable())]
            sync_managers.append(dict(
                (name, "" if value is None else value)
                for name, value in fields))
        return sync_managers
    cls.GetSyncManagers = GetSyncManagers

def GroupItemCompare(x, y):
    """cmp-style comparison of two library tree items.

    Items of the same type are compared on "order" when they are groups and
    on "name" otherwise. When the types differ, a group sorts first.

    @param x: item dictionary with at least "type" and the compared key
    @param y: item dictionary with at least "type" and the compared key
    @return: negative, zero or positive integer
    """
    x_type = x["type"]
    if x_type != y["type"]:
        return -1 if x_type == ETHERCAT_GROUP else 1
    key = "order" if x_type == ETHERCAT_GROUP else "name"
    return cmp(x[key], y[key])

def SortGroupItems(group):
    """Recursively sort, in place, the children of a group item.

    Sub-groups are sorted first, then the children list of `group` itself
    is ordered with GroupItemCompare.

    @param group: item dictionary holding a "children" list
    """
    children = group["children"]
    for child in children:
        if child["type"] == ETHERCAT_GROUP:
            SortGroupItems(child)
    children.sort(cmp=GroupItemCompare)

class ModulesLibrary:
    """Library of EtherCAT module descriptions stored in a directory.

    The directory holds the module description XML files and an optional
    "modules_extra_params.cfg" CSV file with per-module extra parameters.
    A library may have a parent library, whose extra parameters are used as
    a base and overridden by the ones defined locally.

    Attributes:
      - Path: directory of the library (created if missing)
      - ParentLibrary: parent library or None
      - Library: {vendor_id: {"name": ..., "groups": {...}}} or None while
        the XML files have not been loaded yet
      - ModulesExtraParams: {(vendor, product_code, revision_number):
        {param_name: int_value}}
    """

    # Extra parameters supported for each module: (name, description infos)
    MODULES_EXTRA_PARAMS = [
        ("pdo_alignment", {
            "column_label": _("PDO alignment"),
            "column_size": 150,
            "default": 8,
            "description": _(
"Minimal size in bits between 2 pdo entries")}),
        ("max_pdo_size", {
            "column_label": _("Max entries by PDO"),
            "column_size": 150,
            "default": 255,
            "description": _(
"""Maximal number of entries mapped in a PDO
including empty entries used for PDO alignment""")}),
        ("add_pdo", {
            "column_label": _("Creating new PDO"),
            "column_size": 150,
            "default": 0,
            "description": _(
"""Adding a PDO not defined in default configuration
for mapping needed location variables
(1 if possible)""")})
    ]

    def __init__(self, path, parent_library=None):
        """
        @param path: directory of the library, created when it does not exist
        @param parent_library: optional parent ModulesLibrary; when given,
        the XML files are loaded immediately, otherwise they are loaded
        lazily on first access
        """
        self.Path = path
        if not os.path.exists(self.Path):
            os.makedirs(self.Path)
        self.ParentLibrary = parent_library

        if parent_library is not None:
            self.LoadModules()
        else:
            self.Library = None
        self.LoadModulesExtraParams()

    def GetPath(self):
        """Return the directory of the library."""
        return self.Path

    def GetModulesExtraParamsFilePath(self):
        """Return the path of the CSV file storing modules extra params."""
        return os.path.join(self.Path, "modules_extra_params.cfg")

    def _LogMessage(self, method_name, message):
        """Report a message without assuming a configuration tree root.

        This class does not define GetCTRoot(); when a subclass or mixin
        provides it, the message goes to the root logger method named
        `method_name` ("write_warning" or "write_error"). Otherwise the
        message is sent to the standard logging module so it is not lost.
        """
        get_ct_root = getattr(self, "GetCTRoot", None)
        if get_ct_root is not None:
            getattr(get_ct_root().logger, method_name)(message)
            return
        import logging
        if method_name == "write_error":
            logging.error(message)
        else:
            logging.warning(message)

    def _WarnSchemaError(self, filepath, error):
        """Warn that a file was loaded although it violates the XSD schema.

        NOTE(review): `error` looks like a tuple of extra format arguments
        (e.g. line and message) for XSDSchemaErrorMessage, in which case the
        file path must be prepended as a tuple item - confirm against
        LoadXMLString. A plain string keeps the historical concatenation.
        """
        try:
            if isinstance(error, tuple):
                message = XSDSchemaErrorMessage % ((filepath,) + error)
            else:
                message = XSDSchemaErrorMessage % (filepath + error)
        except (TypeError, ValueError):
            # The warning is advisory: never let a formatting mismatch
            # prevent the library from loading.
            message = "%s: %s" % (filepath, error)
        self._LogMessage("write_warning", message)

    groups_xpath = EtherCATInfo_XPath("Descriptions/Groups/Group")
    devices_xpath = EtherCATInfo_XPath("Descriptions/Devices/Device")

    def LoadModules(self):
        """(Re)build self.Library from every ".xml" file of the directory.

        Files that cannot be parsed are reported and skipped. Files parsed
        with schema errors are reported with a warning and still used.

        @return: the library dictionary
        @raise ValueError: a device refers to a group that is not declared
        """
        self.Library = {}

        for filename in os.listdir(self.Path):
            filepath = os.path.join(self.Path, filename)
            if not (os.path.isfile(filepath) and
                    os.path.splitext(filepath)[-1] == ".xml"):
                continue

            self.modules_infos = None
            error = None

            with open(filepath, 'r') as xmlfile:
                try:
                    self.modules_infos, error = \
                        EtherCATInfoParser.LoadXMLString(xmlfile.read())
                except Exception as exc:
                    self.modules_infos, error = None, unicode(exc)

            if self.modules_infos is None:
                self._LogMessage(
                    "write_error",
                    _("Couldn't load %s XML file:\n%s") % (filepath, error))
                continue

            if error is not None:
                self._WarnSchemaError(filepath, error)

            vendor = self.modules_infos.getVendor()

            vendor_category = self.Library.setdefault(
                ExtractHexDecValue(vendor.getId()),
                {"name": ExtractName(vendor.getName(), _("Miscellaneous")),
                 "groups": {}})

            for group in self.groups_xpath(self.modules_infos):
                group_type = group.getType()

                vendor_category["groups"].setdefault(group_type,
                    {"name": ExtractName(group.getName(), group_type),
                     "parent": group.getParentGroup(),
                     "order": group.getSortOrder(),
                     #"value": group.getcontent()["value"],
                     "devices": []})

            for device in self.devices_xpath(self.modules_infos):
                device_group = device.getGroupType()
                if device_group not in vendor_category["groups"]:
                    raise ValueError("No such group \"%s\"" % device_group)
                vendor_category["groups"][device_group]["devices"].append(
                    (device.getType().getcontent(), device))

        return self.Library

    def GetModulesLibrary(self, profile_filter=None):
        """Return the library as a sorted tree of vendors, groups and devices.

        @param profile_filter: when not None, only devices whose profile
        numbers contain this value are kept
        @return: list of vendor items sorted by name; every item is a
        dictionary with "name", "type", "infos" and "children" keys
        """
        if self.Library is None:
            self.LoadModules()
        library = []
        for vendor_id, vendor in self.Library.items():
            groups = []
            # Children lists are shared through this dictionary so a group
            # can receive sub-groups before or after being visited itself.
            children_dict = {}
            for group_type, group in vendor["groups"].items():
                group_infos = {"name": group["name"],
                               "order": group["order"],
                               "type": ETHERCAT_GROUP,
                               "infos": None,
                               "children": children_dict.setdefault(group_type, [])}
                device_dict = {}
                for device_type, device in group["devices"]:
                    if profile_filter is None or profile_filter in device.GetProfileNumbers():
                        product_code = device.getType().getProductCode()
                        revision_number = device.getType().getRevisionNo()
                        module_infos = {"device_type": device_type,
                                        "vendor": vendor_id,
                                        "product_code": product_code,
                                        "revision_number": revision_number}
                        module_infos.update(self.GetModuleExtraParams(vendor_id, product_code, revision_number))
                        device_infos = {"name": ExtractName(device.getName()),
                                        "type": ETHERCAT_DEVICE,
                                        "infos": module_infos,
                                        "children": []}
                        group_infos["children"].append(device_infos)
                        device_type_occurrences = device_dict.setdefault(device_type, [])
                        device_type_occurrences.append(device_infos)
                # Devices sharing the same type are told apart by revision
                for device_type_occurrences in device_dict.values():
                    if len(device_type_occurrences) > 1:
                        for occurrence in device_type_occurrences:
                            occurrence["name"] += _(" (rev. %s)") % occurrence["infos"]["revision_number"]
                if group_infos["children"]:
                    if group["parent"] is not None:
                        parent_children = children_dict.setdefault(group["parent"], [])
                        parent_children.append(group_infos)
                    else:
                        groups.append(group_infos)
            if groups:
                library.append({"name": vendor["name"],
                                "type": ETHERCAT_VENDOR,
                                "infos": None,
                                "children": groups})
        library.sort(key=lambda item: item["name"])
        return library

    def GetVendors(self):
        """Return the list of (vendor_id, vendor_name) of the library."""
        # Same lazy loading as GetModulesLibrary: a library created without
        # a parent has not read its XML files yet.
        if self.Library is None:
            self.LoadModules()
        return [(vendor_id, vendor["name"]) for vendor_id, vendor in self.Library.items()]

    def GetModuleInfos(self, module_infos):
        """Find the device matching a module identification.

        The matching device and its type are also remembered in
        self.cntdevice and self.cntdeviceType.

        @param module_infos: dictionary with "vendor", "product_code" and
        "revision_number" keys
        @return: (device, extra_params) or (None, None) when not found
        """
        if self.Library is None:
            self.LoadModules()
        vendor = ExtractHexDecValue(module_infos["vendor"])
        vendor_infos = self.Library.get(vendor)
        if vendor_infos is not None:
            searched_product_code = ExtractHexDecValue(module_infos["product_code"])
            searched_revision_number = ExtractHexDecValue(module_infos["revision_number"])
            for group_name, group_infos in vendor_infos["groups"].items():
                for device_type, device_infos in group_infos["devices"]:
                    product_code = ExtractHexDecValue(device_infos.getType().getProductCode())
                    revision_number = ExtractHexDecValue(device_infos.getType().getRevisionNo())
                    if (product_code == searched_product_code and
                        revision_number == searched_revision_number):
                        self.cntdevice = device_infos
                        self.cntdeviceType = device_type
                        return device_infos, self.GetModuleExtraParams(vendor, product_code, revision_number)
        return None, None

    def ImportModuleLibrary(self, filepath):
        """Copy a description file into the library and reload the modules.

        @param filepath: path of the file to import
        @return: True if the file exists and was imported, False otherwise
        """
        if os.path.isfile(filepath):
            shutil.copy(filepath, self.Path)
            self.LoadModules()
            return True
        return False

    def LoadModulesExtraParams(self):
        """Load self.ModulesExtraParams from the CSV file, if it exists.

        The first three columns identify the module (vendor, product code,
        revision number); the following ones are the values of
        MODULES_EXTRA_PARAMS, in order. Empty values are left undefined.
        """
        self.ModulesExtraParams = {}

        csvfile_path = self.GetModulesExtraParamsFilePath()
        if not os.path.exists(csvfile_path):
            return

        with open(csvfile_path, "rb") as csvfile:
            sample = csvfile.read(1024)
            if not sample:
                # csv.Sniffer cannot determine a dialect from an empty file
                return
            csvfile.seek(0)
            sniffer = csv.Sniffer()
            dialect = sniffer.sniff(sample)
            has_header = sniffer.has_header(sample)
            for row in csv.reader(csvfile, dialect):
                if has_header:
                    # Only the very first row can be the header
                    has_header = False
                    continue
                if len(row) < 3:
                    # Blank or truncated rows cannot identify a module and
                    # would break the key unpacking done when saving.
                    continue
                params_values = {}
                for (param, param_infos), value in zip(
                        self.MODULES_EXTRA_PARAMS, row[3:]):
                    if value != "":
                        params_values[param] = int(value)
                self.ModulesExtraParams[
                    tuple(map(int, row[:3]))] = params_values

    def SaveModulesExtraParams(self):
        """Write self.ModulesExtraParams to the CSV file (';' separated)."""
        extra_params = [param for param, params_infos in self.MODULES_EXTRA_PARAMS]
        with open(self.GetModulesExtraParamsFilePath(), "wb") as csvfile:
            writer = csv.writer(csvfile, delimiter=';')
            writer.writerow(['Vendor', 'product_code', 'revision_number'] + extra_params)
            for (vendor, product_code, revision_number), module_extra_params in self.ModulesExtraParams.items():
                writer.writerow([vendor, product_code, revision_number] +
                                [module_extra_params.get(param, '')
                                 for param in extra_params])

    def SetModuleExtraParam(self, vendor, product_code, revision_number, param, value):
        """Define an extra parameter of a module and save the CSV file.

        @param vendor: vendor id (decoded with ExtractHexDecValue)
        @param product_code: product code (decoded with ExtractHexDecValue)
        @param revision_number: revision (decoded with ExtractHexDecValue)
        @param param: name of the extra parameter
        @param value: new value of the extra parameter
        """
        vendor = ExtractHexDecValue(vendor)
        product_code = ExtractHexDecValue(product_code)
        revision_number = ExtractHexDecValue(revision_number)

        module_infos = (vendor, product_code, revision_number)
        self.ModulesExtraParams.setdefault(module_infos, {})[param] = value

        self.SaveModulesExtraParams()

    def GetModuleExtraParams(self, vendor, product_code, revision_number):
        """Return the extra parameters applying to a module.

        Values come, by increasing priority, from the defaults declared in
        MODULES_EXTRA_PARAMS, the parent library and this library.

        @return: a new dictionary {param_name: value}
        """
        vendor = ExtractHexDecValue(vendor)
        product_code = ExtractHexDecValue(product_code)
        revision_number = ExtractHexDecValue(revision_number)

        if self.ParentLibrary is not None:
            extra_params = self.ParentLibrary.GetModuleExtraParams(vendor, product_code, revision_number)
        else:
            extra_params = {}

        extra_params.update(self.ModulesExtraParams.get((vendor, product_code, revision_number), {}))

        for param, param_infos in self.MODULES_EXTRA_PARAMS:
            extra_params.setdefault(param, param_infos["default"])

        return extra_params

# Per-user data directory reported by wx; outside Windows the directory
# name gets a '_files' suffix.
USERDATA_DIR = wx.StandardPaths.Get().GetUserDataDir()
if wx.Platform != '__WXMSW__':
    USERDATA_DIR = USERDATA_DIR + '_files'

# Library shared by every project, created without a parent library.
ModulesDatabase = ModulesLibrary(os.path.join(USERDATA_DIR, "ethercat_modules"))

class RootClass:
    """Root configuration node of the EtherCAT extension.

    It owns the project modules library, stored in the "modules" folder of
    the node and backed by the shared ModulesDatabase.

    NOTE(review): CTNPath() is not defined here; presumably it is provided
    by the class this one is combined with - confirm against the caller.
    """

    # Children types accepted by this node: (type name, class, description)
    CTNChildrenTypes = [("EthercatNode",_EthercatCTN,"Ethercat Master")]
    EditorType = LibraryEditor

    def __init__(self):
        self.ModulesLibrary = None
        self.LoadModulesLibrary()

    def GetIconName(self):
        """Return the name of the icon representing this node."""
        return "Ethercat"

    def GetModulesLibraryPath(self, project_path=None):
        """Return the path of the "modules" folder.

        @param project_path: base folder; defaults to self.CTNPath()
        """
        if project_path is None:
            project_path = self.CTNPath()
        return os.path.join(project_path, "modules")

    def OnCTNSave(self, from_project_path=None):
        """Copy the modules folder from another project path when given.

        @param from_project_path: folder holding the modules to copy, or None
        @return: always True
        """
        if from_project_path is not None:
            shutil.copytree(self.GetModulesLibraryPath(from_project_path),
                            self.GetModulesLibraryPath())
        return True

    def CTNGenerate_C(self, buildpath, locations):
        """This node generates nothing by itself."""
        return [],"",False

    def LoadModulesLibrary(self):
        """Create the project modules library, or reload it if it exists."""
        if self.ModulesLibrary is None:
            self.ModulesLibrary = ModulesLibrary(self.GetModulesLibraryPath(), ModulesDatabase)
        else:
            # ModulesLibrary reloads its XML files through LoadModules();
            # it has no LoadModulesLibrary() method.
            self.ModulesLibrary.LoadModules()

    def GetModulesDatabaseInstance(self):
        """Return the shared modules database."""
        return ModulesDatabase

    def GetModulesLibraryInstance(self):
        """Return the project modules library."""
        return self.ModulesLibrary

    def GetModulesLibrary(self, profile_filter=None):
        """Return the project library tree, see ModulesLibrary.GetModulesLibrary."""
        return self.ModulesLibrary.GetModulesLibrary(profile_filter)

    def GetVendors(self):
        """Return the vendors known by the project modules library."""
        return self.ModulesLibrary.GetVendors()

    def GetModuleInfos(self, module_infos):
        """Return the device matching module_infos in the project library."""
        return self.ModulesLibrary.GetModuleInfos(module_infos)