diff -r bcf346f558bd -r 02a2b5dee5e3 etherlab/etherlab.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/etherlab/etherlab.py Sat Jun 23 09:08:13 2018 +0200 @@ -0,0 +1,433 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This file is part of Beremiz +# +# Copyright (C) 2011-2014: Laurent BESSARD, Edouard TISSERANT +# RTES Lab : CRKim, JBLee, youcu +# Higen Motor : Donggu Kang +# +# See COPYING file for copyrights details. + +import os, shutil +from lxml import etree + +import wx +import csv + +from xmlclass import * + +from ConfigTreeNode import ConfigTreeNode, XSDSchemaErrorMessage +from PLCControler import UndoBuffer, LOCATION_CONFNODE, LOCATION_MODULE, LOCATION_GROUP, LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY + +from EthercatSlave import ExtractHexDecValue, ExtractName +from EthercatMaster import _EthercatCTN +from ConfigEditor import LibraryEditor, ETHERCAT_VENDOR, ETHERCAT_GROUP, ETHERCAT_DEVICE + +ScriptDirectory = os.path.split(os.path.realpath(__file__))[0] + +#-------------------------------------------------- +# Ethercat ConfNode +#-------------------------------------------------- + +EtherCATInfoParser = GenerateParserFromXSD(os.path.join(os.path.dirname(__file__), "EtherCATInfo.xsd")) +EtherCATInfo_XPath = lambda xpath: etree.XPath(xpath) + +def HexDecValue(context, *args): + return str(ExtractHexDecValue(args[0][0])) + +def EntryName(context, *args): + return ExtractName(args[0], + args[1][0] if len(args) > 1 else None) + +ENTRY_INFOS_KEYS = [ + ("Index", lambda x: "#x%4.4X" % int(x), "#x0000"), + ("SubIndex", str, "0"), + ("Name", str, ""), + ("Type", str, ""), + ("BitSize", int, 0), + ("Access", str, ""), + ("PDOMapping", str, ""), + ("PDO index", str, ""), + ("PDO name", str, ""), + ("PDO type", str, "")] + +class EntryListFactory: + + def __init__(self, entries): + self.Entries = entries + + def AddEntry(self, context, *args): + index, subindex = map(lambda x: int(x[0]), args[:2]) + new_entry_infos = { + key: translate(arg[0]) if len(arg) > 0 else default + for (key, translate, default), arg + in zip(ENTRY_INFOS_KEYS, args)} + + if (index, subindex) != (0, 0): + entry_infos = self.Entries.get((index, subindex)) + if entry_infos is not None: + for param in ["PDO index", "PDO name", "PDO type"]: + value = new_entry_infos.get(param) + if value is not None: + entry_infos[param] = value + else: + self.Entries[(index, subindex)] = new_entry_infos + +entries_list_xslt = etree.parse( + os.path.join(ScriptDirectory, "entries_list.xslt")) + +cls = EtherCATInfoParser.GetElementClass("DeviceType") +if cls: + + profile_numbers_xpath = EtherCATInfo_XPath("Profile/ProfileNo") + def GetProfileNumbers(self): + return [number.text for number in profile_numbers_xpath(self)] + setattr(cls, "GetProfileNumbers", GetProfileNumbers) + + def getCoE(self): + mailbox = self.getMailbox() + if mailbox is not None: + return mailbox.getCoE() + return None + setattr(cls, "getCoE", getCoE) + + def GetEntriesList(self, limits=None): + entries = {} + + factory = EntryListFactory(entries) + + entries_list_xslt_tree = etree.XSLT( + entries_list_xslt, extensions = { + ("entries_list_ns", "AddEntry"): factory.AddEntry, + ("entries_list_ns", "HexDecValue"): HexDecValue, + ("entries_list_ns", "EntryName"): EntryName}) + entries_list_xslt_tree(self, **dict(zip( + ["min_index", "max_index"], + map(lambda x: etree.XSLT.strparam(str(x)), + limits if limits is not None else [0x0000, 0xFFFF]) + ))) + + return entries + setattr(cls, "GetEntriesList", GetEntriesList) + + def GetSyncManagers(self): + sync_managers = [] + for sync_manager in self.getSm(): + sync_manager_infos = {} + for name, value in [("Name", sync_manager.getcontent()), + ("Start Address", sync_manager.getStartAddress()), + ("Default Size", sync_manager.getDefaultSize()), + ("Control Byte", sync_manager.getControlByte()), + ("Enable", sync_manager.getEnable())]: + if value is None: + value ="" + sync_manager_infos[name] = value + sync_managers.append(sync_manager_infos) + return sync_managers + setattr(cls, "GetSyncManagers", GetSyncManagers) + +def GroupItemCompare(x, y): + if x["type"] == y["type"]: + if x["type"] == ETHERCAT_GROUP: + return cmp(x["order"], y["order"]) + else: + return cmp(x["name"], y["name"]) + elif x["type"] == ETHERCAT_GROUP: + return -1 + return 1 + +def SortGroupItems(group): + for item in group["children"]: + if item["type"] == ETHERCAT_GROUP: + SortGroupItems(item) + group["children"].sort(GroupItemCompare) + +class ModulesLibrary: + + MODULES_EXTRA_PARAMS = [ + ("pdo_alignment", { + "column_label": _("PDO alignment"), + "column_size": 150, + "default": 8, + "description": _( +"Minimal size in bits between 2 pdo entries")}), + ("max_pdo_size", { + "column_label": _("Max entries by PDO"), + "column_size": 150, + "default": 255, + "description": _( +"""Maximal number of entries mapped in a PDO +including empty entries used for PDO alignment""")}), + ("add_pdo", { + "column_label": _("Creating new PDO"), + "column_size": 150, + "default": 0, + "description": _( +"""Adding a PDO not defined in default configuration +for mapping needed location variables +(1 if possible)""")}) + ] + + def __init__(self, path, parent_library=None): + self.Path = path + if not os.path.exists(self.Path): + os.makedirs(self.Path) + self.ParentLibrary = parent_library + + if parent_library is not None: + self.LoadModules() + else: + self.Library = None + self.LoadModulesExtraParams() + + def GetPath(self): + return self.Path + + def GetModulesExtraParamsFilePath(self): + return os.path.join(self.Path, "modules_extra_params.cfg") + + groups_xpath = EtherCATInfo_XPath("Descriptions/Groups/Group") + devices_xpath = EtherCATInfo_XPath("Descriptions/Devices/Device") + def LoadModules(self): + self.Library = {} + + files = os.listdir(self.Path) + for file in files: + filepath = os.path.join(self.Path, file) + if os.path.isfile(filepath) and os.path.splitext(filepath)[-1] == ".xml": + self.modules_infos = None + + xmlfile = open(filepath, 'r') + try: + self.modules_infos, error = EtherCATInfoParser.LoadXMLString(xmlfile.read()) + if error is not None: + self.GetCTRoot().logger.write_warning( + XSDSchemaErrorMessage % (filepath + error)) + except Exception, exc: + self.modules_infos, error = None, unicode(exc) + xmlfile.close() + + if self.modules_infos is not None: + vendor = self.modules_infos.getVendor() + + vendor_category = self.Library.setdefault( + ExtractHexDecValue(vendor.getId()), + {"name": ExtractName(vendor.getName(), _("Miscellaneous")), + "groups": {}}) + + for group in self.groups_xpath(self.modules_infos): + group_type = group.getType() + + vendor_category["groups"].setdefault(group_type, + {"name": ExtractName(group.getName(), group_type), + "parent": group.getParentGroup(), + "order": group.getSortOrder(), + #"value": group.getcontent()["value"], + "devices": []}) + + for device in self.devices_xpath(self.modules_infos): + device_group = device.getGroupType() + if not vendor_category["groups"].has_key(device_group): + raise ValueError, "Not such group \"%\"" % device_group + vendor_category["groups"][device_group]["devices"].append( + (device.getType().getcontent(), device)) + + else: + + self.GetCTRoot().logger.write_error( + _("Couldn't load %s XML file:\n%s") % (filepath, error)) + + return self.Library + + def GetModulesLibrary(self, profile_filter=None): + if self.Library is None: + self.LoadModules() + library = [] + for vendor_id, vendor in self.Library.iteritems(): + groups = [] + children_dict = {} + for group_type, group in vendor["groups"].iteritems(): + group_infos = {"name": group["name"], + "order": group["order"], + "type": ETHERCAT_GROUP, + "infos": None, + "children": children_dict.setdefault(group_type, [])} + device_dict = {} + for device_type, device in group["devices"]: + if profile_filter is None or profile_filter in device.GetProfileNumbers(): + product_code = device.getType().getProductCode() + revision_number = device.getType().getRevisionNo() + module_infos = {"device_type": device_type, + "vendor": vendor_id, + "product_code": product_code, + "revision_number": revision_number} + module_infos.update(self.GetModuleExtraParams(vendor_id, product_code, revision_number)) + device_infos = {"name": ExtractName(device.getName()), + "type": ETHERCAT_DEVICE, + "infos": module_infos, + "children": []} + group_infos["children"].append(device_infos) + device_type_occurrences = device_dict.setdefault(device_type, []) + device_type_occurrences.append(device_infos) + for device_type_occurrences in device_dict.itervalues(): + if len(device_type_occurrences) > 1: + for occurrence in device_type_occurrences: + occurrence["name"] += _(" (rev. %s)") % occurrence["infos"]["revision_number"] + if len(group_infos["children"]) > 0: + if group["parent"] is not None: + parent_children = children_dict.setdefault(group["parent"], []) + parent_children.append(group_infos) + else: + groups.append(group_infos) + if len(groups) > 0: + library.append({"name": vendor["name"], + "type": ETHERCAT_VENDOR, + "infos": None, + "children": groups}) + library.sort(lambda x, y: cmp(x["name"], y["name"])) + return library + + def GetVendors(self): + return [(vendor_id, vendor["name"]) for vendor_id, vendor in self.Library.items()] + + def GetModuleInfos(self, module_infos): + vendor = ExtractHexDecValue(module_infos["vendor"]) + vendor_infos = self.Library.get(vendor) + if vendor_infos is not None: + for group_name, group_infos in vendor_infos["groups"].iteritems(): + for device_type, device_infos in group_infos["devices"]: + product_code = ExtractHexDecValue(device_infos.getType().getProductCode()) + revision_number = ExtractHexDecValue(device_infos.getType().getRevisionNo()) + if (product_code == ExtractHexDecValue(module_infos["product_code"]) and + revision_number == ExtractHexDecValue(module_infos["revision_number"])): + self.cntdevice = device_infos + self.cntdeviceType = device_type + return device_infos, self.GetModuleExtraParams(vendor, product_code, revision_number) + return None, None + + def ImportModuleLibrary(self, filepath): + if os.path.isfile(filepath): + shutil.copy(filepath, self.Path) + self.LoadModules() + return True + return False + + def LoadModulesExtraParams(self): + self.ModulesExtraParams = {} + + csvfile_path = self.GetModulesExtraParamsFilePath() + if os.path.exists(csvfile_path): + csvfile = open(csvfile_path, "rb") + sample = csvfile.read(1024) + csvfile.seek(0) + dialect = csv.Sniffer().sniff(sample) + has_header = csv.Sniffer().has_header(sample) + reader = csv.reader(csvfile, dialect) + for row in reader: + if has_header: + has_header = False + else: + params_values = {} + for (param, param_infos), value in zip( + self.MODULES_EXTRA_PARAMS, row[3:]): + if value != "": + params_values[param] = int(value) + self.ModulesExtraParams[ + tuple(map(int, row[:3]))] = params_values + csvfile.close() + + def SaveModulesExtraParams(self): + csvfile = open(self.GetModulesExtraParamsFilePath(), "wb") + extra_params = [param for param, params_infos in self.MODULES_EXTRA_PARAMS] + writer = csv.writer(csvfile, delimiter=';') + writer.writerow(['Vendor', 'product_code', 'revision_number'] + extra_params) + for (vendor, product_code, revision_number), module_extra_params in self.ModulesExtraParams.iteritems(): + writer.writerow([vendor, product_code, revision_number] + + [module_extra_params.get(param, '') + for param in extra_params]) + csvfile.close() + + def SetModuleExtraParam(self, vendor, product_code, revision_number, param, value): + vendor = ExtractHexDecValue(vendor) + product_code = ExtractHexDecValue(product_code) + revision_number = ExtractHexDecValue(revision_number) + + module_infos = (vendor, product_code, revision_number) + self.ModulesExtraParams.setdefault(module_infos, {}) + self.ModulesExtraParams[module_infos][param] = value + + self.SaveModulesExtraParams() + + def GetModuleExtraParams(self, vendor, product_code, revision_number): + vendor = ExtractHexDecValue(vendor) + product_code = ExtractHexDecValue(product_code) + revision_number = ExtractHexDecValue(revision_number) + + if self.ParentLibrary is not None: + extra_params = self.ParentLibrary.GetModuleExtraParams(vendor, product_code, revision_number) + else: + extra_params = {} + + extra_params.update(self.ModulesExtraParams.get((vendor, product_code, revision_number), {})) + + for param, param_infos in self.MODULES_EXTRA_PARAMS: + extra_params.setdefault(param, param_infos["default"]) + + return extra_params + +USERDATA_DIR = wx.StandardPaths.Get().GetUserDataDir() +if wx.Platform != '__WXMSW__': + USERDATA_DIR += '_files' + +ModulesDatabase = ModulesLibrary( + os.path.join(USERDATA_DIR, "ethercat_modules")) + +class RootClass: + + CTNChildrenTypes = [("EthercatNode",_EthercatCTN,"Ethercat Master")] + EditorType = LibraryEditor + + + def __init__(self): + self.ModulesLibrary = None + self.LoadModulesLibrary() + + def GetIconName(self): + return "Ethercat" + + def GetModulesLibraryPath(self, project_path=None): + if project_path is None: + project_path = self.CTNPath() + return os.path.join(project_path, "modules") + + def OnCTNSave(self, from_project_path=None): + if from_project_path is not None: + shutil.copytree(self.GetModulesLibraryPath(from_project_path), + self.GetModulesLibraryPath()) + return True + + def CTNGenerate_C(self, buildpath, locations): + return [],"",False + + def LoadModulesLibrary(self): + if self.ModulesLibrary is None: + self.ModulesLibrary = ModulesLibrary(self.GetModulesLibraryPath(), ModulesDatabase) + else: + self.ModulesLibrary.LoadModulesLibrary() + + def GetModulesDatabaseInstance(self): + return ModulesDatabase + + def GetModulesLibraryInstance(self): + return self.ModulesLibrary + + def GetModulesLibrary(self, profile_filter=None): + return self.ModulesLibrary.GetModulesLibrary(profile_filter) + + def GetVendors(self): + return self.ModulesLibrary.GetVendors() + + def GetModuleInfos(self, module_infos): + return self.ModulesLibrary.GetModuleInfos(module_infos) +