etherlab/etherlab.py
changeset 2165 02a2b5dee5e3
parent 2162 43ab74687f45
child 2355 fec77f2b9e07
child 2641 c9deff128c37
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/etherlab/etherlab.py	Sat Jun 23 09:08:13 2018 +0200
@@ -0,0 +1,433 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# This file is part of Beremiz
+#
+# Copyright (C) 2011-2014: Laurent BESSARD, Edouard TISSERANT
+#                          RTES Lab : CRKim, JBLee, youcu
+#                          Higen Motor : Donggu Kang
+#
+# See COPYING file for copyrights details.
+
+import os, shutil
+from lxml import etree
+
+import wx
+import csv
+
+from xmlclass import *
+
+from ConfigTreeNode import ConfigTreeNode, XSDSchemaErrorMessage
+from PLCControler import UndoBuffer, LOCATION_CONFNODE, LOCATION_MODULE, LOCATION_GROUP, LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY
+
+from EthercatSlave import ExtractHexDecValue, ExtractName
+from EthercatMaster import _EthercatCTN
+from ConfigEditor import LibraryEditor, ETHERCAT_VENDOR, ETHERCAT_GROUP, ETHERCAT_DEVICE
+
+ScriptDirectory = os.path.split(os.path.realpath(__file__))[0]
+
+#--------------------------------------------------
+#                 Ethercat ConfNode
+#--------------------------------------------------
+
+EtherCATInfoParser = GenerateParserFromXSD(os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd")) 
+EtherCATInfo_XPath = lambda xpath: etree.XPath(xpath)
+
+def HexDecValue(context, *args):
+    return str(ExtractHexDecValue(args[0][0]))
+
+def EntryName(context, *args):
+    return ExtractName(args[0], 
+        args[1][0] if len(args) > 1 else None)
+
+ENTRY_INFOS_KEYS = [
+    ("Index", lambda x: "#x%4.4X" % int(x), "#x0000"),
+    ("SubIndex", str, "0"),
+    ("Name", str, ""),
+    ("Type", str, ""),
+    ("BitSize", int, 0),
+    ("Access", str, ""),
+    ("PDOMapping", str, ""),
+    ("PDO index", str, ""),
+    ("PDO name", str, ""),
+    ("PDO type", str, "")]
+
+class EntryListFactory:
+
+    def __init__(self, entries):
+        self.Entries = entries
+    
+    def AddEntry(self, context, *args):
+        index, subindex = map(lambda x: int(x[0]), args[:2])
+        new_entry_infos = {
+            key: translate(arg[0]) if len(arg) > 0 else default
+            for (key, translate, default), arg
+            in zip(ENTRY_INFOS_KEYS, args)}
+        
+        if (index, subindex) != (0, 0):
+            entry_infos = self.Entries.get((index, subindex))
+            if entry_infos is not None:
+                for param in ["PDO index", "PDO name", "PDO type"]:
+                    value = new_entry_infos.get(param)
+                    if value is not None:
+                        entry_infos[param] = value
+            else:
+                self.Entries[(index, subindex)] = new_entry_infos
+
+entries_list_xslt = etree.parse(
+    os.path.join(ScriptDirectory, "entries_list.xslt"))
+
+cls = EtherCATInfoParser.GetElementClass("DeviceType")
+if cls:
+    
+    profile_numbers_xpath = EtherCATInfo_XPath("Profile/ProfileNo")
+    def GetProfileNumbers(self):
+        return [number.text for number in profile_numbers_xpath(self)]
+    setattr(cls, "GetProfileNumbers", GetProfileNumbers)
+    
+    def getCoE(self):
+        mailbox = self.getMailbox()
+        if mailbox is not None:
+            return mailbox.getCoE()
+        return None
+    setattr(cls, "getCoE", getCoE)
+
+    def GetEntriesList(self, limits=None):
+        entries = {}
+        
+        factory = EntryListFactory(entries)
+        
+        entries_list_xslt_tree = etree.XSLT(
+            entries_list_xslt, extensions = {
+                ("entries_list_ns", "AddEntry"): factory.AddEntry,
+                ("entries_list_ns", "HexDecValue"): HexDecValue,
+                ("entries_list_ns", "EntryName"): EntryName})
+        entries_list_xslt_tree(self, **dict(zip(
+            ["min_index", "max_index"], 
+            map(lambda x: etree.XSLT.strparam(str(x)),
+                limits if limits is not None else [0x0000, 0xFFFF])
+            )))
+        
+        return entries
+    setattr(cls, "GetEntriesList", GetEntriesList)
+
+    def GetSyncManagers(self):
+        sync_managers = []
+        for sync_manager in self.getSm():
+            sync_manager_infos = {}
+            for name, value in [("Name", sync_manager.getcontent()),
+                                ("Start Address", sync_manager.getStartAddress()),
+                                ("Default Size", sync_manager.getDefaultSize()),
+                                ("Control Byte", sync_manager.getControlByte()),
+                                ("Enable", sync_manager.getEnable())]:
+                if value is None:
+                    value =""
+                sync_manager_infos[name] = value
+            sync_managers.append(sync_manager_infos)
+        return sync_managers
+    setattr(cls, "GetSyncManagers", GetSyncManagers)
+
+def GroupItemCompare(x, y):
+    if x["type"] == y["type"]:
+        if x["type"] == ETHERCAT_GROUP:
+            return cmp(x["order"], y["order"])
+        else:
+            return cmp(x["name"], y["name"])
+    elif x["type"] == ETHERCAT_GROUP:
+        return -1
+    return 1
+
+def SortGroupItems(group):
+    for item in group["children"]:
+        if item["type"] == ETHERCAT_GROUP:
+            SortGroupItems(item)
+    group["children"].sort(GroupItemCompare)
+
+class ModulesLibrary:
+
+    MODULES_EXTRA_PARAMS = [
+        ("pdo_alignment", {
+            "column_label": _("PDO alignment"), 
+            "column_size": 150,
+            "default": 8,
+            "description": _(
+"Minimal size in bits between 2 pdo entries")}),
+        ("max_pdo_size", {
+            "column_label": _("Max entries by PDO"),
+            "column_size": 150,
+            "default": 255,
+            "description": _(
+"""Maximal number of entries mapped in a PDO
+including empty entries used for PDO alignment""")}),
+        ("add_pdo", {
+            "column_label": _("Creating new PDO"), 
+            "column_size": 150,
+            "default": 0,
+            "description": _(
+"""Adding a PDO not defined in default configuration
+for mapping needed location variables
+(1 if possible)""")})
+    ]
+    
+    def __init__(self, path, parent_library=None):
+        self.Path = path
+        if not os.path.exists(self.Path):
+            os.makedirs(self.Path)
+        self.ParentLibrary = parent_library
+        
+        if parent_library is not None:
+            self.LoadModules()
+        else:
+            self.Library = None
+        self.LoadModulesExtraParams()
+    
+    def GetPath(self):
+        return self.Path
+    
+    def GetModulesExtraParamsFilePath(self):
+        return os.path.join(self.Path, "modules_extra_params.cfg")
+    
+    groups_xpath = EtherCATInfo_XPath("Descriptions/Groups/Group")
+    devices_xpath = EtherCATInfo_XPath("Descriptions/Devices/Device")
+    def LoadModules(self):
+        self.Library = {}
+        
+        files = os.listdir(self.Path)
+        for file in files:
+            filepath = os.path.join(self.Path, file)
+            if os.path.isfile(filepath) and os.path.splitext(filepath)[-1] == ".xml":
+                self.modules_infos = None
+                
+                xmlfile = open(filepath, 'r')
+                try:
+                    self.modules_infos, error = EtherCATInfoParser.LoadXMLString(xmlfile.read())
+                    if error is not None:
+                        self.GetCTRoot().logger.write_warning(
+                            XSDSchemaErrorMessage % (filepath + error))
+                except Exception, exc:
+                    self.modules_infos, error = None, unicode(exc)
+                xmlfile.close()
+                
+                if self.modules_infos is not None:
+                    vendor = self.modules_infos.getVendor()
+                    
+                    vendor_category = self.Library.setdefault(
+                        ExtractHexDecValue(vendor.getId()), 
+                        {"name": ExtractName(vendor.getName(), _("Miscellaneous")), 
+                         "groups": {}})
+                    
+                    for group in self.groups_xpath(self.modules_infos):
+                        group_type = group.getType()
+                        
+                        vendor_category["groups"].setdefault(group_type, 
+                            {"name": ExtractName(group.getName(), group_type), 
+                             "parent": group.getParentGroup(),
+                             "order": group.getSortOrder(), 
+                             #"value": group.getcontent()["value"],
+                             "devices": []})
+                    
+                    for device in self.devices_xpath(self.modules_infos):
+                        device_group = device.getGroupType()
+                        if not vendor_category["groups"].has_key(device_group):
+                            raise ValueError, "Not such group \"%\"" % device_group
+                        vendor_category["groups"][device_group]["devices"].append(
+                            (device.getType().getcontent(), device))
+                
+                else:
+                        
+                    self.GetCTRoot().logger.write_error(
+                        _("Couldn't load %s XML file:\n%s") % (filepath, error))
+                
+        return self.Library
+
+    def GetModulesLibrary(self, profile_filter=None):
+        if self.Library is None:
+            self.LoadModules()
+        library = []
+        for vendor_id, vendor in self.Library.iteritems():
+            groups = []
+            children_dict = {}
+            for group_type, group in vendor["groups"].iteritems():
+                group_infos = {"name": group["name"],
+                               "order": group["order"],
+                               "type": ETHERCAT_GROUP,
+                               "infos": None,
+                               "children": children_dict.setdefault(group_type, [])}
+                device_dict = {}
+                for device_type, device in group["devices"]:
+                    if profile_filter is None or profile_filter in device.GetProfileNumbers():
+                        product_code = device.getType().getProductCode()
+                        revision_number = device.getType().getRevisionNo()
+                        module_infos = {"device_type": device_type,
+                                        "vendor": vendor_id,
+                                        "product_code": product_code,
+                                        "revision_number": revision_number}
+                        module_infos.update(self.GetModuleExtraParams(vendor_id, product_code, revision_number))
+                        device_infos = {"name": ExtractName(device.getName()),
+                                        "type": ETHERCAT_DEVICE,
+                                        "infos": module_infos,
+                                        "children": []}
+                        group_infos["children"].append(device_infos)
+                        device_type_occurrences = device_dict.setdefault(device_type, [])
+                        device_type_occurrences.append(device_infos)
+                for device_type_occurrences in device_dict.itervalues():
+                    if len(device_type_occurrences) > 1:
+                        for occurrence in device_type_occurrences:
+                            occurrence["name"] += _(" (rev. %s)") % occurrence["infos"]["revision_number"]
+                if len(group_infos["children"]) > 0:
+                    if group["parent"] is not None:
+                        parent_children = children_dict.setdefault(group["parent"], [])
+                        parent_children.append(group_infos)
+                    else:
+                        groups.append(group_infos)
+            if len(groups) > 0:
+                library.append({"name": vendor["name"],
+                                "type": ETHERCAT_VENDOR,
+                                "infos": None,
+                                "children": groups})
+        library.sort(lambda x, y: cmp(x["name"], y["name"]))
+        return library
+
+    def GetVendors(self):
+        return [(vendor_id, vendor["name"]) for vendor_id, vendor in self.Library.items()]
+    
+    def GetModuleInfos(self, module_infos):
+        vendor = ExtractHexDecValue(module_infos["vendor"])
+        vendor_infos = self.Library.get(vendor)
+        if vendor_infos is not None:
+            for group_name, group_infos in vendor_infos["groups"].iteritems():
+                for device_type, device_infos in group_infos["devices"]:
+                    product_code = ExtractHexDecValue(device_infos.getType().getProductCode())
+                    revision_number = ExtractHexDecValue(device_infos.getType().getRevisionNo())
+                    if (product_code == ExtractHexDecValue(module_infos["product_code"]) and
+                        revision_number == ExtractHexDecValue(module_infos["revision_number"])):
+                        self.cntdevice = device_infos 
+                        self.cntdeviceType = device_type  
+                        return device_infos, self.GetModuleExtraParams(vendor, product_code, revision_number)
+        return None, None
+    
+    def ImportModuleLibrary(self, filepath):
+        if os.path.isfile(filepath):
+            shutil.copy(filepath, self.Path)
+            self.LoadModules()
+            return True
+        return False
+    
+    def LoadModulesExtraParams(self):
+        self.ModulesExtraParams = {}
+        
+        csvfile_path = self.GetModulesExtraParamsFilePath()
+        if os.path.exists(csvfile_path):
+            csvfile = open(csvfile_path, "rb")
+            sample = csvfile.read(1024)
+            csvfile.seek(0)
+            dialect = csv.Sniffer().sniff(sample)
+            has_header = csv.Sniffer().has_header(sample)
+            reader = csv.reader(csvfile, dialect)
+            for row in reader:
+                if has_header:
+                    has_header = False
+                else:
+                    params_values = {}
+                    for (param, param_infos), value in zip(
+                        self.MODULES_EXTRA_PARAMS, row[3:]):
+                        if value != "":
+                            params_values[param] = int(value)
+                    self.ModulesExtraParams[
+                        tuple(map(int, row[:3]))] = params_values
+            csvfile.close()
+    
+    def SaveModulesExtraParams(self):
+        csvfile = open(self.GetModulesExtraParamsFilePath(), "wb")
+        extra_params = [param for param, params_infos in self.MODULES_EXTRA_PARAMS]
+        writer = csv.writer(csvfile, delimiter=';')
+        writer.writerow(['Vendor', 'product_code', 'revision_number'] + extra_params)
+        for (vendor, product_code, revision_number), module_extra_params in self.ModulesExtraParams.iteritems():
+            writer.writerow([vendor, product_code, revision_number] + 
+                            [module_extra_params.get(param, '') 
+                             for param in extra_params])
+        csvfile.close()
+    
+    def SetModuleExtraParam(self, vendor, product_code, revision_number, param, value):
+        vendor = ExtractHexDecValue(vendor)
+        product_code = ExtractHexDecValue(product_code)
+        revision_number = ExtractHexDecValue(revision_number)
+        
+        module_infos = (vendor, product_code, revision_number)
+        self.ModulesExtraParams.setdefault(module_infos, {})
+        self.ModulesExtraParams[module_infos][param] = value
+        
+        self.SaveModulesExtraParams()
+    
+    def GetModuleExtraParams(self, vendor, product_code, revision_number):
+        vendor = ExtractHexDecValue(vendor)
+        product_code = ExtractHexDecValue(product_code)
+        revision_number = ExtractHexDecValue(revision_number)
+        
+        if self.ParentLibrary is not None:
+            extra_params = self.ParentLibrary.GetModuleExtraParams(vendor, product_code, revision_number)
+        else:
+            extra_params = {}
+        
+        extra_params.update(self.ModulesExtraParams.get((vendor, product_code, revision_number), {}))
+        
+        for param, param_infos in self.MODULES_EXTRA_PARAMS:
+            extra_params.setdefault(param, param_infos["default"])
+        
+        return extra_params
+
+USERDATA_DIR = wx.StandardPaths.Get().GetUserDataDir()
+if wx.Platform != '__WXMSW__':
+    USERDATA_DIR += '_files'
+
+ModulesDatabase = ModulesLibrary(
+    os.path.join(USERDATA_DIR, "ethercat_modules"))
+
+class RootClass:
+    
+    CTNChildrenTypes = [("EthercatNode",_EthercatCTN,"Ethercat Master")]
+    EditorType = LibraryEditor
+       
+    
+    def __init__(self):
+        self.ModulesLibrary = None
+        self.LoadModulesLibrary()
+    
+    def GetIconName(self):
+        return "Ethercat"
+    
+    def GetModulesLibraryPath(self, project_path=None):
+        if project_path is None:
+            project_path = self.CTNPath()
+        return os.path.join(project_path, "modules") 
+    
+    def OnCTNSave(self, from_project_path=None):
+        if from_project_path is not None:
+            shutil.copytree(self.GetModulesLibraryPath(from_project_path),
+                            self.GetModulesLibraryPath())
+        return True
+    
+    def CTNGenerate_C(self, buildpath, locations):
+        return [],"",False
+    
+    def LoadModulesLibrary(self):
+        if self.ModulesLibrary is None:
+            self.ModulesLibrary = ModulesLibrary(self.GetModulesLibraryPath(), ModulesDatabase)
+        else:
+            self.ModulesLibrary.LoadModulesLibrary()
+    
+    def GetModulesDatabaseInstance(self):
+        return ModulesDatabase
+    
+    def GetModulesLibraryInstance(self):
+        return self.ModulesLibrary
+    
+    def GetModulesLibrary(self, profile_filter=None):
+        return self.ModulesLibrary.GetModulesLibrary(profile_filter)
+    
+    def GetVendors(self):
+        return self.ModulesLibrary.GetVendors()
+    
+    def GetModuleInfos(self, module_infos):
+        return self.ModulesLibrary.GetModuleInfos(module_infos)
+