--- a/etherlab/etherlab.py Tue Jun 25 00:55:38 2013 +0200
+++ b/etherlab/etherlab.py Tue Sep 24 00:48:21 2013 +0200
@@ -1,5 +1,5 @@
import os, shutil
-from xml.dom import minidom
+from lxml import etree
import wx
import csv
@@ -13,81 +13,79 @@
from EthercatMaster import _EthercatCTN
from ConfigEditor import LibraryEditor, ETHERCAT_VENDOR, ETHERCAT_GROUP, ETHERCAT_DEVICE
+ScriptDirectory = os.path.split(os.path.realpath(__file__))[0]
+
#--------------------------------------------------
# Ethercat ConfNode
#--------------------------------------------------
-EtherCATInfoClasses = GenerateClassesFromXSD(os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd"))
-
-cls = EtherCATInfoClasses["EtherCATBase.xsd"].get("DictionaryType", None)
+EtherCATInfoParser = GenerateParserFromXSD(os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd"))
+EtherCATInfo_XPath = lambda xpath: etree.XPath(xpath)
+
+def extract_param(el):
+ if el.tag == "Index":
+ return "#x%4.4X" % int(el.text)
+ if el.tag == "PDOMapping":
+ if el.text is None:
+ return ""
+ return el.text.upper()
+ if el.text is None:
+ return ""
+ return el.text
+
+def extract_pdo_infos(pdo_infos):
+ return {
+ pdo_infos.tag + " " + el.tag: extract_param(el)
+ for el in pdo_infos}
+
+def HexDecValue(ctxt, values):
+ return str(ExtractHexDecValue(values[0]))
+
+def EntryName(ctxt, values):
+ default=None
+ names = []
+ for element in values:
+ if element.tag == "Default":
+ default = element.text
+ else:
+ names.append(element)
+ return ExtractName(names, default)
+
+class AddEntry(etree.XSLTExtension):
+
+ def __init__(self, entries):
+ etree.XSLTExtension.__init__(self)
+ self.Entries = entries
+
+ def execute(self, context, self_node, input_node, output_parent):
+ infos = etree.Element('entry_infos')
+ self.process_children(context, infos)
+ index, subindex = map(
+ lambda x: int(infos.find(x).text),
+ ["Index", "SubIndex"])
+ new_entry_infos = {
+ el.tag: extract_param(el)
+ for el in infos if el.tag != "PDO"}
+ if (index, subindex) != (0, 0):
+ entry_infos = self.Entries.get((index, subindex))
+ if entry_infos is not None:
+ PDO_infos = infos.find("PDO")
+ if PDO_infos is not None:
+ entry_infos.update(extract_pdo_infos(PDO_infos))
+ else:
+ self.Entries[(index, subindex)] = new_entry_infos
+
+entries_list_xslt = etree.parse(
+ os.path.join(ScriptDirectory, "entries_list.xslt"))
+
+cls = EtherCATInfoParser.GetElementClass("DeviceType")
if cls:
- cls.loadXMLTreeArgs = None
-
- setattr(cls, "_loadXMLTree", getattr(cls, "loadXMLTree"))
-
- def loadXMLTree(self, *args):
- self.loadXMLTreeArgs = args
- setattr(cls, "loadXMLTree", loadXMLTree)
-
- def load(self):
- if self.loadXMLTreeArgs is not None:
- self._loadXMLTree(*self.loadXMLTreeArgs)
- self.loadXMLTreeArgs = None
- setattr(cls, "load", load)
-
-cls = EtherCATInfoClasses["EtherCATInfo.xsd"].get("DeviceType", None)
-if cls:
- cls.DataTypes = None
-
+
+ profile_numbers_xpath = EtherCATInfo_XPath("Profile/ProfileNo")
def GetProfileNumbers(self):
- profiles = []
-
- for profile in self.getProfile():
- profile_content = profile.getcontent()
- if profile_content is None:
- continue
-
- for content_element in profile_content["value"]:
- if content_element["name"] == "ProfileNo":
- profiles.append(content_element["value"])
-
- return profiles
+ return [number.text for number in profile_numbers_xpath(self)]
setattr(cls, "GetProfileNumbers", GetProfileNumbers)
- def GetProfileDictionaries(self):
- dictionaries = []
-
- for profile in self.getProfile():
-
- profile_content = profile.getcontent()
- if profile_content is None:
- continue
-
- for content_element in profile_content["value"]:
- if content_element["name"] == "Dictionary":
- dictionaries.append(content_element["value"])
- elif content_element["name"] == "DictionaryFile":
- raise ValueError, "DictionaryFile for defining Device Profile is not yet supported!"
-
- return dictionaries
- setattr(cls, "GetProfileDictionaries", GetProfileDictionaries)
-
- def ExtractDataTypes(self):
- self.DataTypes = {}
-
- for dictionary in self.GetProfileDictionaries():
- dictionary.load()
-
- datatypes = dictionary.getDataTypes()
- if datatypes is not None:
-
- for datatype in datatypes.getDataType():
- content = datatype.getcontent()
- if content is not None and content["name"] == "SubItem":
- self.DataTypes[datatype.getName()] = datatype
-
- setattr(cls, "ExtractDataTypes", ExtractDataTypes)
-
def getCoE(self):
mailbox = self.getMailbox()
if mailbox is not None:
@@ -96,78 +94,22 @@
setattr(cls, "getCoE", getCoE)
def GetEntriesList(self, limits=None):
- if self.DataTypes is None:
- self.ExtractDataTypes()
-
entries = {}
- for dictionary in self.GetProfileDictionaries():
- dictionary.load()
-
- for object in dictionary.getObjects().getObject():
- entry_index = object.getIndex().getcontent()
- index = ExtractHexDecValue(entry_index)
- if limits is None or limits[0] <= index <= limits[1]:
- entry_type = object.getType()
- entry_name = ExtractName(object.getName())
-
- entry_type_infos = self.DataTypes.get(entry_type, None)
- if entry_type_infos is not None:
- content = entry_type_infos.getcontent()
- for subitem in content["value"]:
- entry_subidx = subitem.getSubIdx()
- if entry_subidx is None:
- entry_subidx = "0"
- subidx = ExtractHexDecValue(entry_subidx)
- subitem_access = ""
- subitem_pdomapping = ""
- subitem_flags = subitem.getFlags()
- if subitem_flags is not None:
- access = subitem_flags.getAccess()
- if access is not None:
- subitem_access = access.getcontent()
- pdomapping = subitem_flags.getPdoMapping()
- if pdomapping is not None:
- subitem_pdomapping = pdomapping.upper()
- entries[(index, subidx)] = {
- "Index": entry_index,
- "SubIndex": entry_subidx,
- "Name": "%s - %s" %
- (entry_name.decode("utf-8"),
- ExtractName(subitem.getDisplayName(),
- subitem.getName()).decode("utf-8")),
- "Type": subitem.getType(),
- "BitSize": subitem.getBitSize(),
- "Access": subitem_access,
- "PDOMapping": subitem_pdomapping}
- else:
- entry_access = ""
- entry_pdomapping = ""
- entry_flags = object.getFlags()
- if entry_flags is not None:
- access = entry_flags.getAccess()
- if access is not None:
- entry_access = access.getcontent()
- pdomapping = entry_flags.getPdoMapping()
- if pdomapping is not None:
- entry_pdomapping = pdomapping.upper()
- entries[(index, 0)] = {
- "Index": entry_index,
- "SubIndex": "0",
- "Name": entry_name,
- "Type": entry_type,
- "BitSize": object.getBitSize(),
- "Access": entry_access,
- "PDOMapping": entry_pdomapping}
-
- for TxPdo in self.getTxPdo():
- ExtractPdoInfos(TxPdo, "Transmit", entries, limits)
- for RxPdo in self.getRxPdo():
- ExtractPdoInfos(RxPdo, "Receive", entries, limits)
+ entries_list_xslt_tree = etree.XSLT(
+ entries_list_xslt, extensions = {
+ ("entries_list_ns", "add_entry"): AddEntry(entries),
+ ("entries_list_ns", "HexDecValue"): HexDecValue,
+ ("entries_list_ns", "EntryName"): EntryName})
+ entries_list_xslt_tree(self, **dict(zip(
+ ["min_index", "max_index"],
+ map(lambda x: etree.XSLT.strparam(str(x)),
+ limits if limits is not None else [0x0000, 0xFFFF])
+ )))
return entries
setattr(cls, "GetEntriesList", GetEntriesList)
-
+
def GetSyncManagers(self):
sync_managers = []
for sync_manager in self.getSm():
@@ -200,38 +142,6 @@
SortGroupItems(item)
group["children"].sort(GroupItemCompare)
-def ExtractPdoInfos(pdo, pdo_type, entries, limits=None):
- pdo_index = pdo.getIndex().getcontent()
- pdo_name = ExtractName(pdo.getName())
- for pdo_entry in pdo.getEntry():
- entry_index = pdo_entry.getIndex().getcontent()
- entry_subindex = pdo_entry.getSubIndex()
- index = ExtractHexDecValue(entry_index)
- subindex = ExtractHexDecValue(entry_subindex)
-
- if limits is None or limits[0] <= index <= limits[1]:
- entry = entries.get((index, subindex), None)
- if entry is not None:
- entry["PDO index"] = pdo_index
- entry["PDO name"] = pdo_name
- entry["PDO type"] = pdo_type
- else:
- entry_type = pdo_entry.getDataType()
- if entry_type is not None:
- if pdo_type == "Transmit":
- access = "ro"
- pdomapping = "T"
- else:
- access = "wo"
- pdomapping = "R"
- entries[(index, subindex)] = {
- "Index": entry_index,
- "SubIndex": entry_subindex,
- "Name": ExtractName(pdo_entry.getName()),
- "Type": entry_type.getcontent(),
- "Access": access,
- "PDOMapping": pdomapping}
-
class ModulesLibrary:
MODULES_EXTRA_PARAMS = [
@@ -276,6 +186,8 @@
def GetModulesExtraParamsFilePath(self):
return os.path.join(self.Path, "modules_extra_params.cfg")
+ groups_xpath = EtherCATInfo_XPath("Descriptions/Groups/Group")
+ devices_xpath = EtherCATInfo_XPath("Descriptions/Devices/Device")
def LoadModules(self):
self.Library = {}
@@ -283,37 +195,40 @@
for file in files:
filepath = os.path.join(self.Path, file)
if os.path.isfile(filepath) and os.path.splitext(filepath)[-1] == ".xml":
+ self.modules_infos = None
+
xmlfile = open(filepath, 'r')
- xml_tree = minidom.parse(xmlfile)
+ try:
+ self.modules_infos = etree.fromstring(
+ xmlfile.read(), EtherCATInfoParser)
+ except:
+ pass
xmlfile.close()
- self.modules_infos = None
- for child in xml_tree.childNodes:
- if child.nodeType == xml_tree.ELEMENT_NODE and child.nodeName == "EtherCATInfo":
- self.modules_infos = EtherCATInfoClasses["EtherCATInfo.xsd"]["EtherCATInfo"]()
- self.modules_infos.loadXMLTree(child)
-
if self.modules_infos is not None:
vendor = self.modules_infos.getVendor()
- vendor_category = self.Library.setdefault(ExtractHexDecValue(vendor.getId()),
- {"name": ExtractName(vendor.getName(), _("Miscellaneous")),
- "groups": {}})
+ vendor_category = self.Library.setdefault(
+ ExtractHexDecValue(vendor.getId()),
+ {"name": ExtractName(vendor.getName(), _("Miscellaneous")),
+ "groups": {}})
- for group in self.modules_infos.getDescriptions().getGroups().getGroup():
+ for group in self.groups_xpath(self.modules_infos):
group_type = group.getType()
- vendor_category["groups"].setdefault(group_type, {"name": ExtractName(group.getName(), group_type),
- "parent": group.getParentGroup(),
- "order": group.getSortOrder(),
- "value": group.getcontent()["value"],
- "devices": []})
+ vendor_category["groups"].setdefault(group_type,
+ {"name": ExtractName(group.getName(), group_type),
+ "parent": group.getParentGroup(),
+ "order": group.getSortOrder(),
+ #"value": group.getcontent()["value"],
+ "devices": []})
- for device in self.modules_infos.getDescriptions().getDevices().getDevice():
+ for device in self.devices_xpath(self.modules_infos):
device_group = device.getGroupType()
if not vendor_category["groups"].has_key(device_group):
raise ValueError, "Not such group \"%\"" % device_group
- vendor_category["groups"][device_group]["devices"].append((device.getType().getcontent(), device))
+ vendor_category["groups"][device_group]["devices"].append(
+ (device.getType().getcontent(), device))
return self.Library
@@ -464,8 +379,8 @@
CTNChildrenTypes = [("EthercatNode",_EthercatCTN,"Ethercat Master")]
EditorType = LibraryEditor
-
-
+
+
def __init__(self):
self.ModulesLibrary = None
self.LoadModulesLibrary()