"""
Base definitions for beremiz plugins
"""
import os,sys
import plugins
import types
import shutil
from xml.dom import minidom
import wx
#Quick hack to be able to find Beremiz IEC tools. Should be config params.
base_folder = os.path.split(sys.path[0])[0]
sys.path.append(os.path.join(base_folder, "plcopeneditor"))
sys.path.append(os.path.join(base_folder, "docutils"))
from docpdf import *
from xmlclass import GenerateClassesFromXSDstring
from wxPopen import ProcessLogger
_BaseParamsClass = GenerateClassesFromXSDstring("""<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="BaseParams">
<xsd:complexType>
<xsd:attribute name="Name" type="xsd:string" use="optional" default="__unnamed__"/>
<xsd:attribute name="IEC_Channel" type="xsd:integer" use="required"/>
<xsd:attribute name="Enabled" type="xsd:boolean" use="optional" default="true"/>
</xsd:complexType>
</xsd:element>
</xsd:schema>""")["BaseParams"]
NameTypeSeparator = '@'
class MiniTextControler:
def __init__(self, filepath):
self.FilePath = filepath
def SetEditedElementText(self, tagname, text):
file = open(self.FilePath, "w")
file.write(text)
file.close()
def GetEditedElementText(self, tagname):
if os.path.isfile(self.FilePath):
file = open(self.FilePath, "r")
text = file.read()
file.close()
return text
return ""
def GetEditedElementInterfaceVars(self, tagname):
return []
def GetEditedElementType(self, tagname):
return "program"
def GetBlockTypes(self, tagname = ""):
return []
def GetEnumeratedDataValues(self):
return []
def StartBuffering(self):
pass
def EndBuffering(self):
pass
def BufferProject(self):
pass
class PlugTemplate:
"""
This class is the one that define plugins.
"""
XSD = None
PlugChildsTypes = []
PlugMaxCount = None
PluginMethods = []
def _AddParamsMembers(self):
self.PlugParams = None
if self.XSD:
Classes = GenerateClassesFromXSDstring(self.XSD)
Classes = [(name, XSDclass) for name, XSDclass in Classes.items() if XSDclass.IsBaseClass]
if len(Classes) == 1:
name, XSDclass = Classes[0]
obj = XSDclass()
self.PlugParams = (name, obj)
setattr(self, name, obj)
def __init__(self):
# Create BaseParam
self.BaseParams = _BaseParamsClass()
self.MandatoryParams = ("BaseParams", self.BaseParams)
self._AddParamsMembers()
self.PluggedChilds = {}
# copy PluginMethods so that it can be later customized
self.PluginMethods = [dic.copy() for dic in self.PluginMethods]
def PluginBaseXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "baseplugin.xml")
def PluginXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "plugin.xml")
def PlugPath(self,PlugName=None):
if not PlugName:
PlugName = self.BaseParams.getName()
return os.path.join(self.PlugParent.PlugPath(), PlugName + NameTypeSeparator + self.PlugType)
def PlugTestModified(self):
return self.ChangesToSave
def ProjectTestModified(self):
"""
recursively check modified status
"""
if self.PlugTestModified():
return True
for PlugChild in self.IterChilds():
if PlugChild.ProjectTestModified():
return True
return False
def OnPlugSave(self):
#Default, do nothing and return success
return True
def GetParamsAttributes(self, path = None):
if path:
parts = path.split(".", 1)
if self.MandatoryParams and parts[0] == self.MandatoryParams[0]:
return self.MandatoryParams[1].getElementInfos(parts[0], parts[1])
elif self.PlugParams and parts[0] == self.PlugParams[0]:
return self.PlugParams[1].getElementInfos(parts[0], parts[1])
else:
params = []
if wx.VERSION < (2, 8, 0) and self.MandatoryParams:
params.append(self.MandatoryParams[1].getElementInfos(self.MandatoryParams[0]))
if self.PlugParams:
params.append(self.PlugParams[1].getElementInfos(self.PlugParams[0]))
return params
def SetParamsAttribute(self, path, value, logger):
self.ChangesToSave = True
# Filter IEC_Channel and Name, that have specific behavior
if path == "BaseParams.IEC_Channel":
return self.FindNewIEC_Channel(value,logger), True
elif path == "BaseParams.Name":
res = self.FindNewName(value,logger)
self.PlugRequestSave()
return res, True
parts = path.split(".", 1)
if self.MandatoryParams and parts[0] == self.MandatoryParams[0]:
self.MandatoryParams[1].setElementValue(parts[1], value)
elif self.PlugParams and parts[0] == self.PlugParams[0]:
self.PlugParams[1].setElementValue(parts[1], value)
return value, False
def PlugRequestSave(self):
# If plugin do not have corresponding directory
plugpath = self.PlugPath()
if not os.path.isdir(plugpath):
# Create it
os.mkdir(plugpath)
# generate XML for base XML parameters controller of the plugin
if self.MandatoryParams:
BaseXMLFile = open(self.PluginBaseXmlFilePath(),'w')
BaseXMLFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
BaseXMLFile.write(self.MandatoryParams[1].generateXMLText(self.MandatoryParams[0], 0))
BaseXMLFile.close()
# generate XML for XML parameters controller of the plugin
if self.PlugParams:
XMLFile = open(self.PluginXmlFilePath(),'w')
XMLFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
XMLFile.write(self.PlugParams[1].generateXMLText(self.PlugParams[0], 0))
XMLFile.close()
# Call the plugin specific OnPlugSave method
result = self.OnPlugSave()
if not result:
return "Error while saving \"%s\""%self.PlugPath()
# mark plugin as saved
self.ChangesToSave = False
# go through all childs and do the same
for PlugChild in self.IterChilds():
result = PlugChild.PlugRequestSave()
if result:
return result
return None
def PlugImport(self, src_PlugPath):
shutil.copytree(src_PlugPath, self.PlugPath)
return True
def PlugGenerate_C(self, buildpath, locations, logger):
"""
Generate C code
@param locations: List of complete variables locations \
[{"IEC_TYPE" : the IEC type (i.e. "INT", "STRING", ...)
"NAME" : name of the variable (generally "__IW0_1_2" style)
"DIR" : direction "Q","I" or "M"
"SIZE" : size "X", "B", "W", "D", "L"
"LOC" : tuple of interger for IEC location (0,1,2,...)
}, ...]
@return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND
"""
logger.write_warning(".".join(map(lambda x:str(x), self.GetCurrentLocation())) + " -> Nothing to do\n")
return [],"",False
def _Generate_C(self, buildpath, locations, logger):
# Generate plugins [(Cfiles, CFLAGS)], LDFLAGS
PlugCFilesAndCFLAGS, PlugLDFLAGS, DoCalls = self.PlugGenerate_C(buildpath, locations, logger)
# if some files heve been generated put them in the list with their location
if PlugCFilesAndCFLAGS:
LocationCFilesAndCFLAGS = [(self.GetCurrentLocation(), PlugCFilesAndCFLAGS, DoCalls)]
else:
LocationCFilesAndCFLAGS = []
# plugin asks for some LDFLAGS
if PlugLDFLAGS:
# LDFLAGS can be either string
if type(PlugLDFLAGS)==type(str()):
LDFLAGS=[PlugLDFLAGS]
#or list of strings
elif type(PlugLDFLAGS)==type(list()):
LDFLAGS=PlugLDFLAGS[:]
else:
LDFLAGS=[]
# recurse through all childs, and stack their results
for PlugChild in self.IECSortedChilds():
new_location = PlugChild.GetCurrentLocation()
# How deep are we in the tree ?
depth=len(new_location)
_LocationCFilesAndCFLAGS, _LDFLAGS = \
PlugChild._Generate_C(
#keep the same path
buildpath,
# filter locations that start with current IEC location
[loc for loc in locations if loc["LOC"][0:depth] == new_location ],
#propagete logger
logger)
# stack the result
LocationCFilesAndCFLAGS += _LocationCFilesAndCFLAGS
LDFLAGS += _LDFLAGS
return LocationCFilesAndCFLAGS,LDFLAGS
def BlockTypesFactory(self):
return []
def STLibraryFactory(self):
return ""
def IterChilds(self):
for PlugType, PluggedChilds in self.PluggedChilds.items():
for PlugInstance in PluggedChilds:
yield PlugInstance
def IECSortedChilds(self):
# reorder childs by IEC_channels
ordered = [(chld.BaseParams.getIEC_Channel(),chld) for chld in self.IterChilds()]
if ordered:
ordered.sort()
return zip(*ordered)[1]
else:
return []
def _GetChildBySomething(self, something, toks):
for PlugInstance in self.IterChilds():
# if match component of the name
if getattr(PlugInstance.BaseParams, something) == toks[0]:
# if Name have other components
if len(toks) >= 2:
# Recurse in order to find the latest object
return PlugInstance._GetChildBySomething( something, toks[1:])
# No sub name -> found
return PlugInstance
# Not found
return None
def GetChildByName(self, Name):
if Name:
toks = Name.split('.')
return self._GetChildBySomething("Name", toks)
else:
return self
def GetChildByIECLocation(self, Location):
if Location:
return self._GetChildBySomething("IEC_Channel", Location)
else:
return self
def GetCurrentLocation(self):
"""
@return: Tupple containing plugin IEC location of current plugin : %I0.0.4.5 => (0,0,4,5)
"""
return self.PlugParent.GetCurrentLocation() + (self.BaseParams.getIEC_Channel(),)
def GetCurrentName(self):
"""
@return: String "ParentParentName.ParentName.Name"
"""
return self.PlugParent._GetCurrentName() + self.BaseParams.getName()
def _GetCurrentName(self):
"""
@return: String "ParentParentName.ParentName.Name."
"""
return self.PlugParent._GetCurrentName() + self.BaseParams.getName() + "."
def GetPlugRoot(self):
return self.PlugParent.GetPlugRoot()
def GetFullIEC_Channel(self):
return ".".join([str(i) for i in self.GetCurrentLocation()]) + ".x"
def GetLocations(self):
location = self.GetCurrentLocation()
return [loc for loc in self.PlugParent.GetLocations() if loc["LOC"][0:len(location)] == location]
def GetPlugInfos(self):
childs = []
# reorder childs by IEC_channels
for child in self.IECSortedChilds():
childs.append(child.GetPlugInfos())
if wx.VERSION < (2, 8, 0):
return {"name" : "%d-%s"%(self.BaseParams.getIEC_Channel(),self.BaseParams.getName()), "type" : self.BaseParams.getName(), "values" : childs}
else:
return {"name" : self.BaseParams.getName(), "channel" : self.BaseParams.getIEC_Channel(), "enabled" : self.BaseParams.getEnabled(), "parent" : len(self.PlugChildsTypes) > 0, "type" : self.BaseParams.getName(), "values" : childs}
def FindNewName(self, DesiredName, logger):
"""
Changes Name to DesiredName if available, Name-N if not.
@param DesiredName: The desired Name (string)
"""
# Get Current Name
CurrentName = self.BaseParams.getName()
# Do nothing if no change
#if CurrentName == DesiredName: return CurrentName
# Build a list of used Name out of parent's PluggedChilds
AllNames=[]
for PlugInstance in self.PlugParent.IterChilds():
if PlugInstance != self:
AllNames.append(PlugInstance.BaseParams.getName())
# Find a free name, eventually appending digit
res = DesiredName
suffix = 1
while res in AllNames:
res = "%s-%d"%(DesiredName, suffix)
suffix += 1
# Get old path
oldname = self.PlugPath()
# Check previous plugin existance
dontexist = self.BaseParams.getName() == "__unnamed__"
# Set the new name
self.BaseParams.setName(res)
# Rename plugin dir if exist
if not dontexist:
shutil.move(oldname, self.PlugPath())
# warn user he has two left hands
if DesiredName != res:
logger.write_warning("A child names \"%s\" already exist -> \"%s\"\n"%(DesiredName,res))
return res
def FindNewIEC_Channel(self, DesiredChannel, logger):
"""
Changes IEC Channel number to DesiredChannel if available, nearest available if not.
@param DesiredChannel: The desired IEC channel (int)
"""
# Get Current IEC channel
CurrentChannel = self.BaseParams.getIEC_Channel()
# Do nothing if no change
#if CurrentChannel == DesiredChannel: return CurrentChannel
# Build a list of used Channels out of parent's PluggedChilds
AllChannels=[]
for PlugInstance in self.PlugParent.IterChilds():
if PlugInstance != self:
AllChannels.append(PlugInstance.BaseParams.getIEC_Channel())
AllChannels.sort()
# Now, try to guess the nearest available channel
res = DesiredChannel
while res in AllChannels: # While channel not free
if res < CurrentChannel: # Want to go down ?
res -= 1 # Test for n-1
if res < 0 :
if logger :
logger.write_warning("Cannot find lower free IEC channel than %d\n"%CurrentChannel)
return CurrentChannel # Can't go bellow 0, do nothing
else : # Want to go up ?
res += 1 # Test for n-1
# Finally set IEC Channel
self.BaseParams.setIEC_Channel(res)
if logger and DesiredChannel != res:
logger.write_warning("A child with IEC channel %d already exist -> %d\n"%(DesiredChannel,res))
return res
def OnPlugClose(self):
return True
def _doRemoveChild(self, PlugInstance):
# Remove all childs of child
for SubPlugInstance in PlugInstance.IterChilds():
PlugInstance._doRemoveChild(SubPlugInstance)
# Call the OnCloseMethod
PlugInstance.OnPlugClose()
# Delete plugin dir
shutil.rmtree(PlugInstance.PlugPath())
# Remove child of PluggedChilds
self.PluggedChilds[PlugInstance.PlugType].remove(PlugInstance)
# Forget it... (View have to refresh)
def PlugRemove(self):
# Fetch the plugin
#PlugInstance = self.GetChildByName(PlugName)
# Ask to his parent to remove it
self.PlugParent._doRemoveChild(self)
def PlugAddChild(self, PlugName, PlugType, logger):
"""
Create the plugins that may be added as child to this node self
@param PlugType: string desining the plugin class name (get name from PlugChildsTypes)
@param PlugName: string for the name of the plugin instance
"""
# reorgabize self.PlugChildsTypes tuples from (name, PlugClass, Help)
# to ( name, (PlugClass, Help)), an make a dict
transpose = zip(*self.PlugChildsTypes)
PlugChildsTypes = dict(zip(transpose[0],zip(transpose[1],transpose[2])))
# Check that adding this plugin is allowed
try:
PlugClass, PlugHelp = PlugChildsTypes[PlugType]
except KeyError:
raise Exception, "Cannot create child %s of type %s "%(PlugName, PlugType)
# if PlugClass is a class factory, call it. (prevent unneeded imports)
if type(PlugClass) == types.FunctionType:
PlugClass = PlugClass()
# Eventualy Initialize child instance list for this class of plugin
PluggedChildsWithSameClass = self.PluggedChilds.setdefault(PlugType, list())
# Check count
if getattr(PlugClass, "PlugMaxCount", None) and len(PluggedChildsWithSameClass) >= PlugClass.PlugMaxCount:
raise Exception, "Max count (%d) reached for this plugin of type %s "%(PlugClass.PlugMaxCount, PlugType)
# create the final class, derived of provided plugin and template
class FinalPlugClass(PlugClass, PlugTemplate):
"""
Plugin class is derivated into FinalPlugClass before being instanciated
This way __init__ is overloaded to ensure PlugTemplate.__init__ is called
before PlugClass.__init__, and to do the file related stuff.
"""
def __init__(_self):
# self is the parent
_self.PlugParent = self
# Keep track of the plugin type name
_self.PlugType = PlugType
# remind the help string, for more fancy display
_self.PlugHelp = PlugHelp
# Call the base plugin template init - change XSD into class members
PlugTemplate.__init__(_self)
# check name is unique
NewPlugName = _self.FindNewName(PlugName, logger)
# If dir have already be made, and file exist
if os.path.isdir(_self.PlugPath(NewPlugName)): #and os.path.isfile(_self.PluginXmlFilePath(PlugName)):
#Load the plugin.xml file into parameters members
_self.LoadXMLParams(logger, NewPlugName)
# Basic check. Better to fail immediately.
if (_self.BaseParams.getName() != NewPlugName):
raise Exception, "Project tree layout do not match plugin.xml %s!=%s "%(NewPlugName, _self.BaseParams.getName())
# Now, self.PlugPath() should be OK
# Check that IEC_Channel is not already in use.
_self.FindNewIEC_Channel(_self.BaseParams.getIEC_Channel(),logger)
# Call the plugin real __init__
if getattr(PlugClass, "__init__", None):
PlugClass.__init__(_self)
#Load and init all the childs
_self.LoadChilds(logger)
#just loaded, nothing to saved
_self.ChangesToSave = False
else:
# If plugin do not have corresponding file/dirs - they will be created on Save
os.mkdir(_self.PlugPath())
# Find an IEC number
_self.FindNewIEC_Channel(0, None)
# Call the plugin real __init__
if getattr(PlugClass, "__init__", None):
PlugClass.__init__(_self)
_self.PlugRequestSave()
#just created, must be saved
_self.ChangesToSave = True
def _getBuildPath(_self):
return self._getBuildPath()
# Create the object out of the resulting class
newPluginOpj = FinalPlugClass()
# Store it in PluggedChils
PluggedChildsWithSameClass.append(newPluginOpj)
return newPluginOpj
def LoadXMLParams(self, logger, PlugName = None):
methode_name = os.path.join(self.PlugPath(PlugName), "methods.py")
if os.path.isfile(methode_name):
logger.write_error("Welcome to the Beremiz Demo\n\n")
logger.write("This demo provides a PLC working with the CANopen plugin\n")
logger.write("""Some external programs are also provided:\n
- a CAN TCP server to simulate the CANopen network
- a virtual slave node to simulate input block
- a virtual slave node to simulate output block
""")
logger.write("\nInfo: For this demo, %s plugin has some special methods to run external programs.\nThese methods are defined in methods.py\n" % (PlugName or "Root"))
open_pdf(os.path.join(os.path.split(__file__)[0], "doc", "manual_beremiz.pdf"), pagenum=20)
execfile(methode_name)
# Get the base xml tree
if self.MandatoryParams:
#try:
basexmlfile = open(self.PluginBaseXmlFilePath(PlugName), 'r')
basetree = minidom.parse(basexmlfile)
self.MandatoryParams[1].loadXMLTree(basetree.childNodes[0])
basexmlfile.close()
#except Exception, e:
# logger.write_error("Couldn't load plugin base parameters %s :\n %s" % (PlugName, str(e)))
# Get the xml tree
if self.PlugParams:
#try:
xmlfile = open(self.PluginXmlFilePath(PlugName), 'r')
tree = minidom.parse(xmlfile)
self.PlugParams[1].loadXMLTree(tree.childNodes[0])
xmlfile.close()
#except Exception, e:
# logger.write_error("Couldn't load plugin parameters %s :\n %s" % (PlugName, str(e)))
def LoadChilds(self, logger):
# Iterate over all PlugName@PlugType in plugin directory, and try to open them
for PlugDir in os.listdir(self.PlugPath()):
if os.path.isdir(os.path.join(self.PlugPath(), PlugDir)) and \
PlugDir.count(NameTypeSeparator) == 1:
pname, ptype = PlugDir.split(NameTypeSeparator)
#try:
self.PlugAddChild(pname, ptype, logger)
#except Exception, e:
# logger.write_error("Could not add child \"%s\", type %s :\n%s\n"%(pname, ptype, str(e)))
def EnableMethod(self, method, value):
for d in self.PluginMethods:
if d["method"]==method:
d["enabled"]=value
return True
return False
def _GetClassFunction(name):
def GetRootClass():
return getattr(__import__("plugins." + name), name).RootClass
return GetRootClass
####################################################################################
####################################################################################
####################################################################################
################################### ROOT ######################################
####################################################################################
####################################################################################
####################################################################################
if wx.Platform == '__WXMSW__':
exe_ext=".exe"
else:
exe_ext=""
iec2c_path = os.path.join(base_folder, "matiec", "iec2c"+exe_ext)
ieclib_path = os.path.join(base_folder, "matiec", "lib")
# import for project creation timestamping
from time import localtime
from datetime import datetime
# import necessary stuff from PLCOpenEditor
from PLCControler import PLCControler
from PLCOpenEditor import PLCOpenEditor, ProjectDialog
from TextViewer import TextViewer
from plcopen.structures import IEC_KEYWORDS, AddPluginBlockList, ClearPluginTypes, PluginTypes
import runtime
import re
class PluginsRoot(PlugTemplate, PLCControler):
"""
This class define Root object of the plugin tree.
It is responsible of :
- Managing project directory
- Building project
- Handling PLCOpenEditor controler and view
- Loading user plugins and instanciante them as childs
- ...
"""
# For root object, available Childs Types are modules of the plugin packages.
PlugChildsTypes = [(name, _GetClassFunction(name), help) for name, help in zip(plugins.__all__,plugins.helps)]
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="BeremizRoot">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="TargetType">
<xsd:complexType>
<xsd:choice>
<xsd:element name="Win32">
<xsd:complexType>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Linux">
<xsd:complexType>
<xsd:attribute name="Nice" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Xenomai">
<xsd:complexType>
<xsd:attribute name="xeno_config" type="xsd:string" use="optional" default="/usr/xenomai/"/>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="RTAI">
<xsd:complexType>
<xsd:attribute name="rtai_config" type="xsd:string" use="required"/>
<xsd:attribute name="Priority" type="xsd:integer" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="Library">
<xsd:complexType>
<xsd:attribute name="Dynamic" type="xsd:boolean" use="optional" default="true"/>
</xsd:complexType>
</xsd:element>
</xsd:choice>
</xsd:complexType>
</xsd:element>
<xsd:element name="Connection">
<xsd:complexType>
<xsd:choice>
<xsd:element name="Local"/>
<xsd:element name="TCP_IP">
<xsd:complexType>
<xsd:attribute name="Host" type="xsd:string" use="required"/>
</xsd:complexType>
</xsd:element>
</xsd:choice>
</xsd:complexType>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="Compiler" type="xsd:string" use="optional" default="gcc"/>
<xsd:attribute name="CFLAGS" type="xsd:string" use="required"/>
<xsd:attribute name="Linker" type="xsd:string" use="optional" default="ld"/>
<xsd:attribute name="LDFLAGS" type="xsd:string" use="required"/>
</xsd:complexType>
</xsd:element>
</xsd:schema>
"""
def __init__(self, frame):
PLCControler.__init__(self)
self.MandatoryParams = None
self.AppFrame = frame
"""
This method are not called here... but in NewProject and OpenProject
self._AddParamsMembers()
self.PluggedChilds = {}
"""
# In both new or load scenario, no need to save
self.ChangesToSave = False
# root have no parent
self.PlugParent = None
# Keep track of the plugin type name
self.PlugType = "Beremiz"
# After __init__ root plugin is not valid
self.ProjectPath = None
self.PLCEditor = None
# copy PluginMethods so that it can be later customized
self.PluginMethods = [dic.copy() for dic in self.PluginMethods]
# special root member for handlig PLC execution
self.runningPLC = None
def PlugTestModified(self):
return self.ChangesToSave or not self.ProjectIsSaved()
def HasProjectOpened(self):
"""
Return if a project is actually opened
"""
return self.ProjectPath != None
def GetPlugRoot(self):
return self
def GetCurrentLocation(self):
return ()
def GetCurrentName(self):
return ""
def _GetCurrentName(self):
return ""
def GetProjectPath(self):
return self.ProjectPath
def GetProjectName(self):
return os.path.split(self.ProjectPath)[1]
def GetPlugInfos(self):
childs = []
for child in self.IterChilds():
childs.append(child.GetPlugInfos())
return {"name" : "PLC (%s)"%self.GetProjectName(), "type" : None, "values" : childs}
def NewProject(self, ProjectPath):
"""
Create a new project in an empty folder
@param ProjectPath: path of the folder where project have to be created
@param PLCParams: properties of the PLCOpen program created
"""
# Verify that choosen folder is empty
if not os.path.isdir(ProjectPath) or len(os.listdir(ProjectPath)) > 0:
return "Folder choosen isn't empty. You can't use it for a new project!"
dialog = ProjectDialog(self.AppFrame)
if dialog.ShowModal() == wx.ID_OK:
values = dialog.GetValues()
values["creationDateTime"] = datetime(*localtime()[:6])
dialog.Destroy()
else:
dialog.Destroy()
return "Project not created"
# Create PLCOpen program
self.CreateNewProject(values)
# Change XSD into class members
self._AddParamsMembers()
self.PluggedChilds = {}
# Keep track of the root plugin (i.e. project path)
self.ProjectPath = ProjectPath
# get plugins bloclist (is that usefull at project creation?)
self.RefreshPluginsBlockLists()
# this will create files base XML files
self.SaveProject()
return None
def LoadProject(self, ProjectPath, logger):
"""
Load a project contained in a folder
@param ProjectPath: path of the project folder
"""
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if not os.path.isfile(plc_file):
return "Folder choosen doesn't contain a program. It's not a valid project!"
# Load PLCOpen file
result = self.OpenXMLFile(plc_file)
if result:
return result
# Change XSD into class members
self._AddParamsMembers()
self.PluggedChilds = {}
# Keep track of the root plugin (i.e. project path)
self.ProjectPath = ProjectPath
# If dir have already be made, and file exist
if os.path.isdir(self.PlugPath()) and os.path.isfile(self.PluginXmlFilePath()):
#Load the plugin.xml file into parameters members
result = self.LoadXMLParams(logger)
if result:
return result
#Load and init all the childs
self.LoadChilds(logger)
self.RefreshPluginsBlockLists()
return None
def SaveProject(self):
if not self.SaveXMLFile():
self.SaveXMLFile(os.path.join(self.ProjectPath, 'plc.xml'))
if self.PLCEditor:
self.PLCEditor.RefreshTitle()
self.PlugRequestSave()
# Update PLCOpenEditor Plugin Block types from loaded plugins
def RefreshPluginsBlockLists(self):
if getattr(self, "PluggedChilds", None) is not None:
ClearPluginTypes()
AddPluginBlockList(self.BlockTypesFactory())
for child in self.IterChilds():
AddPluginBlockList(child.BlockTypesFactory())
if self.PLCEditor is not None:
self.PLCEditor.RefreshEditor()
def PlugPath(self, PlugName=None):
return self.ProjectPath
def PluginXmlFilePath(self, PlugName=None):
return os.path.join(self.PlugPath(PlugName), "beremiz.xml")
def PlugGenerate_C(self, buildpath, locations, logger):
"""
Generate C code
@param locations: List of complete variables locations \
[(IEC_loc, IEC_Direction, IEC_Type, Name)]\
ex: [((0,0,4,5),'I','STRING','__IX_0_0_4_5'),...]
@return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND
"""
return [(C_file_name, self.CFLAGS) for C_file_name in self.PLCGeneratedCFiles ] , "", False
def _getBuildPath(self):
return os.path.join(self.ProjectPath, "build")
def _getIECcodepath(self):
# define name for IEC code file
return os.path.join(self._getBuildPath(), "plc.st")
def _getIECgeneratedcodepath(self):
# define name for IEC generated code file
return os.path.join(self._getBuildPath(), "generated_plc.st")
def _getIECrawcodepath(self):
# define name for IEC raw code file
return os.path.join(self._getBuildPath(), "raw_plc.st")
def GetLocations(self):
locations = []
filepath = os.path.join(self._getBuildPath(),"LOCATED_VARIABLES.h")
if os.path.isfile(filepath):
# IEC2C compiler generate a list of located variables : LOCATED_VARIABLES.h
location_file = open(os.path.join(self._getBuildPath(),"LOCATED_VARIABLES.h"))
# each line of LOCATED_VARIABLES.h declares a located variable
lines = [line.strip() for line in location_file.readlines()]
# This regular expression parses the lines genereated by IEC2C
LOCATED_MODEL = re.compile("__LOCATED_VAR\((?P<IEC_TYPE>[A-Z]*),(?P<NAME>[_A-Za-z0-9]*),(?P<DIR>[QMI])(?:,(?P<SIZE>[XBWD]))?,(?P<LOC>[,0-9]*)\)")
for line in lines:
# If line match RE,
result = LOCATED_MODEL.match(line)
if result:
# Get the resulting dict
resdict = result.groupdict()
# rewrite string for variadic location as a tuple of integers
resdict['LOC'] = tuple(map(int,resdict['LOC'].split(',')))
# set located size to 'X' if not given
if not resdict['SIZE']:
resdict['SIZE'] = 'X'
# finally store into located variable list
locations.append(resdict)
return locations
def _Generate_SoftPLC(self, logger):
"""
Generate SoftPLC ST/IL/SFC code out of PLCOpenEditor controller, and compile it with IEC2C
@param buildpath: path where files should be created
@param logger: the log pseudo file
"""
# Update PLCOpenEditor Plugin Block types before generate ST code
self.RefreshPluginsBlockLists()
logger.write("Generating SoftPLC IEC-61131 ST/IL/SFC code...\n")
buildpath = self._getBuildPath()
# ask PLCOpenEditor controller to write ST/IL/SFC code file
result = self.GenerateProgram(self._getIECgeneratedcodepath())
if result is not None:
# Failed !
logger.write_error("Error in ST/IL/SFC code generator :\n%s\n"%result)
return False
plc_file = open(self._getIECcodepath(), "w")
if os.path.isfile(self._getIECrawcodepath()):
plc_file.write(open(self._getIECrawcodepath(), "r").read())
plc_file.write("\n")
plc_file.write(open(self._getIECgeneratedcodepath(), "r").read())
plc_file.close()
logger.write("Compiling IEC Program in to C code...\n")
# Now compile IEC code into many C files
# files are listed to stdout, and errors to stderr.
status, result, err_result = ProcessLogger(
logger,
"%s \"%s\" -I \"%s\" \"%s\""%(
iec2c_path,
self._getIECcodepath(),
ieclib_path, buildpath),
no_stdout=True).spin()
if status:
# Failed !
logger.write_error("Error : IEC to C compiler returned %d\n"%status)
return False
# Now extract C files of stdout
C_files = [ fname for fname in result.splitlines() if fname[-2:]==".c" or fname[-2:]==".C" ]
# remove those that are not to be compiled because included by others
C_files.remove("POUS.c")
if not C_files:
logger.write_error("Error : At least one configuration and one ressource must be declared in PLC !\n")
return False
# transform those base names to full names with path
C_files = map(lambda filename:os.path.join(buildpath, filename), C_files)
logger.write("Extracting Located Variables...\n")
# Keep track of generated located variables for later use by self._Generate_C
self.PLCGeneratedLocatedVars = self.GetLocations()
# Keep track of generated C files for later use by self.PlugGenerate_C
self.PLCGeneratedCFiles = C_files
# compute CFLAGS for plc
self.CFLAGS = "\"-I"+ieclib_path+"\""
return True
def _build(self, logger):
"""
Method called by user to (re)build SoftPLC and plugin tree
"""
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
os.mkdir(buildpath)
logger.flush()
logger.write("Start build in %s\n" % buildpath)
self.EnableMethod("_Clean", True)
self.EnableMethod("_showIECcode", True)
# Generate SoftPLC code
if not self._Generate_SoftPLC(logger):
logger.write_error("SoftPLC code generation failed !\n")
return False
#logger.write("SoftPLC code generation successfull\n")
logger.write("Generating plugins code ...\n")
# Generate C code and compilation params from plugin hierarchy
try:
LocationCFilesAndCFLAGS,LDFLAGS = self._Generate_C(
buildpath,
self.PLCGeneratedLocatedVars,
logger)
except Exception, msg:
logger.write_error("Plugins code generation Failed !\n")
logger.write_error(str(msg))
return False
#debug
#import pprint
#pp = pprint.PrettyPrinter(indent=4)
#logger.write("LocationCFilesAndCFLAGS :\n"+pp.pformat(LocationCFilesAndCFLAGS)+"\n")
#logger.write("LDFLAGS :\n"+pp.pformat(LDFLAGS)+"\n")
# Generate main
locstrs = map(lambda x:"_".join(map(str,x)), [loc for loc,Cfiles,DoCalls in LocationCFilesAndCFLAGS if loc and DoCalls])
plc_main = runtime.code("plc_common_main") % {
"calls_prototypes":"\n".join(
["int __init_%(s)s(int argc,char **argv);\nvoid __cleanup_%(s)s();\nvoid __retrive_%(s)s();\nvoid __publish_%(s)s();"%
{'s':locstr} for locstr in locstrs]),
"retrive_calls":" \n".join(["__retrive_%(s)s();"%{'s':locstr} for locstr in locstrs]),
"publish_calls":" \n".join(["__publish_%(s)s();"%{'s':locstr} for locstr in locstrs]),
"init_calls":" \n".join(["init_level++; if(res = __init_%(s)s(argc,argv)) return res;"%{'s':locstr} for locstr in locstrs]),
"cleanup_calls":" \n".join(["if(init_level-- > 0) __cleanup_%(s)s();"%{'s':locstr} for locstr in locstrs])}
target_name = self.BeremizRoot.TargetType.content["name"]
plc_main += runtime.code("plc_%s_main"%target_name)
main_path = os.path.join(buildpath, "main.c" )
f = open(main_path,'w')
f.write(plc_main)
f.close()
# First element is necessarely root
LocationCFilesAndCFLAGS[0][1].insert(0,(main_path, self.CFLAGS))
# Compile the resulting code into object files.
compiler = self.BeremizRoot.getCompiler()
_CFLAGS = self.BeremizRoot.getCFLAGS()
linker = self.BeremizRoot.getLinker()
_LDFLAGS = self.BeremizRoot.getLDFLAGS()
obns = []
objs = []
for Location, CFilesAndCFLAGS, DoCalls in LocationCFilesAndCFLAGS:
if Location:
logger.write("Plugin : " + self.GetChildByIECLocation(Location).GetCurrentName() + " " + str(Location)+"\n")
else:
logger.write("PLC :\n")
for CFile, CFLAGS in CFilesAndCFLAGS:
bn = os.path.basename(CFile)
obn = os.path.splitext(bn)[0]+".o"
obns.append(obn)
logger.write(" [CC] "+bn+" -> "+obn+"\n")
objectfilename = os.path.splitext(CFile)[0]+".o"
status, result, err_result = ProcessLogger(
logger,
"\"%s\" -c \"%s\" -o \"%s\" %s %s"%
(compiler, CFile, objectfilename, _CFLAGS, CFLAGS)
).spin()
if status != 0:
logger.write_error("Build failed\n")
return False
objs.append(objectfilename)
# Link all the object files into one executable
logger.write("Linking :\n")
exe = self.GetProjectName()
if target_name == "Win32":
exe += ".exe"
exe_path = os.path.join(buildpath, exe)
logger.write(" [CC] " + ' '.join(obns)+" -> " + exe + "\n")
status, result, err_result = ProcessLogger(
logger,
"\"%s\" \"%s\" -o \"%s\" %s"%
(linker,
'" "'.join(objs),
exe_path,
' '.join(LDFLAGS+[_LDFLAGS]))
).spin()
if status != 0:
logger.write_error("Build failed\n")
self.EnableMethod("_Run", False)
return False
self.EnableMethod("_Run", True)
return True
def _showIECcode(self, logger):
plc_file = self._getIECcodepath()
new_dialog = wx.Frame(None)
ST_viewer = TextViewer(new_dialog, "", None, None)
#ST_viewer.Enable(False)
ST_viewer.SetKeywords(IEC_KEYWORDS)
try:
text = file(plc_file).read()
except:
text = '(* No IEC code have been generated at that time ! *)'
ST_viewer.SetText(text = text)
new_dialog.Show()
def _editIECrawcode(self, logger):
new_dialog = wx.Frame(None)
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
os.mkdir(buildpath)
controler = MiniTextControler(self._getIECrawcodepath())
ST_viewer = TextViewer(new_dialog, "", None, controler)
#ST_viewer.Enable(False)
ST_viewer.SetKeywords(IEC_KEYWORDS)
ST_viewer.RefreshView()
new_dialog.Show()
def _EditPLC(self, logger):
if self.PLCEditor is None:
self.RefreshPluginsBlockLists()
def _onclose():
self.PLCEditor = None
def _onsave():
self.SaveProject()
self.PLCEditor = PLCOpenEditor(self.AppFrame, self)
self.PLCEditor.RefreshProjectTree()
self.PLCEditor.RefreshFileMenu()
self.PLCEditor.RefreshEditMenu()
self.PLCEditor.RefreshToolBar()
self.PLCEditor._onclose = _onclose
self.PLCEditor._onsave = _onsave
self.PLCEditor.Show()
def _Clean(self, logger):
if os.path.isdir(os.path.join(self._getBuildPath())):
logger.write("Cleaning the build directory\n")
shutil.rmtree(os.path.join(self._getBuildPath()))
else:
logger.write_error("Build directory already clean\n")
self.EnableMethod("_showIECcode", False)
self.EnableMethod("_Clean", False)
self.EnableMethod("_Run", False)
def _Run(self, logger):
command_start_plc = os.path.join(self._getBuildPath(),self.GetProjectName() + exe_ext)
if os.path.isfile(command_start_plc):
logger.write("Starting PLC\n")
def this_plc_finish_callback(*args):
if self.runningPLC is not None:
self.runningPLC = None
self.reset_finished()
self.runningPLC = ProcessLogger(
logger,
command_start_plc,
finish_callback = this_plc_finish_callback)
self.EnableMethod("_Clean", False)
self.EnableMethod("_Run", False)
self.EnableMethod("_Stop", True)
self.EnableMethod("_build", False)
else:
logger.write_error("%s doesn't exist\n" %command_start_plc)
def reset_finished(self):
self.EnableMethod("_Clean", True)
self.EnableMethod("_Run", True)
self.EnableMethod("_Stop", False)
self.EnableMethod("_build", True)
def _Stop(self, logger):
if self.runningPLC is not None:
logger.write("Stopping PLC\n")
was_runningPLC = self.runningPLC
self.runningPLC = None
was_runningPLC.kill()
self.reset_finished()
PluginMethods = [
{"bitmap" : os.path.join("images", "editPLC"),
"name" : "Edit PLC",
"tooltip" : "Edit PLC program with PLCOpenEditor",
"method" : "_EditPLC"},
{"bitmap" : os.path.join("images", "Build"),
"name" : "Build",
"tooltip" : "Build project into build folder",
"method" : "_build"},
{"bitmap" : os.path.join("images", "Clean"),
"name" : "Clean",
"tooltip" : "Clean project build folder",
"method" : "_Clean"},
{"bitmap" : os.path.join("images", "Run"),
"name" : "Run",
"enabled" : False,
"tooltip" : "Run PLC from build folder",
"method" : "_Run"},
{"bitmap" : os.path.join("images", "Stop"),
"name" : "Stop",
"enabled" : False,
"tooltip" : "Stop Running PLC",
"method" : "_Stop"},
{"bitmap" : os.path.join("images", "ShowIECcode"),
"name" : "Show IEC code",
"enabled" : False,
"tooltip" : "Show IEC code generated by PLCGenerator",
"method" : "_showIECcode"},
{"name" : "Edit raw IEC code",
"tooltip" : "Edit raw IEC code added to code generated by PLCGenerator",
"method" : "_editIECrawcode"}
]