Base build mechanism layout.
"""
Base definitions for beremiz plugins
"""
import os
import plugins
import types
import shutil
from xml.dom import minidom
from xmlclass import GenerateClassesFromXSDstring
from PLCControler import PLCControler
_BaseParamsClass = GenerateClassesFromXSDstring("""<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="BaseParams">
<xsd:complexType>
<xsd:attribute name="Name" type="xsd:string" use="required"/>
<xsd:attribute name="IEC_Channel" type="xsd:integer" use="required"/>
<xsd:attribute name="Enabled" type="xsd:boolean" use="required" default="true"/>
</xsd:complexType>
</xsd:element>
</xsd:schema>""")[0]["BaseParams"]
NameTypeSeparator = '@'
class PlugTemplate:
"""
This class is the one that define plugins.
"""
XSD = None
PlugChildsTypes = []
PlugMaxCount = None
PluginMethods = []
def _AddParamsMembers(self):
Classes = GenerateClassesFromXSDstring(self.XSD)[0]
self.PlugParams = []
Classes = [(name, XSDclass) for name, XSDclass in Classes.items() if XSDclass.IsBaseClass]
if len(Classes) == 1:
name, XSDclass = Classes[0]
obj = XSDclass()
self.PlugParams = (name, obj)
setattr(self, name, obj)
def __init__(self):
# Create BaseParam
self.BaseParams = _BaseParamsClass()
self.MandatoryParams = ("BaseParams", self.BaseParams)
self._AddParamsMembers()
self.PluggedChilds = {}
def PluginBaseXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "baseplugin.xml")
def PluginXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "plugin.xml")
def PlugPath(self,PlugName=None):
if not PlugName:
PlugName = self.BaseParams.getName()
return os.path.join(self.PlugParent.PlugPath(), PlugName + NameTypeSeparator + self.PlugType)
def PlugTestModified(self):
return False
def OnPlugSave(self):
return True
def GetPlugParamsAttributes(self):
return self.PlugParams[1].getElementAttributes()
def SetPlugParamsAttribute(self, name, value):
attr = getattr(self.PlugParams[1], name, None)
if isinstance(attr, types.ClassType):
attr.SetValue(value)
else:
setattr(self.PlugParams[1], name, value)
def PlugRequestSave(self):
# If plugin do not have corresponding directory
plugpath = self.PlugPath()
if not os.path.isdir(plugpath):
# Create it
os.mkdir(plugpath)
# generate XML for base XML parameters controller of the plugin
basexmlfilepath = self.PluginBaseXmlFilePath()
if basexmlfilepath:
BaseXMLFile = open(basexmlfilepath,'w')
BaseXMLFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
BaseXMLFile.write(self.MandatoryParams[1].generateXMLText(self.MandatoryParams[0], 0))
BaseXMLFile.close()
# generate XML for XML parameters controller of the plugin
xmlfilepath = self.PluginXmlFilePath()
if xmlfilepath:
XMLFile = open(xmlfilepath,'w')
XMLFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
XMLFile.write(self.PlugParams[1].generateXMLText(self.PlugParams[0], 0))
XMLFile.close()
# Call the plugin specific OnPlugSave method
result = self.OnPlugSave()
if not result:
return "Error while saving \"%s\""%self.PlugPath()
# go through all childs and do the same
for PlugChild in self.IterChilds():
result = PlugChild.PlugRequestSave()
if result:
return result
return None
def PlugImport(self, src_PlugPath):
shutil.copytree(src_PlugPath, self.PlugPath)
return True
def PlugGenerate_C(self, buildpath, current_location, locations, logger):
"""
Generate C code
@param current_location: Tupple containing plugin IEC location : %I0.0.4.5 => (0,0,4,5)
@param locations: List of complete variables locations \
[(IEC_loc, IEC_Direction, IEC_Type, Name)]\
ex: [((0,0,4,5),'I','STRING','__IX_0_0_4_5'),...]
@return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND
"""
logger.write_warning(".".join(map(lambda x:str(x), current_location)) + " -> Nothing yo do")
return [],""
def _Generate_C(self, buildpath, current_location, locations, logger):
# Generate plugins [(Cfiles, CFLAGS)], LDFLAGS
PlugCFilesAndCFLAGS, PlugLDFLAGS = self.PlugGenerate_C(buildpath, current_location, locations, logger)
# recurse through all childs, and stack their results
for PlugChild in self.IterChilds():
# Compute child's IEC location
new_location = current_location + (self.BaseParams.getIEC_Channel())
# Get childs [(Cfiles, CFLAGS)], LDFLAGS
CFilesAndCFLAGS, LDFLAGS = \
PlugChild._Generate_C(
#keep the same path
buildpath,
# but update location (add curent IEC channel at the end)
new_location,
# filter locations that start with current IEC location
[ (l,d,t,n) for l,d,t,n in locations if l[0:len(new_location)] == new_location ],
#propagete logger
logger)
# stack the result
PlugCFilesAndCFLAGS += CFilesAndCFLAGS
PlugLDFLAGS += LDFLAGS
return PlugCFilesAndCFLAGS,PlugLDFLAGS
def BlockTypesFactory(self):
return []
def STLibraryFactory(self):
return ""
def IterChilds(self):
for PlugType, PluggedChilds in self.PluggedChilds.items():
for PlugInstance in PluggedChilds:
yield PlugInstance
def _GetChildBySomething(self, sep, something, matching):
toks = matching.split(sep,1)
for PlugInstance in self.IterChilds():
# if match component of the name
if getattr(PlugInstance.BaseParams, something) == toks[0]:
# if Name have other components
if len(toks) == 2:
# Recurse in order to find the latest object
return PlugInstance._GetChildBySomething( sep, something, toks[1])
# No sub name -> found
return PlugInstance
# Not found
return None
def GetChildByName(self, Name):
return self._GetChildBySomething('.',"Name", Name)
def GetChildByIECLocation(self, Location):
return self._GetChildBySomething('_',"IEC_Channel", Name)
def GetPlugInfos(self):
childs = []
for child in self.IterChilds():
childs.append(child.GetPlugInfos())
return {"name" : self.BaseParams.getName(), "type" : None, "values" : childs}
def FindNewIEC_Channel(self, DesiredChannel):
"""
Changes IEC Channel number to DesiredChannel if available, nearest available if not.
@param DesiredChannel: The desired IEC channel (int)
"""
# Get Current IEC channel
CurrentChannel = self.BaseParams.getIEC_Channel()
# Do nothing if no change
if CurrentChannel == DesiredChannel: return CurrentChannel
# Build a list of used Channels out of parent's PluggedChilds
AllChannels=[]
for PlugInstance in self.PlugParent.IterChilds():
if PlugInstance != self:
AllChannels.append(PlugInstance.BaseParams.getIEC_Channel())
AllChannels.sort()
# Now, try to guess the nearest available channel
res = DesiredChannel
while res in AllChannels: # While channel not free
if res < CurrentChannel: # Want to go down ?
res -= 1 # Test for n-1
if res < 0 : return CurrentChannel # Can't go bellow 0, do nothing
else : # Want to go up ?
res += 1 # Test for n-1
# Finally set IEC Channel
self.BaseParams.setIEC_Channel(res)
return res
def OnPlugClose(self):
return True
def _doRemoveChild(self, PlugInstance):
# Remove all childs of child
for SubPlugInstance in PlugInstance.IterChilds():
PlugInstance._doRemoveChild(SubPlugInstance)
# Call the OnCloseMethod
PlugInstance.OnPlugClose()
# Delete plugin dir
shutil.rmtree(PlugInstance.PlugPath())
# Remove child of PluggedChilds
self.PluggedChilds[PlugInstance.PlugType].remove(PlugInstance)
# Forget it... (View have to refresh)
def PlugRemoveChild(self, PlugName):
# Fetch the plugin
PlugInstance = self.GetChildByName(PlugName)
# Ask to his parent to remove it
PlugInstance.PlugParent._doRemoveChild(PlugInstance)
def PlugAddChild(self, PlugName, PlugType):
"""
Create the plugins that may be added as child to this node self
@param PlugType: string desining the plugin class name (get name from PlugChildsTypes)
@param PlugName: string for the name of the plugin instance
"""
PlugChildsTypes = dict(self.PlugChildsTypes)
# Check that adding this plugin is allowed
try:
PlugClass = PlugChildsTypes[PlugType]
except KeyError:
raise Exception, "Cannot create child %s of type %s "%(PlugName, PlugType)
# if PlugClass is a class factory, call it. (prevent unneeded imports)
if type(PlugClass) == types.FunctionType:
PlugClass = PlugClass()
# Eventualy Initialize child instance list for this class of plugin
PluggedChildsWithSameClass = self.PluggedChilds.setdefault(PlugType, list())
# Check count
if getattr(PlugClass, "PlugMaxCount", None) and len(PluggedChildsWithSameClass) >= PlugClass.PlugMaxCount:
raise Exception, "Max count (%d) reached for this plugin of type %s "%(PlugClass.PlugMaxCount, PlugType)
# create the final class, derived of provided plugin and template
class FinalPlugClass(PlugClass, PlugTemplate):
"""
Plugin class is derivated into FinalPlugClass before being instanciated
This way __init__ is overloaded to ensure PlugTemplate.__init__ is called
before PlugClass.__init__, and to do the file related stuff.
"""
def __init__(_self):
# self is the parent
_self.PlugParent = self
# Keep track of the plugin type name
_self.PlugType = PlugType
# Call the base plugin template init - change XSD into class members
PlugTemplate.__init__(_self)
# If dir have already be made, and file exist
if os.path.isdir(_self.PlugPath(PlugName)) and os.path.isfile(_self.PluginXmlFilePath(PlugName)):
#Load the plugin.xml file into parameters members
_self.LoadXMLParams(PlugName)
# Check that IEC_Channel is not already in use.
self.FindNewIEC_Channel(self.BaseParams.getIEC_Channel())
# Call the plugin real __init__
if getattr(PlugClass, "__init__", None):
PlugClass.__init__(_self)
#Load and init all the childs
_self.LoadChilds()
else:
# If plugin do not have corresponding file/dirs - they will be created on Save
# Set plugin name
_self.BaseParams.setName(PlugName)
os.mkdir(_self.PlugPath())
# Find an IEC number
_self.FindNewIEC_Channel(0)
# Call the plugin real __init__
if getattr(PlugClass, "__init__", None):
PlugClass.__init__(_self)
_self.PlugRequestSave()
# Create the object out of the resulting class
newPluginOpj = FinalPlugClass()
# Store it in PluggedChils
PluggedChildsWithSameClass.append(newPluginOpj)
return newPluginOpj
def LoadXMLParams(self, PlugName = None, test = True):
# Get the base xml tree
basexmlfilepath = self.PluginBaseXmlFilePath(PlugName)
if basexmlfilepath:
basexmlfile = open(basexmlfilepath, 'r')
basetree = minidom.parse(basexmlfile)
self.MandatoryParams[1].loadXMLTree(basetree.childNodes[0])
basexmlfile.close()
# Get the xml tree
xmlfilepath = self.PluginXmlFilePath(PlugName)
if xmlfilepath:
xmlfile = open(xmlfilepath, 'r')
tree = minidom.parse(xmlfile)
self.PlugParams[1].loadXMLTree(tree.childNodes[0])
xmlfile.close()
if test:
# Basic check. Better to fail immediately.
if not PlugName:
PlugName = os.path.split(self.PlugPath())[1].split(NameTypeSeparator)[0]
if (self.BaseParams.getName() != PlugName):
raise Exception, "Project tree layout do not match plugin.xml %s!=%s "%(PlugName, self.BaseParams.getName())
# Now, self.PlugPath() should be OK
def LoadChilds(self):
# Iterate over all PlugName@PlugType in plugin directory, and try to open them
for PlugDir in os.listdir(self.PlugPath()):
if os.path.isdir(os.path.join(self.PlugPath(), PlugDir)) and \
PlugDir.count(NameTypeSeparator) == 1:
try:
self.PlugAddChild(*PlugDir.split(NameTypeSeparator))
except Exception, e:
print e
def _GetClassFunction(name):
def GetRootClass():
return getattr(__import__("plugins." + name), name).RootClass
return GetRootClass
class PluginsRoot(PlugTemplate):
# For root object, available Childs Types are modules of the plugin packages.
PlugChildsTypes = [(name, _GetClassFunction(name)) for name in plugins.__all__]
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:simpleType name="Win32Compiler">
<xsd:restriction base="xsd:string">
<xsd:enumeration value="Cygwin"/>
<xsd:enumeration value="MinGW"/>
<xsd:enumeration value="VC++"/>
</xsd:restriction>
</xsd:simpleType>
<xsd:element name="BeremizRoot">
<xsd:complexType>
<xsd:element name="TargetType">
<xsd:complexType>
<xsd:choice>
<xsd:element name="Win32">
<xsd:complexType>
<xsd:attribute name="ToolChain" type="ppx:Win32Compiler" use="required" default="MinGW"/>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Linux">
<xsd:complexType>
<xsd:attribute name="Compiler" type="xsd:string" use="required" default="gcc"/>
<xsd:attribute name="Nice" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Xenomai">
<xsd:complexType>
<xsd:attribute name="xeno-config" type="xsd:string" use="required" default="/usr/xenomai/"/>
<xsd:attribute name="Compiler" type="xsd:string" use="required"/>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="RTAI">
<xsd:complexType>
<xsd:attribute name="xeno-config" type="xsd:string" use="required"/>
<xsd:attribute name="Compiler" type="xsd:string" use="required"/>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Library">
<xsd:complexType>
<xsd:attribute name="Dynamic" type="xsd:boolean" use="required" default="true"/>
<xsd:attribute name="Compiler" type="xsd:string" use="required"/>
</xsd:complexType>
</xsd:element>
</xsd:choice>
</xsd:complexType>
</xsd:element>
</xsd:complexType>
</xsd:element>
</xsd:schema>
"""
def __init__(self):
PlugTemplate.__init__(self)
# self is the parent
self.PlugParent = None
# Keep track of the plugin type name
self.PlugType = "Beremiz"
self.ProjectPath = ""
self.PLCManager = None
def HasProjectOpened(self):
"""
Return if a project is actually opened
"""
return self.ProjectPath != ""
def GetProjectPath(self):
return self.ProjectPath
def GetTargetTypes(self):
return self.BeremizRoot.TargetType.getChoices().keys()
def ChangeTargetType(self, target_type):
self.BeremizRoot.TargetType.addContent(target_type)
def GetTargetAttributes(self):
content = self.BeremizRoot.TargetType.getContent()
if content:
return content["value"].getElementAttributes()
else:
return []
def SetTargetAttribute(self, name, value):
content = self.BeremizRoot.TargetType.getContent()
if content:
attr = getattr(content["value"], name, None)
if isinstance(attr, types.ClassType):
attr.SetValue(value)
else:
setattr(content["value"], name, value)
def GetTargetType(self):
content = self.BeremizRoot.TargetType.getContent()
if content:
return content["name"]
else:
return ""
def NewProject(self, ProjectPath, PLCParams):
"""
Create a new project in an empty folder
@param ProjectPath: path of the folder where project have to be created
@param PLCParams: properties of the PLCOpen program created
"""
# Verify that choosen folder is empty
if not os.path.isdir(ProjectPath) or len(os.listdir(ProjectPath)) > 0:
return "Folder choosen isn't empty. You can't use it for a new project!"
# Create Controler for PLCOpen program
self.PLCManager = PLCControler()
self.PLCManager.CreateNewProject(PLCParams.pop("projectName"))
self.PLCManager.SetProjectProperties(properties = PLCParams)
# Change XSD into class members
self._AddParamsMembers()
self.PluggedChilds = {}
# No IEC channel, name, etc...
self.MandatoryParams = []
# Keep track of the root plugin (i.e. project path)
self.ProjectPath = ProjectPath
self.BaseParams.setName(os.path.split(ProjectPath)[1])
return None
def LoadProject(self, ProjectPath):
"""
Load a project contained in a folder
@param ProjectPath: path of the project folder
"""
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if not os.path.isfile(plc_file):
return "Folder choosen doesn't contain a program. It's not a valid project!"
# Create Controler for PLCOpen program
self.PLCManager = PLCControler()
# Load PLCOpen file
result = self.PLCManager.OpenXMLFile(plc_file)
if result:
return result
# Change XSD into class members
self._AddParamsMembers()
self.PluggedChilds = {}
# No IEC channel, name, etc...
self.MandatoryParams = None
# Keep track of the root plugin (i.e. project path)
self.ProjectPath = ProjectPath
# If dir have already be made, and file exist
if os.path.isdir(self.PlugPath()) and os.path.isfile(self.PluginXmlFilePath()):
#Load the plugin.xml file into parameters members
result = self.LoadXMLParams(test = False)
if result:
return result
#Load and init all the childs
self.LoadChilds()
self.BaseParams.setName(os.path.split(ProjectPath)[1])
return None
def SaveProject(self):
if not self.PLCManager.SaveXMLFile():
self.PLCManager.SaveXMLFile(os.path.join(self.ProjectPath, 'plc.xml'))
self.PlugRequestSave()
def PlugPath(self, PlugName=None):
return self.ProjectPath
def PluginBaseXmlFilePath(self, PlugName=None):
return None
def PluginXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "beremiz.xml")
def PlugGenerate_C(self, buildpath, current_location, locations, logger):
"""
Generate C code
@param current_location: Tupple containing plugin IEC location : %I0.0.4.5 => (0,0,4,5)
@param locations: List of complete variables locations \
[(IEC_loc, IEC_Direction, IEC_Type, Name)]\
ex: [((0,0,4,5),'I','STRING','__IX_0_0_4_5'),...]
@return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND
"""
return [(C_file_name, "") for C_file_name in self.PLCGeneratedCFiles ] , ""
def _Generate_SoftPLC(self, buildpath, logger):
LOCATED_MODEL = re.compile("__LOCATED_VAR\((?P<IEC_TYPE>[A-Z]*),(?P<NAME>[_A-Za-z0-9]*),(?P<DIR>[QMI])(?:,(?P<SIZE>[XBWD]))?,(?P<LOC>[,0-9]*)\)")
if self.PLCManager:
logger.write("Generating SoftPLC IEC-61131 ST/IL/SFC code...\n")
plc_file = os.path.join(self.TargetDir, "plc.st")
result = self.PLCManager.GenerateProgram(plc_file)
if not result:
logger.write_error("Error : ST/IL/SFC code generator returned %d"%result)
return False
logger.write("Compiling ST Program in to C Program...\n")
status, result, err_result = self.LogCommand("%s %s -I %s %s"%(iec2cc_path, plc_file, ieclib_path, self.TargetDir))
if status:
new_dialog = wx.Frame(None)
ST_viewer = TextViewer(new_dialog, None, None)
#ST_viewer.Enable(False)
ST_viewer.SetKeywords(IEC_KEYWORDS)
ST_viewer.SetText(file(plc_file).read())
new_dialog.Show()
raise Exception, "Error : IEC to C compiler returned %d"%status
C_files = result.splitlines()
C_files.remove("POUS.c")
C_files = map(lambda filename:os.path.join(self.TargetDir, filename), C_files)
logger.write("Extracting Located Variables...\n")
location_file = open(os.path.join(self.TargetDir,"LOCATED_VARIABLES.h"))
locations = []
lines = [line.strip() for line in location_file.readlines()]
for line in lines:
result = LOCATED_MODEL.match(line)
if result:
resdict = result.groupdict()
# rewrite location as a tuple of integers
resdict['LOC'] = tuple(map(int,resdict['LOC'].split(',')))
if not resdict['SIZE']:
resdict['SIZE'] = 'X'
locations.append(resdict)
self.PLCGeneratedCFiles = C_files
self.PLCGeneratedLocatedVars = locations
return True
def _build(self, logger):
buildpath = os.path.join(self.ProjectPath, "build")
if not os.path.exists(buildpath):
os.mkdir(buildpath)
logger.write("Start build in %s" % buildpath)
if not self._Generate_SoftPLC(buildpath, logger):
logger.write("SoftPLC code generation failed !")
return
logger.write("SoftPLC code generation successfull")
try:
CFilesAndCFLAGS, LDFLAGS = self._Generate_C(
buildpath,
(),
self.PLCGeneratedLocatedVars,
logger)
except Exception, msg:
logger.write_error("Plugins code generation Failed !")
logger.write_error(str(msg))
return
logger.write_error("Plugins code generation successfull")
for CFile, CFLAG in CFilesAndCFLAGS:
print CFile,CFLAG
LDFLAGS
PluginMethods = [("Build",_build), ("Clean",None), ("Run",None), ("EditPLC",None), ("Simulate",None)]