etherlab/etherlab.py
author Laurent Bessard
Tue, 24 Sep 2013 15:18:25 +0200
changeset 2159 93797d4303a3
parent 2157 a2385e535cf5
child 2160 75349c51a34b
permissions -rw-r--r--
Fixed bug when compiling
import os, shutil
from lxml import etree

import wx
import csv

from xmlclass import *

from ConfigTreeNode import ConfigTreeNode
from PLCControler import UndoBuffer, LOCATION_CONFNODE, LOCATION_MODULE, LOCATION_GROUP, LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY

from EthercatSlave import ExtractHexDecValue, ExtractName
from EthercatMaster import _EthercatCTN
from ConfigEditor import LibraryEditor, ETHERCAT_VENDOR, ETHERCAT_GROUP, ETHERCAT_DEVICE

ScriptDirectory = os.path.split(os.path.realpath(__file__))[0]

#--------------------------------------------------
#                 Ethercat ConfNode
#--------------------------------------------------

EtherCATInfoParser = GenerateParserFromXSD(os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd")) 
EtherCATInfo_XPath = lambda xpath: etree.XPath(xpath)

def extract_param(el):
    if el.tag == "Index":
        return "#x%4.4X" % int(el.text)
    elif el.tag == "BitSize":
        if el.text is None:
            return 0
        return int(el.text)
    elif el.tag == "PDOMapping":
        if el.text is None:
            return ""
        return el.text.upper()
    if el.text is None:
        return ""
    return el.text

def extract_pdo_infos(pdo_infos):
    return {
        pdo_infos.tag + " " + el.tag: extract_param(el)
        for el in pdo_infos}

def HexDecValue(ctxt, values):
    return str(ExtractHexDecValue(values[0]))

def EntryName(ctxt, values):
    default=None
    names = []
    for element in values:
        if element.tag == "Default":
            default = element.text
        else:
            names.append(element)
    return ExtractName(names, default)

class AddEntry(etree.XSLTExtension):

    def __init__(self, entries):
        etree.XSLTExtension.__init__(self)
        self.Entries = entries
    
    def execute(self, context, self_node, input_node, output_parent):
        infos = etree.Element('entry_infos')
        self.process_children(context, infos)
        index, subindex = map(
            lambda x: int(infos.find(x).text),
            ["Index", "SubIndex"])
        new_entry_infos = {
            el.tag: extract_param(el)
            for el in infos if el.tag != "PDO"}
        if (index, subindex) != (0, 0):
            entry_infos = self.Entries.get((index, subindex))
            if entry_infos is not None:
                PDO_infos = infos.find("PDO")
                if PDO_infos is not None:
                    entry_infos.update(extract_pdo_infos(PDO_infos))
            else:
                self.Entries[(index, subindex)] = new_entry_infos

entries_list_xslt = etree.parse(
    os.path.join(ScriptDirectory, "entries_list.xslt"))

cls = EtherCATInfoParser.GetElementClass("DeviceType")
if cls:
    
    profile_numbers_xpath = EtherCATInfo_XPath("Profile/ProfileNo")
    def GetProfileNumbers(self):
        return [number.text for number in profile_numbers_xpath(self)]
    setattr(cls, "GetProfileNumbers", GetProfileNumbers)
    
    def getCoE(self):
        mailbox = self.getMailbox()
        if mailbox is not None:
            return mailbox.getCoE()
        return None
    setattr(cls, "getCoE", getCoE)

    def GetEntriesList(self, limits=None):
        entries = {}
        
        entries_list_xslt_tree = etree.XSLT(
            entries_list_xslt, extensions = {
                ("entries_list_ns", "add_entry"): AddEntry(entries),
                ("entries_list_ns", "HexDecValue"): HexDecValue,
                ("entries_list_ns", "EntryName"): EntryName})
        entries_list_xslt_tree(self, **dict(zip(
            ["min_index", "max_index"], 
            map(lambda x: etree.XSLT.strparam(str(x)),
                limits if limits is not None else [0x0000, 0xFFFF])
            )))
        
        return entries
    setattr(cls, "GetEntriesList", GetEntriesList)

    def GetSyncManagers(self):
        sync_managers = []
        for sync_manager in self.getSm():
            sync_manager_infos = {}
            for name, value in [("Name", sync_manager.getcontent()),
                                ("Start Address", sync_manager.getStartAddress()),
                                ("Default Size", sync_manager.getDefaultSize()),
                                ("Control Byte", sync_manager.getControlByte()),
                                ("Enable", sync_manager.getEnable())]:
                if value is None:
                    value =""
                sync_manager_infos[name] = value
            sync_managers.append(sync_manager_infos)
        return sync_managers
    setattr(cls, "GetSyncManagers", GetSyncManagers)

def GroupItemCompare(x, y):
    if x["type"] == y["type"]:
        if x["type"] == ETHERCAT_GROUP:
            return cmp(x["order"], y["order"])
        else:
            return cmp(x["name"], y["name"])
    elif x["type"] == ETHERCAT_GROUP:
        return -1
    return 1

def SortGroupItems(group):
    for item in group["children"]:
        if item["type"] == ETHERCAT_GROUP:
            SortGroupItems(item)
    group["children"].sort(GroupItemCompare)

class ModulesLibrary:

    MODULES_EXTRA_PARAMS = [
        ("pdo_alignment", {
            "column_label": _("PDO alignment"), 
            "column_size": 150,
            "default": 8,
            "description": _(
"Minimal size in bits between 2 pdo entries")}),
        ("max_pdo_size", {
            "column_label": _("Max entries by PDO"),
            "column_size": 150,
            "default": 255,
            "description": _(
"""Maximal number of entries mapped in a PDO
including empty entries used for PDO alignment""")}),
        ("add_pdo", {
            "column_label": _("Creating new PDO"), 
            "column_size": 150,
            "default": 0,
            "description": _(
"""Adding a PDO not defined in default configuration
for mapping needed location variables
(1 if possible)""")})
    ]
    
    def __init__(self, path, parent_library=None):
        self.Path = path
        if not os.path.exists(self.Path):
            os.makedirs(self.Path)
        self.ParentLibrary = parent_library
        
        if parent_library is not None:
            self.LoadModules()
        else:
            self.Library = None
        self.LoadModulesExtraParams()
    
    def GetPath(self):
        return self.Path
    
    def GetModulesExtraParamsFilePath(self):
        return os.path.join(self.Path, "modules_extra_params.cfg")
    
    groups_xpath = EtherCATInfo_XPath("Descriptions/Groups/Group")
    devices_xpath = EtherCATInfo_XPath("Descriptions/Devices/Device")
    def LoadModules(self):
        self.Library = {}
        
        files = os.listdir(self.Path)
        for file in files:
            filepath = os.path.join(self.Path, file)
            if os.path.isfile(filepath) and os.path.splitext(filepath)[-1] == ".xml":
                self.modules_infos = None
                
                xmlfile = open(filepath, 'r')
                try:
                    self.modules_infos = etree.fromstring(
                        xmlfile.read(), EtherCATInfoParser)
                except:
                    pass
                xmlfile.close()
                
                if self.modules_infos is not None:
                    vendor = self.modules_infos.getVendor()
                    
                    vendor_category = self.Library.setdefault(
                        ExtractHexDecValue(vendor.getId()), 
                        {"name": ExtractName(vendor.getName(), _("Miscellaneous")), 
                         "groups": {}})
                    
                    for group in self.groups_xpath(self.modules_infos):
                        group_type = group.getType()
                        
                        vendor_category["groups"].setdefault(group_type, 
                            {"name": ExtractName(group.getName(), group_type), 
                             "parent": group.getParentGroup(),
                             "order": group.getSortOrder(), 
                             #"value": group.getcontent()["value"],
                             "devices": []})
                    
                    for device in self.devices_xpath(self.modules_infos):
                        device_group = device.getGroupType()
                        if not vendor_category["groups"].has_key(device_group):
                            raise ValueError, "Not such group \"%\"" % device_group
                        vendor_category["groups"][device_group]["devices"].append(
                            (device.getType().getcontent(), device))

        return self.Library 

    def GetModulesLibrary(self, profile_filter=None):
        if self.Library is None:
            self.LoadModules()
        library = []
        for vendor_id, vendor in self.Library.iteritems():
            groups = []
            children_dict = {}
            for group_type, group in vendor["groups"].iteritems():
                group_infos = {"name": group["name"],
                               "order": group["order"],
                               "type": ETHERCAT_GROUP,
                               "infos": None,
                               "children": children_dict.setdefault(group_type, [])}
                device_dict = {}
                for device_type, device in group["devices"]:
                    if profile_filter is None or profile_filter in device.GetProfileNumbers():
                        product_code = device.getType().getProductCode()
                        revision_number = device.getType().getRevisionNo()
                        module_infos = {"device_type": device_type,
                                        "vendor": vendor_id,
                                        "product_code": product_code,
                                        "revision_number": revision_number}
                        module_infos.update(self.GetModuleExtraParams(vendor_id, product_code, revision_number))
                        device_infos = {"name": ExtractName(device.getName()),
                                        "type": ETHERCAT_DEVICE,
                                        "infos": module_infos,
                                        "children": []}
                        group_infos["children"].append(device_infos)
                        device_type_occurrences = device_dict.setdefault(device_type, [])
                        device_type_occurrences.append(device_infos)
                for device_type_occurrences in device_dict.itervalues():
                    if len(device_type_occurrences) > 1:
                        for occurrence in device_type_occurrences:
                            occurrence["name"] += _(" (rev. %s)") % occurrence["infos"]["revision_number"]
                if len(group_infos["children"]) > 0:
                    if group["parent"] is not None:
                        parent_children = children_dict.setdefault(group["parent"], [])
                        parent_children.append(group_infos)
                    else:
                        groups.append(group_infos)
            if len(groups) > 0:
                library.append({"name": vendor["name"],
                                "type": ETHERCAT_VENDOR,
                                "infos": None,
                                "children": groups})
        library.sort(lambda x, y: cmp(x["name"], y["name"]))
        return library

    def GetVendors(self):
        return [(vendor_id, vendor["name"]) for vendor_id, vendor in self.Library.items()]
    
    def GetModuleInfos(self, module_infos):
        vendor = ExtractHexDecValue(module_infos["vendor"])
        vendor_infos = self.Library.get(vendor)
        if vendor_infos is not None:
            for group_name, group_infos in vendor_infos["groups"].iteritems():
                for device_type, device_infos in group_infos["devices"]:
                    product_code = ExtractHexDecValue(device_infos.getType().getProductCode())
                    revision_number = ExtractHexDecValue(device_infos.getType().getRevisionNo())
                    if (product_code == ExtractHexDecValue(module_infos["product_code"]) and
                        revision_number == ExtractHexDecValue(module_infos["revision_number"])):
                        self.cntdevice = device_infos 
                        self.cntdeviceType = device_type  
                        return device_infos, self.GetModuleExtraParams(vendor, product_code, revision_number)
        return None, None
    
    def ImportModuleLibrary(self, filepath):
        if os.path.isfile(filepath):
            shutil.copy(filepath, self.Path)
            self.LoadModules()
            return True
        return False
    
    def LoadModulesExtraParams(self):
        self.ModulesExtraParams = {}
        
        csvfile_path = self.GetModulesExtraParamsFilePath()
        if os.path.exists(csvfile_path):
            csvfile = open(csvfile_path, "rb")
            sample = csvfile.read(1024)
            csvfile.seek(0)
            dialect = csv.Sniffer().sniff(sample)
            has_header = csv.Sniffer().has_header(sample)
            reader = csv.reader(csvfile, dialect)
            for row in reader:
                if has_header:
                    has_header = False
                else:
                    params_values = {}
                    for (param, param_infos), value in zip(
                        self.MODULES_EXTRA_PARAMS, row[3:]):
                        if value != "":
                            params_values[param] = int(value)
                    self.ModulesExtraParams[
                        tuple(map(int, row[:3]))] = params_values
            csvfile.close()
    
    def SaveModulesExtraParams(self):
        csvfile = open(self.GetModulesExtraParamsFilePath(), "wb")
        extra_params = [param for param, params_infos in self.MODULES_EXTRA_PARAMS]
        writer = csv.writer(csvfile, delimiter=';')
        writer.writerow(['Vendor', 'product_code', 'revision_number'] + extra_params)
        for (vendor, product_code, revision_number), module_extra_params in self.ModulesExtraParams.iteritems():
            writer.writerow([vendor, product_code, revision_number] + 
                            [module_extra_params.get(param, '') 
                             for param in extra_params])
        csvfile.close()
    
    def SetModuleExtraParam(self, vendor, product_code, revision_number, param, value):
        vendor = ExtractHexDecValue(vendor)
        product_code = ExtractHexDecValue(product_code)
        revision_number = ExtractHexDecValue(revision_number)
        
        module_infos = (vendor, product_code, revision_number)
        self.ModulesExtraParams.setdefault(module_infos, {})
        self.ModulesExtraParams[module_infos][param] = value
        
        self.SaveModulesExtraParams()
    
    def GetModuleExtraParams(self, vendor, product_code, revision_number):
        vendor = ExtractHexDecValue(vendor)
        product_code = ExtractHexDecValue(product_code)
        revision_number = ExtractHexDecValue(revision_number)
        
        if self.ParentLibrary is not None:
            extra_params = self.ParentLibrary.GetModuleExtraParams(vendor, product_code, revision_number)
        else:
            extra_params = {}
        
        extra_params.update(self.ModulesExtraParams.get((vendor, product_code, revision_number), {}))
        
        for param, param_infos in self.MODULES_EXTRA_PARAMS:
            extra_params.setdefault(param, param_infos["default"])
        
        return extra_params

USERDATA_DIR = wx.StandardPaths.Get().GetUserDataDir()
if wx.Platform != '__WXMSW__':
    USERDATA_DIR += '_files'

ModulesDatabase = ModulesLibrary(
    os.path.join(USERDATA_DIR, "ethercat_modules"))

class RootClass:
    
    CTNChildrenTypes = [("EthercatNode",_EthercatCTN,"Ethercat Master")]
    EditorType = LibraryEditor
       
    
    def __init__(self):
        self.ModulesLibrary = None
        self.LoadModulesLibrary()
    
    def GetIconName(self):
        return "Ethercat"
    
    def GetModulesLibraryPath(self, project_path=None):
        if project_path is None:
            project_path = self.CTNPath()
        return os.path.join(project_path, "modules") 
    
    def OnCTNSave(self, from_project_path=None):
        if from_project_path is not None:
            shutil.copytree(self.GetModulesLibraryPath(from_project_path),
                            self.GetModulesLibraryPath())
        return True
    
    def CTNGenerate_C(self, buildpath, locations):
        return [],"",False
    
    def LoadModulesLibrary(self):
        if self.ModulesLibrary is None:
            self.ModulesLibrary = ModulesLibrary(self.GetModulesLibraryPath(), ModulesDatabase)
        else:
            self.ModulesLibrary.LoadModulesLibrary()
    
    def GetModulesDatabaseInstance(self):
        return ModulesDatabase
    
    def GetModulesLibraryInstance(self):
        return self.ModulesLibrary
    
    def GetModulesLibrary(self, profile_filter=None):
        return self.ModulesLibrary.GetModulesLibrary(profile_filter)
    
    def GetVendors(self):
        return self.ModulesLibrary.GetVendors()
    
    def GetModuleInfos(self, module_infos):
        return self.ModulesLibrary.GetModuleInfos(module_infos)