eRPC: Server closes connection on exception to prevent client to block until timeout when it happens.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of Beremiz
#
# Copyright (C) 2013: Real-Time & Embedded Systems (RTES) Lab. University of Seoul, Korea
#
# See COPYING file for copyrights details.
from __future__ import absolute_import
from __future__ import division
from builtins import str as text
import codecs
import wx
mailbox_protocols = ["AoE", "EoE", "CoE", "FoE", "SoE", "VoE"]
def ExtractHexDecValue(value):
"""
convert numerical value in string format into decimal or hex format.
@param value : hex or decimal data
@return integer data
"""
try:
return int(value)
except Exception:
pass
try:
return int(value.replace("#", "0"), 16)
except Exception:
raise ValueError(_("Invalid value for HexDecValue \"%s\"") % value)
def ExtractName(names, default=None):
"""
Extract "name" field from XML entries.
@param names : XML entry
@default : if it fails to extract from the designated XML entry, return the default value ("None").
@return default or the name extracted
"""
if len(names) == 1:
return names[0].getcontent()
else:
for name in names:
if name.getLcId() == 1033:
return name.getcontent()
return default
#--------------------------------------------------
# Remote Exec Etherlab Commands
#--------------------------------------------------
# --------------------- for master ---------------------------
MASTER_STATE = """
import commands
result = commands.getoutput("ethercat master")
returnVal =result
"""
# --------------------- for slave ----------------------------
# ethercat state -p (slave position) (state (INIT, PREOP, SAFEOP, OP))
SLAVE_STATE = """
import commands
result = commands.getoutput("ethercat state -p %d %s")
returnVal = result
"""
# ethercat slave
GET_SLAVE = """
import commands
result = commands.getoutput("ethercat slaves")
returnVal =result
"""
# ethercat xml -p (slave position)
SLAVE_XML = """
import commands
result = commands.getoutput("ethercat xml -p %d")
returnVal = result
"""
# ethercat upload -p (slave position) -t (type) (index) (sub index)
SDO_UPLOAD = """
import commands
sdo_data = []
input_data = "%s"
slave_pos = %d
command_string = ""
for sdo_token in input_data.split(","):
if len(sdo_token) > 1:
sdo_token = sdo_token.strip()
type, idx, subidx = sdo_token.split(" ")
command_string = "ethercat upload -p " + str(slave_pos) + " -t " + type + " " + idx + " " + subidx
result = commands.getoutput(command_string)
sdo_data.append(result)
returnVal =sdo_data
"""
# ethercat download -p (slave position) (main index) (sub index) (value)
SDO_DOWNLOAD = """
import commands
result = commands.getoutput("ethercat download --type %s -p %d %s %s %s")
returnVal =result
"""
# ethercat sii_read -p (slave position)
SII_READ = """
import commands
result = commands.getoutput("ethercat sii_read -p %d")
returnVal =result
"""
# ethercat reg_read -p (slave position) (address) (size)
REG_READ = """
import commands
result = commands.getoutput("ethercat reg_read -p %d %s %s")
returnVal =result
"""
# ethercat reg_read -p (slave position) (address) (size)
MULTI_REG_READ = """
import commands
output = []
addr, size = range(2)
slave_num = %d
reg_info_str = "%s"
reg_info_list = reg_info_str.split("|")
for slave_idx in range(slave_num):
for reg_info in reg_info_list:
param = reg_info.split(",")
result = commands.getoutput("ethercat reg_read -p "
+ str(slave_idx) + " "
+ param[addr] + " "
+ param[size])
output.append(str(slave_idx) + "," + param[addr] + "," + result)
returnVal = output
"""
# ethercat sii_write -p (slave position) - (contents)
SII_WRITE = """
import subprocess
process = subprocess.Popen(
["ethercat", "-f", "sii_write", "-p", "%d", "-"],
stdin=subprocess.PIPE)
process.communicate(sii_data)
returnVal = process.returncode
"""
# ethercat reg_write -p (slave position) -t (uinit16) (address) (data)
REG_WRITE = """
import commands
result = commands.getoutput("ethercat reg_write -p %d -t uint16 %s %s")
returnVal =result
"""
# ethercat rescan -p (slave position)
RESCAN = """
import commands
result = commands.getoutput("ethercat rescan -p %d")
returnVal =result
"""
# ethercat pdos
PDOS = """
import commands
result = commands.getoutput("ethercat pdos -p 0")
returnVal =result
"""
# --------------------------------------------------
# Common Method For EtherCAT Management
# --------------------------------------------------
class _CommonSlave(object):
# ----- Data Structure for ethercat management ----
SlaveState = ""
# SDO base data type for Ethercatmaster
BaseDataTypes = {
"bool": ["BOOLEAN", "BOOL", "BIT"],
"uint8": ["BYTE", "USINT", "BIT1", "BIT2", "BIT3", "BIT4", "BIT5", "BIT6",
"BIT7", "BIT8", "BITARR8", "UNSIGNED8"],
"uint16": ["BITARR16", "UNSIGNED16", "UINT"],
"uint32": ["BITARR32", "UNSIGNED24", "UINT24", "UNSIGNED32", "UDINT"],
"uint64": ["UNSINED40", "UINT40", "UNSIGNED48", "UINT48", "UNSIGNED56",
"UINT56", "UNSIGNED64", "ULINT"],
"int8": ["INTEGER8", "SINT"],
"int16": ["INTEGER16", "INT"],
"int32": ["INTEGER24", "INT24", "INTEGER32", "DINT"],
"int64": ["INTEGER40", "INT40", "INTEGER48", "INT48", "INTEGER56", "INT56",
"INTEGER64", "LINT"],
"float": ["REAL", "REAL32"],
"double": ["LREAL", "REAL64"],
"string": ["VISUBLE_STRING", "STRING(n)"],
"octet_string": ["OCTET_STRING"],
"unicode_string": ["UNICODE_STRING"]
}
# category of SDO data
DatatypeDescription, CommunicationObject, ManufacturerSpecific, \
ProfileSpecific, Reserved, AllSDOData = range(6)
# SDO data informations: index, sub-index, type, bit size, category-name
SDOVariables = []
SDOSubEntryData = []
# defalut value of SDO data in XML
# Not Used
DefaultValueDic = {}
# Flags for checking "write" permission of OD entries
CheckPREOP = False
CheckSAFEOP = False
CheckOP = False
# Save PDO Data
TxPDOInfo = []
TxPDOCategory = []
RxPDOInfo = []
RxPDOCategory = []
# Save EEPROM Data
SiiData = ""
# Save Register Data
RegData = ""
CrtRegSpec = {"ESCType": "",
"FMMUNumber": "",
"SMNumber": "",
"PDIType": ""}
def __init__(self, controler):
"""
Constructor
@param controler: _EthercatSlaveCTN class in EthercatSlave.py
"""
self.Controler = controler
self.HexDecode = codecs.getdecoder("hex_codec")
self.ClearSDODataSet()
# -------------------------------------------------------------------------------
# Used Master State
# -------------------------------------------------------------------------------
def GetMasterState(self):
"""
Execute "ethercat master" command and parse the execution result
@return MasterState
"""
# exectute "ethercat master" command
_error, return_val = self.Controler.RemoteExec(MASTER_STATE, return_val=None)
master_state = {}
# parse the reslut
for each_line in return_val.splitlines():
if len(each_line) > 0:
chunks = each_line.strip().split(':', 1)
key = chunks[0]
value = []
if len(chunks) > 1:
value = chunks[1].split()
if '(attached)' in value:
value.remove('(attached)')
master_state[key] = value
return master_state
# -------------------------------------------------------------------------------
# Used Slave State
# -------------------------------------------------------------------------------
def RequestSlaveState(self, command):
"""
Set slave state to the specified one using "ethercat states -p %d %s" command.
Command example : "ethercat states -p 0 PREOP" (target slave position and target state are given.)
@param command : target slave state
"""
_error, _return_val = self.Controler.RemoteExec(
SLAVE_STATE % (self.Controler.GetSlavePos(), command),
return_val=None)
def GetSlaveStateFromSlave(self):
"""
Get slave information using "ethercat slaves" command and store the information into internal data structure
(self.SlaveState) for "Slave State"
return_val example : 0 0:0 PREOP + EL9800 (V4.30) (PIC24, SPI, ET1100)
"""
_error, return_val = self.Controler.RemoteExec(GET_SLAVE, return_val=None)
self.SlaveState = return_val
return return_val
# -------------------------------------------------------------------------------
# Used SDO Management
# -------------------------------------------------------------------------------
def SDODownload(self, data_type, idx, sub_idx, value):
"""
Set an SDO object value to user-specified value using "ethercat download" command.
Command example : "ethercat download --type int32 -p 0 0x8020 0x12 0x00000000"
@param data_type : data type of SDO entry
@param idx : index of the SDO entry
@param sub_idx : subindex of the SDO entry
@param value : value of SDO entry
"""
valid_type = self.GetValidDataType(data_type)
_error, return_val = self.Controler.RemoteExec(
SDO_DOWNLOAD%(valid_type, self.Controler.GetSlavePos(),
idx, sub_idx, value), return_val = None)
return return_val
def BackupSDODataSet(self):
"""
Back-up current SDO entry information to restore the SDO data
in case that the user cancels SDO update operation.
"""
self.BackupDatatypeDescription = self.SaveDatatypeDescription
self.BackupCommunicationObject = self.SaveCommunicationObject
self.BackupManufacturerSpecific = self.SaveManufacturerSpecific
self.BackupProfileSpecific = self.SaveProfileSpecific
self.BackupReserved = self.SaveReserved
self.BackupAllSDOData = self.SaveAllSDOData
def ClearSDODataSet(self):
"""
Clear the specified SDO entry information.
"""
for dummy in range(6):
self.SaveSDOData.append([])
def GetAllSDOValuesFromSlave(self):
"""
Get SDO values of All SDO entries.
@return return_val: list of result of "SDO_UPLOAD"
"""
entry_infos = ""
alldata_idx = len(self.SDOVariables)
counter = 0
for category in self.SDOVariables:
counter +=1
# for avoid redundant repetition
if counter == alldata_idx:
continue
for entry in category:
valid_type = self.GetValidDataType(entry["type"])
for_command_string = "%s %s %s ," % \
(valid_type, entry["idx"], entry["subIdx"])
entry_infos += for_command_string
error, return_val = self.Controler.RemoteExec(SDO_UPLOAD%(entry_infos, self.Controler.GetSlavePos()), return_val = None)
return return_val
def GetSDOValuesFromSlave(self, entries_info):
"""
Get SDO values of some SDO entries.
@param entries_info: dictionary of SDO entries that is wanted to know the value.
@return return_val: list of result of "SDO_UPLOAD"
"""
entry_infos = ""
entries_info_list = entries_info.items()
entries_info_list.sort()
for (idx, subIdx), entry in entries_info_list:
valid_type = self.GetValidDataType(entry["type"])
for_command_string = "%s %s %s ," % \
(valid_type, str(idx), str(subIdx))
entry_infos += for_command_string
error, return_val = self.Controler.RemoteExec(SDO_UPLOAD%(entry_infos, self.Controler.GetSlavePos()), return_val = None)
return return_val
def ExtractObjects(self):
"""
Extract object type items from imported ESI xml.
And they are stuctured as dictionary.
@return objects: dictionary of objects
"""
objects = {}
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
if device is not None :
for dictionary in device.GetProfileDictionaries():
dictionary.load()
for object in dictionary.getObjects().getObject():
object_index = ExtractHexDecValue(object.getIndex().getcontent())
objects[(object_index)] = object
return objects
def ExtractAllDataTypes(self):
"""
Extract all data types from imported ESI xml.
@return dataTypes: dictionary of datatypes
"""
dataTypes = {}
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
for dictionary in device.GetProfileDictionaries():
dictionary.load()
datatypes = dictionary.getDataTypes()
if datatypes is not None:
for datatype in datatypes.getDataType():
dataTypes[datatype.getName()] = datatype
return dataTypes
def IsBaseDataType(self, datatype):
"""
Check if the datatype is a base data type.
@return baseTypeFlag: true if datatype is a base data type, unless false
"""
baseTypeFlag = False
for baseDataTypeList in self.BaseDataTypes.values():
if datatype in baseDataTypeList:
baseTypeFlag = True
break
return baseTypeFlag
def GetBaseDataType(self, datatype):
"""
Get a base data type corresponding the datatype.
@param datatype: Some data type (string format)
@return base data type
"""
if self.IsBaseDataType(datatype):
return datatype
elif not datatype.find("STRING") == -1:
return datatype
else:
datatypes = self.ExtractAllDataTypes()
base_datatype = datatypes[datatype].getBaseType()
return self.GetBaseDataType(base_datatype)
def GetValidDataType(self, datatype):
"""
Convert the datatype into a data type that is possible to download/upload
in etherlab master stack.
@param datatype: Some data type (string format)
@return base_type: vaild data type
"""
base_type = self.GetBaseDataType(datatype)
if re.match("STRING\([0-9]*\)", datatype) is not None:
return "string"
else:
for key, value in self.BaseDataTypes.items():
if base_type in value:
return key
return base_type
# Not Used
def GetAllEntriesList(self):
"""
Get All entries information that includes index, sub-index, name,
type, bit size, PDO mapping, and default value.
@return self.entries: dictionary of entry
"""
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
self.entries = device.GetEntriesList()
datatypes = self.ExtractAllDataTypes()
objects = self.ExtractObjects()
entries_list = self.entries.items()
entries_list.sort()
# append sub entries
for (index, subidx), entry in entries_list:
# entry_* is string type
entry_type = entry["Type"]
entry_index = entry["Index"]
try:
object_info = objects[index].getInfo()
except:
continue
if object_info is not None:
obj_content = object_info.getcontent()
typeinfo = datatypes.get(entry_type, None)
bitsize = typeinfo.getBitSize()
type_content = typeinfo.getcontent()
# ArrayInfo type
if type_content is not None and type_content["name"] == "ArrayInfo":
for arrayinfo in type_content["value"]:
element_num = arrayinfo.getElements()
first_subidx = arrayinfo.getLBound()
for offset in range(element_num):
new_subidx = int(first_subidx) + offset
entry_subidx = hex(new_subidx)
if obj_content["value"][new_subidx]["name"] == "SubItem":
subitem = obj_content["value"][new_subidx]["value"]
subname = subitem[new_subidx].getName()
if subname is not None:
entry_name = "%s - %s" % \
(ExtractName(objects[index].getName()), subname)
else:
entry_name = ExtractName(objects[index].getName())
self.entries[(index, new_subidx)] = {
"Index": entry_index,
"SubIndex": entry_subidx,
"Name": entry_name,
"Type": typeinfo.getBaseType(),
"BitSize": str(bitsize/element_num),
"Access": entry["Access"],
"PDOMapping": entry["PDOMapping"]}
try:
value_info = subitem[new_subidx].getInfo().getcontent()\
["value"][0]["value"][0]
self.AppendDefaultValue(index, new_subidx, value_info)
except:
pass
try:
value_info = subitem[subidx].getInfo().getcontent()\
["value"][0]["value"][0]
self.AppendDefaultValue(index, subidx, value_info)
except:
pass
# EnumInfo type
elif type_content is not None and type_content["name"] == "EnumInfo":
self.entries[(index, subidx)]["EnumInfo"] = {}
for enuminfo in type_content["value"]:
text = enuminfo.getText()
enum = enuminfo.getEnum()
self.entries[(index, subidx)]["EnumInfo"][str(enum)] = text
self.entries[(index, subidx)]["DefaultValue"] = "0x00"
# another types
else:
if subidx == 0x00:
tmp_subidx = 0x00
try:
if obj_content["value"][tmp_subidx]["name"] == "SubItem":
sub_name = entry["Name"].split(" - ")[1]
for num in range(len(obj_content["value"])):
if sub_name == \
obj_content["value"][num]["value"][num].getName():
subitem_content = obj_content["value"][tmp_subidx]\
["value"][tmp_subidx]
value_info = subitem_content.getInfo().getcontent()\
["value"][0]["value"][0]
tmp_subidx += 1
break
else:
value_info = None
else:
value_info = \
obj_content["value"][tmp_subidx]["value"][tmp_subidx]
self.AppendDefaultValue(index, subidx, value_info)
except:
pass
return self.entries
# Not Used
def AppendDefaultValue(self, index, subidx, value_info=None):
"""
Get the default value from the ESI xml
@param index: entry index
@param subidx: entry sub index
@param value_info: dictionary of infomation about default value
"""
# there is not default value.
if value_info == None:
return
raw_value = value_info["value"]
# default value is hex binary type.
if value_info["name"] == "DefaultData":
raw_value_bit = list(hex(raw_value).split("0x")[1])
datatype = self.GetValidDataType(self.entries[(index, subidx)]["Type"])
if datatype is "string" or datatype is "octet_string":
if "L" in raw_value_bit:
raw_value_bit.remove("L")
default_value = "".join(raw_value_bit).decode("hex")
elif datatype is "unicode_string":
default_value = "".join(raw_value_bit).decode("hex").\
decode("utf-8")
else:
bit_num = len(raw_value_bit)
# padding
if not bit_num%2 == 0:
raw_value_bit.insert(0, "0")
default_value_bit = []
# little endian -> big endian
for num in range(bit_num):
if num%2 == 0:
default_value_bit.insert(0, raw_value_bit[num])
default_value_bit.insert(1, raw_value_bit[num+1])
default_value = "0x%s" % "".join(default_value_bit)
# default value is string type.
# this case is not tested yet.
elif value_info["name"] == "DefaultString":
default_value = raw_value
# default value is Hex or Dec type.
elif value_info["name"] == "DefaultValue":
default_value = "0x" + hex(ExtractHexDecValue(raw_value))
self.entries[(index, subidx)]["DefaultValue"] = default_value
# -------------------------------------------------------------------------------
# Used PDO Monitoring
# -------------------------------------------------------------------------------
def RequestPDOInfo(self):
"""
Load slave information from RootClass (XML data) and parse the information
(calling SlavePDOData() method).
"""
# Load slave information from ESI XML file (def EthercatMaster.py)
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, _alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
# Initialize PDO data set
self.ClearDataSet()
# if 'device' object is valid, call SavePDOData() to parse PDO data
if device is not None:
self.SavePDOData(device)
def SavePDOData(self, device):
"""
Parse PDO data and store the results in TXPDOCategory and RXPDOCategory
Tx(Rx)PDOCategory : index, name, entry number
Tx(Rx)Info : entry index, sub index, name, length, type
@param device : Slave information extracted from ESI XML file
"""
# Parsing TXPDO entries
for pdo, _pdo_info in ([(pdo, "Inputs") for pdo in device.getTxPdo()]):
# Save pdo_index, entry, and name of each entry
pdo_index = ExtractHexDecValue(pdo.getIndex().getcontent())
entries = pdo.getEntry()
pdo_name = ExtractName(pdo.getName())
excludes = pdo.getExclude()
exclude_list = []
Sm = pdo.getSm()
if excludes :
for exclude in excludes:
exclude_list.append(ExtractHexDecValue(exclude.getcontent()))
# Initialize entry number count
count = 0
# Parse entries
for entry in entries:
# Save index and subindex
index = ExtractHexDecValue(entry.getIndex().getcontent())
subindex = ExtractHexDecValue(entry.getSubIndex())
# if entry name exists, save entry data
if ExtractName(entry.getName()) is not None:
entry_infos = {
"entry_index": index,
"subindex": subindex,
"name": ExtractName(entry.getName()),
"bitlen": entry.getBitLen()
}
if entry.getDataType() is not None:
entry_infos["type"] = entry.getDataType().getcontent()
self.TxPDOInfo.append(entry_infos)
count += 1
categorys = {"pdo_index" : pdo_index, "name" : pdo_name, "sm" : Sm,
"number_of_entry" : count, "exclude_list" : exclude_list}
self.TxPDOCategory.append(categorys)
# Parsing RxPDO entries
for pdo, _pdo_info in ([(rxpdo, "Outputs") for rxpdo in device.getRxPdo()]):
# Save pdo_index, entry, and name of each entry
pdo_index = ExtractHexDecValue(pdo.getIndex().getcontent())
entries = pdo.getEntry()
pdo_name = ExtractName(pdo.getName())
excludes = pdo.getExclude()
exclude_list = []
Sm = pdo.getSm()
if excludes :
for exclude in excludes:
exclude_list.append(ExtractHexDecValue(exclude.getcontent()))
# Initialize entry number count
count = 0
# Parse entries
for entry in entries:
# Save index and subindex
index = ExtractHexDecValue(entry.getIndex().getcontent())
subindex = ExtractHexDecValue(entry.getSubIndex())
# if entry name exists, save entry data
if ExtractName(entry.getName()) is not None:
entry_infos = {
"entry_index": index,
"subindex": subindex,
"name": ExtractName(entry.getName()),
"bitlen": str(entry.getBitLen())
}
if entry.getDataType() is not None:
entry_infos["type"] = entry.getDataType().getcontent()
self.RxPDOInfo.append(entry_infos)
count += 1
categorys = {"pdo_index" : pdo_index, "name" : pdo_name, "sm" : Sm,
"number_of_entry" : count, "exclude_list" : exclude_list}
self.RxPDOCategory.append(categorys)
def GetTxPDOCategory(self):
"""
Get TxPDOCategory data structure (Meta informaton of TxPDO).
TxPDOCategorys : index, name, number of entries
@return TxPDOCategorys
"""
return self.TxPDOCategory
def GetRxPDOCategory(self):
"""
Get RxPDOCategory data structure (Meta information of RxPDO).
RxPDOCategorys : index, name, number of entries
@return RxPDOCategorys
"""
return self.RxPDOCategory
def GetTxPDOInfo(self):
"""
Get TxPDOInfo data structure (Detailed information on TxPDO entries).
TxPDOInfos : entry index, sub index, name, length, type
@return TxPDOInfos
"""
return self.TxPDOInfo
def GetRxPDOInfo(self):
"""
Get RxPDOInfo data structure (Detailed information on RxPDO entries).
RxPDOInfos : entry index, sub index, name, length, type
@return RxPDOInfos
"""
return self.RxPDOInfo
def ClearDataSet(self):
"""
Initialize PDO management data structure.
"""
self.TxPDOInfo = []
self.TxPDOCategory = []
self.RxPDOInfo = []
self.RxPDOCategory = []
# -------------------------------------------------------------------------------
# Used EEPROM Management
# -------------------------------------------------------------------------------
# Base data types in ETG2000; format = {"Name": "BitSize"}
BaseDataTypeDict = {"BOOL": "01",
"SINT": "02",
"INT": "03",
"DINT": "04",
"USINT": "05",
"UINT": "06",
"UDINT": "07",
"REAL": "08",
"INT24": "10",
"LREAL": "11",
"INT40": "12",
"INT48": "13",
"INT56": "14",
"LINT": "15",
"UINT24": "16",
"UINT40": "18",
"UINT48": "19",
"UINT56": "1a",
"ULINT": "1b",
"BITARR8": "2d",
"BITARR16": "2e",
"BITARR32": "2f",
"BIT1": "30",
"BIT2": "31",
"BIT3": "32",
"BIT4": "33",
"BIT5": "34",
"BIT6": "35",
"BIT7": "36",
"BIT8": "37"}
def GetSmartViewInfos(self):
"""
Parse XML data for "Smart View" of EEPROM contents.
@return smartview_infos : EEPROM contents dictionary
"""
smartview_infos = {"eeprom_size": 128,
"pdi_type": 0,
"device_emulation": "False",
"vendor_id": '0x00000000',
"product_code": '0x00000000',
"revision_no": '0x00000000',
"serial_no": '0x00000000',
"supported_mailbox": "",
"mailbox_bootstrapconf_outstart": '0',
"mailbox_bootstrapconf_outlength": '0',
"mailbox_bootstrapconf_instart": '0',
"mailbox_bootstrapconf_inlength": '0',
"mailbox_standardconf_outstart": '0',
"mailbox_standardconf_outlength": '0',
"mailbox_standardconf_instart": '0',
"mailbox_standardconf_inlength": '0'}
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, _alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
# 'device' represents current slave device selected by user
if device is not None:
for eeprom_element in device.getEeprom().getcontent():
# get EEPROM size; <Device>-<Eeprom>-<ByteSize>
if eeprom_element["name"] == "ByteSize":
smartview_infos["eeprom_size"] = eeprom_element
elif eeprom_element["name"] == "ConfigData":
configData_data = self.DecimalToHex(eeprom_element)
# get PDI type; <Device>-<Eeprom>-<ConfigData> address 0x00
smartview_infos["pdi_type"] = int(configData_data[0:2], 16)
# get state of device emulation; <Device>-<Eeprom>-<ConfigData> address 0x01
if "{:0>8b}".format(int(configData_data[2:4], 16))[7] == '1':
smartview_infos["device_emulation"] = "True"
elif eeprom_element["name"] == "BootStrap":
bootstrap_data = "{:0>16x}".format(eeprom_element)
# get bootstrap configuration; <Device>-<Eeprom>-<BootStrap>
for cfg, iter in [("mailbox_bootstrapconf_outstart", 0),
("mailbox_bootstrapconf_outlength", 1),
("mailbox_bootstrapconf_instart", 2),
("mailbox_bootstrapconf_inlength", 3)]:
smartview_infos[cfg] = str(int(bootstrap_data[4*iter+2:4*(iter+1)]+bootstrap_data[4*iter:4*iter+2], 16))
# get protocol (profile) types supported by mailbox; <Device>-<Mailbox>
with device.getMailbox() as mb:
if mb is not None:
for mailbox_protocol in mailbox_protocols:
if getattr(mb, "get%s" % mailbox_protocol)() is not None:
smartview_infos["supported_mailbox"] += "%s, " % mailbox_protocol
smartview_infos["supported_mailbox"] = smartview_infos["supported_mailbox"].strip(", ")
# get standard configuration of mailbox; <Device>-<Sm>
for sm_element in device.getSm():
if sm_element.getcontent() == "MBoxOut":
smartview_infos["mailbox_standardconf_outstart"] = str(ExtractHexDecValue(sm_element.getStartAddress()))
smartview_infos["mailbox_standardconf_outlength"] = str(ExtractHexDecValue(sm_element.getDefaultSize()))
elif sm_element.getcontent() == "MBoxIn":
smartview_infos["mailbox_standardconf_instart"] = str(ExtractHexDecValue(sm_element.getStartAddress()))
smartview_infos["mailbox_standardconf_inlength"] = str(ExtractHexDecValue(sm_element.getDefaultSize()))
else:
pass
# get device identity from <Device>-<Type>
# vendor ID; by default, pre-defined value in self.ModulesLibrary
# if device type in 'vendor' item equals to actual slave device type, set 'vendor_id' to vendor ID.
for vendor_id, vendor in self.Controler.CTNParent.CTNParent.ModulesLibrary.Library.iteritems():
for available_device in vendor["groups"][vendor["groups"].keys()[0]]["devices"]:
if available_device[0] == type_infos["device_type"]:
smartview_infos["vendor_id"] = "0x" + "{:0>8x}".format(vendor_id)
# product code;
if device.getType().getProductCode() is not None:
product_code = device.getType().getProductCode()
smartview_infos["product_code"] = "0x"+"{:0>8x}".format(ExtractHexDecValue(product_code))
# revision number;
if device.getType().getRevisionNo() is not None:
revision_no = device.getType().getRevisionNo()
smartview_infos["revision_no"] = "0x"+"{:0>8x}".format(ExtractHexDecValue(revision_no))
# serial number;
if device.getType().getSerialNo() is not None:
serial_no = device.getType().getSerialNo()
smartview_infos["serial_no"] = "0x"+"{:0>8x}".format(ExtractHexDecValue(serial_no))
return smartview_infos
else:
return None
def DecimalToHex(self, decnum):
"""
Convert decimal value into hexadecimal representation.
@param decnum : decimal value
@return hex_data : hexadecimal representation of input value in decimal
"""
value = "%x" % int(decnum, 16)
value_len = len(value)
if (value_len % 2) == 0:
hex_len = value_len
else:
hex_len = (value_len // 2) * 2 + 2
hex_data = ("{:0>"+str(hex_len)+"x}").format(int(decnum, 16))
return hex_data
def SiiRead(self):
"""
Get slave EEPROM contents maintained by master device using "ethercat sii_read -p %d" command.
Command example : "ethercat sii_read -p 0"
@return return_val : result of "ethercat sii_read" (binary data)
"""
_error, return_val = self.Controler.RemoteExec(SII_READ % (self.Controler.GetSlavePos()), return_val=None)
self.SiiData = return_val
return return_val
def SiiWrite(self, binary):
"""
Overwrite slave EEPROM contents using "ethercat sii_write -p %d" command.
Command example : "ethercat sii_write -p 0 - (binary contents)"
@param binary : EEPROM contents in binary data format
@return return_val : result of "ethercat sii_write" (If it succeeds, the return value is NULL.)
"""
_error, return_val = self.Controler.RemoteExec(
SII_WRITE % (self.Controler.GetSlavePos()),
return_val=None,
sii_data=binary)
return return_val
def LoadData(self):
"""
Loading data from EEPROM use Sii_Read Method
@return self.BinaryCode : slave EEPROM data in binary format (zero-padded)
"""
return_val = self.Controler.CommonMethod.SiiRead()
self.BinaryCode = return_val
self.Controler.SiiData = self.BinaryCode
# append zero-filled padding data up to EEPROM size
for dummy in range(self.SmartViewInfosFromXML["eeprom_size"] - len(self.BinaryCode)):
self.BinaryCode = self.BinaryCode + self.HexDecode('ff')[0]
return self.BinaryCode
def HexRead(self, binary):
"""
Convert binary digit representation into hexadecimal representation for "Hex View" menu.
@param binary : binary digits
@return hexCode : hexadecimal digits
@return hexview_table_row, hexview_table_col : Grid size for "Hex View" UI
"""
row_code = []
row_text = ""
row = 0
hex_code = []
hexview_table_col = 17
for index in range(0, len(binary)):
if len(binary[index]) != 1:
break
else:
digithexstr = hex(ord(binary[index]))
tempvar2 = digithexstr[2:4]
if len(tempvar2) == 1:
tempvar2 = "0" + tempvar2
row_code.append(tempvar2)
if int(digithexstr, 16) >= 32 and int(digithexstr, 16) <= 126:
row_text = row_text + chr(int(digithexstr, 16))
else:
row_text = row_text + "."
if index != 0:
if len(row_code) == (hexview_table_col - 1):
row_code.append(row_text)
hex_code.append(row_code)
row_text = ""
row_code = []
row = row + 1
hexview_table_row = row
return hex_code, hexview_table_row, hexview_table_col
def GenerateEEPROMList(self, data, direction, length):
"""
Generate EEPROM data list by reconstructing 'data' string.
example : data="12345678", direction=0, length=8 -> eeprom_list=['12', '34', '56', '78']
data="12345678", direction=1, length=8 -> eeprom_list=['78', '56', '34', '12']
@param data : string to be reconstructed
@param direction : endianness
@param length : data length
@return eeprom_list : reconstructed list data structure
"""
eeprom_list = []
if direction is 0 or 1:
for dummy in range(length//2):
if data == "":
eeprom_list.append("00")
else:
eeprom_list.append(data[direction*(length-2):direction*(length-2)+2])
data = data[(1-direction)*2:length-direction*2]
length -= 2
return eeprom_list
def XmlToEeprom(self):
"""
Extract slave EEPROM contents using slave ESI XML file.
- Mandatory parts
- String category : ExtractEEPROMStringCategory()
- General category : ExtractEEPROMGeneralCategory()
- FMMU category : ExtractEEPROMFMMUCategory
- SyncM category : ExtractEEPROMSyncMCategory()
- Tx/RxPDO category : ExtractEEPROMPDOCategory()
- DC category : ExtractEEPROMDCCategory()
@return eeprom_binary
"""
eeprom = []
data = ""
eeprom_size = 0
eeprom_binary = ""
# 'device' is the slave device of the current EtherCAT slave plugin
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, _alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
if device is not None:
# get ConfigData for EEPROM offset 0x0000-0x000d; <Device>-<Eeprom>-<ConfigData>
for eeprom_element in device.getEeprom().getcontent():
if eeprom_element["name"] == "ConfigData":
data = self.DecimalToHex(eeprom_element)
eeprom += self.GenerateEEPROMList(data, 0, 28)
# calculate CRC for EEPROM offset 0x000e-0x000f
crc = 0x48
for segment in eeprom:
for i in range(8):
bit = crc & 0x80
crc = (crc << 1) | ((int(segment, 16) >> (7 - i)) & 0x01)
if bit:
crc ^= 0x07
for dummy in range(8):
bit = crc & 0x80
crc <<= 1
if bit:
crc ^= 0x07
eeprom.append(hex(crc)[len(hex(crc))-3:len(hex(crc))-1])
eeprom.append("00")
# get VendorID for EEPROM offset 0x0010-0x0013;
data = ""
for vendor_id, vendor in self.Controler.CTNParent.CTNParent.ModulesLibrary.Library.iteritems():
for available_device in vendor["groups"][vendor["groups"].keys()[0]]["devices"]:
if available_device[0] == type_infos["device_type"]:
data = "{:0>8x}".format(vendor_id)
eeprom += self.GenerateEEPROMList(data, 1, 8)
# get Product Code for EEPROM offset 0x0014-0x0017;
data = ""
if device.getType().getProductCode() is not None:
data = "{:0>8x}".format(ExtractHexDecValue(device.getType().getProductCode()))
eeprom += self.GenerateEEPROMList(data, 1, 8)
# get Revision Number for EEPROM offset 0x0018-0x001b;
data = ""
if device.getType().getRevisionNo() is not None:
data = "{:0>8x}".format(ExtractHexDecValue(device.getType().getRevisionNo()))
eeprom += self.GenerateEEPROMList(data, 1, 8)
# get Serial Number for EEPROM 0x001c-0x001f;
data = ""
if device.getType().getSerialNo() is not None:
data = "{:0>8x}".format(ExtractHexDecValue(device.getType().getSerialNo()))
eeprom += self.GenerateEEPROMList(data, 1, 8)
# get Execution Delay for EEPROM 0x0020-0x0021; not analyzed yet
eeprom.append("00")
eeprom.append("00")
# get Port0/1 Delay for EEPROM offset 0x0022-0x0025; not analyzed yet
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
# reserved for EEPROM offset 0x0026-0x0027;
eeprom.append("00")
eeprom.append("00")
# get BootStrap for EEPROM offset 0x0028-0x002e; <Device>-<Eeprom>-<BootStrap>
data = ""
for eeprom_element in device.getEeprom().getcontent():
if eeprom_element["name"] == "BootStrap":
data = "{:0>16x}".format(int(eeprom_element,16))
eeprom += self.GenerateEEPROMList(data, 0, 16)
# get Standard Mailbox for EEPROM offset 0x0030-0x0037; <Device>-<sm>
data = ""
standard_send_mailbox_offset = None
standard_send_mailbox_size = None
standard_receive_mailbox_offset = None
standard_receive_mailbox_size = None
for sm_element in device.getSm():
if sm_element.getcontent() == "MBoxOut":
standard_receive_mailbox_offset = "{:0>4x}".format(ExtractHexDecValue(sm_element.getStartAddress()))
standard_receive_mailbox_size = "{:0>4x}".format(ExtractHexDecValue(sm_element.getDefaultSize()))
elif sm_element.getcontent() == "MBoxIn":
standard_send_mailbox_offset = "{:0>4x}".format(ExtractHexDecValue(sm_element.getStartAddress()))
standard_send_mailbox_size = "{:0>4x}".format(ExtractHexDecValue(sm_element.getDefaultSize()))
if standard_receive_mailbox_offset is None:
eeprom.append("00")
eeprom.append("00")
else:
eeprom.append(standard_receive_mailbox_offset[2:4])
eeprom.append(standard_receive_mailbox_offset[0:2])
if standard_receive_mailbox_size is None:
eeprom.append("00")
eeprom.append("00")
else:
eeprom.append(standard_receive_mailbox_size[2:4])
eeprom.append(standard_receive_mailbox_size[0:2])
if standard_send_mailbox_offset is None:
eeprom.append("00")
eeprom.append("00")
else:
eeprom.append(standard_send_mailbox_offset[2:4])
eeprom.append(standard_send_mailbox_offset[0:2])
if standard_send_mailbox_size is None:
eeprom.append("00")
eeprom.append("00")
else:
eeprom.append(standard_send_mailbox_size[2:4])
eeprom.append(standard_send_mailbox_size[0:2])
# get supported mailbox protocols for EEPROM offset 0x0038-0x0039;
data = 0
with device.getMailbox() as mb:
if mb is not None:
for bit, mbprot in enumerate(mailbox_protocols):
if getattr(mb, "get%s" % mbprot)() is not None:
data += 1 << bit
data = "{:0>4x}".format(data)
eeprom.append(data[2:4])
eeprom.append(data[0:2])
# resereved for EEPROM offset 0x003a-0x007b;
for i in range(0x007b-0x003a+0x0001):
eeprom.append("00")
# get EEPROM size for EEPROM offset 0x007c-0x007d;
# Modify by jblee because of update IDE module (minidom -> lxml)
data = ""
for eeprom_element in device.getEeprom().getchildren():
if eeprom_element.tag == "ByteSize":
eeprom_size = int(objectify.fromstring(eeprom_element.tostring()).text)
data = "{:0>4x}".format(eeprom_size/1024*8-1)
if data == "":
eeprom.append("00")
eeprom.append("00")
else:
eeprom.append(data[2:4])
eeprom.append(data[0:2])
# Version for EEPROM 0x007e-0x007f;
# According to "EtherCAT Slave Device Description(V0.3.0)"
eeprom.append("01")
eeprom.append("00")
# append String Category data
for data in self.ExtractEEPROMStringCategory(device):
eeprom.append(data)
# append General Category data
for data in self.ExtractEEPROMGeneralCategory(device):
eeprom.append(data)
# append FMMU Category data
for data in self.ExtractEEPROMFMMUCategory(device):
eeprom.append(data)
# append SyncM Category data
for data in self.ExtractEEPROMSyncMCategory(device):
eeprom.append(data)
# append TxPDO Category data
for data in self.ExtractEEPROMPDOCategory(device, "TxPdo"):
eeprom.append(data)
# append RxPDO Category data
for data in self.ExtractEEPROMPDOCategory(device, "RxPdo"):
eeprom.append(data)
# append DC Category data
for data in self.ExtractEEPROMDCCategory(device):
eeprom.append(data)
# append padding
padding = eeprom_size-len(eeprom)
for i in range(padding):
eeprom.append("ff")
# convert binary code
for index in range(eeprom_size):
eeprom_binary = eeprom_binary + self.HexDecode(eeprom[index])[0]
return eeprom_binary
def ExtractEEPROMStringCategory(self, device):
"""
Extract "Strings" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
self.Strings = []
data = ""
count = 0 # string counter
padflag = False # padding flag if category length is odd
# index information for General Category in EEPROM
self.GroupIdx = 0
self.ImgIdx = 0
self.OrderIdx = 0
self.NameIdx = 0
# flag for preventing duplicated vendor specific data
typeflag = False
grouptypeflag = False
groupnameflag = False
devnameflag = False
imageflag = False
# vendor specific data
# element1; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Type>
# vendor_specific_data : vendor specific data (binary type)
vendor_specific_data = ""
# vendor_spec_strings : list of vendor specific "strings" for preventing duplicated strings
vendor_spec_strings = []
for element in device.getType().getcontent():
data += element
if data != "" and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.OrderIdx = vendor_spec_strings.index(data)+1
typeflag = True
break
if typeflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
typeflag = True
self.OrderIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
data = ""
# element2-1; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<GroupType>
data = device.getGroupType()
if data is not None and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.GroupIdx = vendor_spec_strings.index(data)+1
grouptypeflag = True
break
if grouptypeflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
grouptypeflag = True
self.GroupIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
# element2-2; <EtherCATInfo>-<Groups>-<Group>-<Type>
if grouptypeflag is False:
if self.Controler.CTNParent.CTNParent.ModulesLibrary.Library is not None:
for _vendor_id, vendor in self.Controler.CTNParent.CTNParent.ModulesLibrary.Library.iteritems():
for group_type, group_etc in vendor["groups"].iteritems():
for device_item in group_etc["devices"]:
if device == device_item[1]:
data = group_type
if data is not None and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.GroupIdx = vendor_spec_strings.index(data)+1
grouptypeflag = True
break
if grouptypeflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
grouptypeflag = True
self.GroupIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
data = ""
# element3; <EtherCATInfo>-<Descriptions>-<Groups>-<Group>-<Name(LcId is "1033")>
if self.Controler.CTNParent.CTNParent.ModulesLibrary.Library is not None:
for _vendorId, vendor in self.Controler.CTNParent.CTNParent.ModulesLibrary.Library.iteritems():
for group_type, group_etc in vendor["groups"].iteritems():
for device_item in group_etc["devices"]:
if device == device_item[1]:
data = group_etc["name"]
if data != "" and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
groupnameflag = True
break
if groupnameflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
groupnameflag = True
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
data = ""
# element4; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Name(LcId is "1033" or "1"?)>
for element in device.getName():
if element.getLcId() == 1 or element.getLcId() == 1033:
data = element.getcontent()
if data != "" and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.NameIdx = vendor_spec_strings.index(data)+1
devnameflag = True
break
if devnameflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
devnameflag = True
self.NameIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
data = ""
# element5-1; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Image16x14>
if device.getcontent() is not None:
data = device.getcontent()
if data is not None and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.ImgIdx = vendor_spec_strings.index(data)+1
imageflag = True
break
if imageflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
imageflag = True
self.ImgIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
# element5-2; <EtherCATInfo>-<Descriptions>-<Groups>-<Group>-<Image16x14>
if imageflag is False:
if self.Controler.CTNParent.CTNParent.ModulesLibrary.Library is not None:
for _vendor_id, vendor in self.Controler.CTNParent.CTNParent.ModulesLibrary.Library.iteritems():
for group_type, group_etc in vendor["groups"].iteritems():
for device_item in group_etc["devices"]:
if device == device_item[1]:
data = group_etc
if data is not None and isinstance(data, text):
for vendor_spec_string in vendor_spec_strings:
if data == vendor_spec_string:
self.ImgIdx = vendor_spec_strings.index(data)+1
imageflag = True
break
if imageflag is False:
count += 1
self.Strings.append(data)
vendor_spec_strings.append(data)
imageflag = True
self.ImgIdx = count
vendor_specific_data += "{:0>2x}".format(len(data))
for character in range(len(data)):
vendor_specific_data += "{:0>2x}".format(ord(data[character]))
data = ""
# DC related elements
# <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Dc>-<OpMode>-<Name>
dc_related_elements = ""
if device.getDc() is not None:
for element in device.getDc().getOpMode():
data = element.getName()
if data != "":
count += 1
self.Strings.append(data)
dc_related_elements += "{:0>2x}".format(len(data))
for character in range(len(data)):
dc_related_elements += "{:0>2x}".format(ord(data[character]))
data = ""
# Input elements(TxPDO)
# <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<TxPdo>; Name
input_elements = ""
inputs = []
for element in device.getTxPdo():
for name in element.getName():
data = name.getcontent()
for input in inputs:
if data == input:
data = ""
if data != "":
count += 1
self.Strings.append(data)
inputs.append(data)
input_elements += "{:0>2x}".format(len(data))
for character in range(len(data)):
input_elements += "{:0>2x}".format(ord(data[character]))
data = ""
for entry in element.getEntry():
for name in entry.getName():
data = name.getcontent()
for input in inputs:
if data == input:
data = ""
if data != "":
count += 1
self.Strings.append(data)
inputs.append(data)
input_elements += "{:0>2x}".format(len(data))
for character in range(len(data)):
input_elements += "{:0>2x}".format(ord(data[character]))
data = ""
# Output elements(RxPDO)
# <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<RxPdo>; Name
output_elements = ""
outputs = []
for element in device.getRxPdo():
for name in element.getName():
data = name.getcontent()
for output in outputs:
if data == output:
data = ""
if data != "":
count += 1
self.Strings.append(data)
outputs.append(data)
output_elements += "{:0>2x}".format(len(data))
for character in range(len(data)):
output_elements += "{:0>2x}".format(ord(data[character]))
data = ""
for entry in element.getEntry():
for name in entry.getName():
data = name.getcontent()
for output in outputs:
if data == output:
data = ""
if data != "":
count += 1
self.Strings.append(data)
outputs.append(data)
output_elements += "{:0>2x}".format(len(data))
for character in range(len(data)):
output_elements += "{:0>2x}".format(ord(data[character]))
data = ""
# form eeprom data
# category header
eeprom.append("0a")
eeprom.append("00")
# category length (word); 1 word is 4 bytes. "+2" is the length of string's total number
length = len(vendor_specific_data + dc_related_elements + input_elements + output_elements) + 2
if length % 4 == 0:
pass
else:
length += length % 4
padflag = True
eeprom.append("{:0>4x}".format(length//4)[2:4])
eeprom.append("{:0>4x}".format(length//4)[0:2])
# total numbers of strings
eeprom.append("{:0>2x}".format(count))
for element in [vendor_specific_data,
dc_related_elements,
input_elements,
output_elements]:
for dummy in range(len(element)//2):
if element == "":
eeprom.append("00")
else:
eeprom.append(element[0:2])
element = element[2:len(element)]
# padding if length is odd bytes
if padflag is True:
eeprom.append("ff")
return eeprom
def ExtractEEPROMGeneralCategory(self, device):
"""
Extract "General" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
# category header
eeprom.append("1e")
eeprom.append("00")
# category length
eeprom.append("10")
eeprom.append("00")
# word 1 : Group Type index and Image index in STRINGS Category
eeprom.append("{:0>2x}".format(self.GroupIdx))
eeprom.append("{:0>2x}".format(self.ImgIdx))
# word 2 : Device Type index and Device Name index in STRINGS Category
eeprom.append("{:0>2x}".format(self.OrderIdx))
eeprom.append("{:0>2x}".format(self.NameIdx))
# word 3 : Physical Layer Port info. and CoE Details
eeprom.append("01") # Physical Layer Port info - assume 01
# CoE Details; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Mailbox>-<CoE>
coe_details = 1 # sdo enabled
with device.getMailbox() as mb
if mb is not None:
coe = mb.getCoE()
if coe is not None:
for bit, flag in enumerate(["SdoInfo", "PdoAssign", "PdoConfig",
"PdoUpload", "CompleteAccess"]):
if getattr(coe, "get%s" % flag)() is not None:
coe_details += 1 << bit
eeprom.append("{:0>2x}".format(coe_details))
# word 4 : FoE Details and EoE Details
# FoE Details; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Mailbox>-<FoE>
if mb is not None and mb.getFoE() is not None:
eeprom.append("01")
else:
eeprom.append("00")
# EoE Details; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Mailbox>-<EoE>
if mb is not None and mb.getEoE() is not None:
eeprom.append("01")
else:
eeprom.append("00")
# word 5 : SoE Channels(reserved) and DS402 Channels
# SoE Details; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Mailbox>-<SoE>
if mb is not None and mb.getSoE() is not None:
eeprom.append("01")
else:
eeprom.append("00")
# DS402Channels; <EtherCATInfo>-<Descriptions>-<Devices>-<Device>-<Mailbox>-<CoE>: DS402Channels
ds402ch = False
if mb is not None:
coe = mb.getCoE()
if coe is not None:
ds402ch = coe.getDS402Channels()
eeprom.append("01" if ds402ch in [True, 1] else "00")
# word 6 : SysmanClass(reserved) and Flags
eeprom.append("00") # reserved
# Flags
en_safeop = False
en_lrw = False
if device.getType().getTcCfgModeSafeOp() is True \
or device.getType().getTcCfgModeSafeOp() == 1:
en_safeop = True
if device.getType().getUseLrdLwr() is True \
or device.getType().getUseLrdLwr() == 1:
en_lrw = True
flags = "0b"+"000000"+str(int(en_lrw))+str(int(en_safeop))
eeprom.append("{:0>2x}".format(int(flags, 2)))
# word 7 : Current On EBus (assume 0x0000)
eeprom.append("00")
eeprom.append("00")
# after word 7; couldn't analyze yet
eeprom.append("03")
eeprom.append("00")
eeprom.append("11")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
eeprom.append("00")
return eeprom
def ExtractEEPROMFMMUCategory(self, device):
"""
Extract "FMMU" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
data = ""
count = 0 # number of FMMU
padflag = False
for fmmu in device.getFmmu():
count += 1
if fmmu.getcontent() == "Outputs":
data += "01"
if fmmu.getcontent() == "Inputs":
data += "02"
if fmmu.getcontent() == "MBoxState":
data += "03"
# construct of EEPROM data
if data != "":
# category header
eeprom.append("28")
eeprom.append("00")
# category length
if count % 2 == 1:
padflag = True
eeprom.append("{:0>4x}".format((count+1)//2)[2:4])
eeprom.append("{:0>4x}".format((count+1)//2)[0:2])
else:
eeprom.append("{:0>4x}".format((count)//2)[2:4])
eeprom.append("{:0>4x}".format((count)//2)[0:2])
for dummy in range(count):
if data == "":
eeprom.append("00")
else:
eeprom.append(data[0:2])
data = data[2:len(data)]
# padding if length is odd bytes
if padflag is True:
eeprom.append("ff")
return eeprom
def ExtractEEPROMSyncMCategory(self, device):
"""
Extract "SyncM" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
data = ""
number = {"MBoxOut": "01", "MBoxIn": "02", "Outputs": "03", "Inputs": "04"}
for sm in device.getSm():
for attr in [sm.getStartAddress(),
sm.getDefaultSize(),
sm.getControlByte()]:
if attr is not None:
data += "{:0>4x}".format(ExtractHexDecValue(attr))[2:4]
data += "{:0>4x}".format(ExtractHexDecValue(attr))[0:2]
else:
data += "0000"
if sm.getEnable() == "1" or sm.getEnable() is True:
data += "01"
else:
data += "00"
data += number[sm.getcontent()]
if data != "":
# category header
eeprom.append("29")
eeprom.append("00")
# category length
eeprom.append("{:0>4x}".format(len(data)//4)[2:4])
eeprom.append("{:0>4x}".format(len(data)//4)[0:2])
for dummy in range(len(data)//2):
if data == "":
eeprom.append("00")
else:
eeprom.append(data[0:2])
data = data[2:len(data)]
return eeprom
def ExtractEEPROMPDOCategory(self, device, pdotype):
"""
Extract ""PDO (Tx, Rx)"" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@param pdotype : identifier whether "TxPDO" or "RxPDO".
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
data = ""
count = 0
en_fixed = False
en_mandatory = False
en_virtual = False
for element in eval("device.get%s()" % pdotype):
# PDO Index
data += "{:0>4x}".format(ExtractHexDecValue(element.getIndex().getcontent()))[2:4]
data += "{:0>4x}".format(ExtractHexDecValue(element.getIndex().getcontent()))[0:2]
# Number of Entries
data += "{:0>2x}".format(len(element.getEntry()))
# About Sync Manager
if element.getSm() is not None:
data += "{:0>2x}".format(element.getSm())
else:
data += "ff"
# Reference to DC Synch (according to ET1100 documentation) - assume 0
data += "00"
# Name Index
objname = ""
for name in element.getName():
objname = name.getcontent()
for name in self.Strings:
count += 1
if objname == name:
break
if len(self.Strings)+1 == count:
data += "00"
else:
data += "{:0>2x}".format(count)
count = 0
# Flags; by Fixed, Mandatory, Virtual attributes ?
if element.getFixed() is True or 1:
en_fixed = True
if element.getMandatory() is True or 1:
en_mandatory = True
if element.getVirtual() is True or element.getVirtual():
en_virtual = True
data += str(int(en_fixed)) + str(int(en_mandatory)) + str(int(en_virtual)) + "0"
for entry in element.getEntry():
# Entry Index
data += "{:0>4x}".format(ExtractHexDecValue(entry.getIndex().getcontent()))[2:4]
data += "{:0>4x}".format(ExtractHexDecValue(entry.getIndex().getcontent()))[0:2]
# Subindex
data += "{:0>2x}".format(int(entry.getSubIndex()))
# Entry Name Index
objname = ""
for name in entry.getName():
objname = name.getcontent()
for name in self.Strings:
count += 1
if objname == name:
break
if len(self.Strings)+1 == count:
data += "00"
else:
data += "{:0>2x}".format(count)
count = 0
# DataType
if entry.getDataType() is not None:
if entry.getDataType().getcontent() in self.BaseDataTypeDict:
data += self.BaseDataTypeDict[entry.getDataType().getcontent()]
else:
data += "00"
else:
data += "00"
# BitLen
if entry.getBitLen() is not None:
data += "{:0>2x}".format(int(entry.getBitLen()))
else:
data += "00"
# Flags; by Fixed attributes ?
en_fixed = False
if entry.getFixed() is True or entry.getFixed() == 1:
en_fixed = True
data += str(int(en_fixed)) + "000"
if data != "":
# category header
if pdotype == "TxPdo":
eeprom.append("32")
elif pdotype == "RxPdo":
eeprom.append("33")
else:
eeprom.append("00")
eeprom.append("00")
# category length
eeprom.append("{:0>4x}".format(len(data)//4)[2:4])
eeprom.append("{:0>4x}".format(len(data)//4)[0:2])
data = str(data.lower())
for dummy in range(len(data)//2):
if data == "":
eeprom.append("00")
else:
eeprom.append(data[0:2])
data = data[2:len(data)]
return eeprom
def ExtractEEPROMDCCategory(self, device):
"""
Extract "DC(Distributed Clock)" category data from slave ESI XML and generate EEPROM image data.
@param device : 'device' object in the slave ESI XML
@return eeprom : "Strings" category EEPROM image data
"""
eeprom = []
data = ""
count = 0
namecount = 0
if device.getDc() is not None:
for element in device.getDc().getOpMode():
count += 1
# assume that word 1-7 are 0x0000
data += "0000"
data += "0000"
data += "0000"
data += "0000"
data += "0000"
data += "0000"
data += "0000"
# word 8-10
# AssignActivate
if element.getAssignActivate() is not None:
data += "{:0>4x}".format(ExtractHexDecValue(element.getAssignActivate()))[2:4]
data += "{:0>4x}".format(ExtractHexDecValue(element.getAssignActivate()))[0:2]
else:
data += "0000"
# Factor of CycleTimeSync0 ? and default is 1?
if element.getCycleTimeSync0() is not None:
if element.getCycleTimeSync0().getFactor() is not None:
data += "{:0>2x}".format(int(element.getCycleTimeSync0().getFactor()))
data += "00"
else:
data += "0100"
else:
data += "0100"
# Index of Name in STRINGS Category
# Name Index
objname = ""
for name in element.getName():
objname += name
for name in self.Strings:
namecount += 1
if objname == name:
break
if len(self.Strings)+1 == namecount:
data += "00"
else:
data += "{:0>2x}".format(namecount)
namecount = 0
data += "00"
# assume that word 11-12 are 0x0000
data += "0000"
data += "0000"
if data != "":
# category header
eeprom.append("3c")
eeprom.append("00")
# category length
eeprom.append("{:0>4x}".format(len(data)//4)[2:4])
eeprom.append("{:0>4x}".format(len(data)//4)[0:2])
data = str(data.lower())
for dummy in range(len(data)//2):
if data == "":
eeprom.append("00")
else:
eeprom.append(data[0:2])
data = data[2:len(data)]
return eeprom
# -------------------------------------------------------------------------------
# Used Register Access
# -------------------------------------------------------------------------------
def RegRead(self, offset, length):
"""
Read slave ESC register content using "ethercat reg_read -p %d %s %s" command.
Command example : "ethercat reg_read -p 0 0x0c00 0x0400"
@param offset : register address
@param length : register length
@return return_val : register data
"""
_error, return_val = self.Controler.RemoteExec(
REG_READ % (self.Controler.GetSlavePos(), offset, length),
return_val=None)
return return_val
def MultiRegRead(self, slave_num, reg_infos):
"""
@slave_num:
@param addr_info:
@return return_val:
"""
reg_info_str = ""
for reg_info in reg_infos:
reg_info_str = reg_info_str + "%s|" % reg_info
reg_info_str = reg_info_str.strip("|")
_error, return_val = self.Controler.RemoteExec(\
MULTI_REG_READ%(slave_num, reg_info_str),
return_val = None)
return return_val
def RegWrite(self, address, data):
"""
Write data to slave ESC register using "ethercat reg_write -p %d %s %s" command.
Command example : "ethercat reg_write -p 0 0x0c04 0x0001"
@param address : register address
@param data : data to write
@return return_val : the execution result of "ethercat reg_write" (for error check)
"""
_error, return_val = self.Controler.RemoteExec(
REG_WRITE % (self.Controler.GetSlavePos(), address, data),
return_val=None)
return return_val
def Rescan(self):
"""
Synchronize EEPROM data in master controller with the data in slave device after EEPROM write.
Command example : "ethercat rescan -p 0"
"""
_error, _return_val = self.Controler.RemoteExec(RESCAN % (self.Controler.GetSlavePos()), return_val=None)
# -------------------------------------------------------------------------------
# Used DC Configuration
#-------------------------------------------------------------------------------
def LoadESIData(self):
return_data = []
slave = self.Controler.CTNParent.GetSlave(self.Controler.GetSlavePos())
type_infos = slave.getType()
device, alignment = self.Controler.CTNParent.GetModuleInfos(type_infos)
if device.getDc() is not None:
for OpMode in device.getDc().getOpMode():
temp_data = {
"desc" : OpMode.getDesc() if OpMode.getDesc() is not None else "Unused",
"assign_activate" : OpMode.getAssignActivate() \
if OpMode.getAssignActivate() is not None else "#x0000",
"cycletime_sync0" : OpMode.getCycleTimeSync0().getcontent() \
if OpMode.getCycleTimeSync0() is not None else None,
"shifttime_sync0" : OpMode.getShiftTimeSync0().getcontent() \
if OpMode.getShiftTimeSync0() is not None else None,
"cycletime_sync1" : OpMode.getShiftTimeSync1().getcontent() \
if OpMode.getShiftTimeSync1() is not None else None,
"shifttime_sync1" : OpMode.getShiftTimeSync1().getcontent() \
if OpMode.getShiftTimeSync1() is not None else None
}
if OpMode.getCycleTimeSync0() is not None:
temp_data["cycletime_sync0_factor"] = OpMode.getCycleTimeSync0().getFactor()
if OpMode.getCycleTimeSync1() is not None:
temp_data["cycletime_sync1_factor"] = OpMode.getCycleTimeSync1().getFactor()
return_data.append(temp_data)
return return_data
#-------------------------------------------------------------------------------
# Common Use Methods
# -------------------------------------------------------------------------------
def CheckConnect(self, cyclic_flag):
"""
Check connection status (1) between Beremiz and the master (2) between the master and the slave.
@param cyclic_flag: 0 - one shot, 1 - periodic
@return True or False
"""
if self.Controler.GetCTRoot()._connector is not None:
# Check connection between the master and the slave.
# Command example : "ethercat xml -p 0"
_error, return_val = self.Controler.RemoteExec(SLAVE_XML % (self.Controler.GetSlavePos()), return_val=None)
number_of_lines = return_val.split("\n")
if len(number_of_lines) <= 2: # No slave connected to the master controller
if not cyclic_flag:
self.CreateErrorDialog(_('No connected slaves'))
return False
elif len(number_of_lines) > 2:
return True
else:
# The master controller is not connected to Beremiz host
if not cyclic_flag:
self.CreateErrorDialog(_('PLC not connected!'))
return False
def CreateErrorDialog(self, mention):
"""
Create a dialog to indicate error or warning.
@param mention : Error String
"""
app_frame = self.Controler.GetCTRoot().AppFrame
dlg = wx.MessageDialog(app_frame, mention,
_(' Warning...'),
wx.OK | wx.ICON_INFORMATION)
dlg.ShowModal()
dlg.Destroy()