Split the runtime's Twisted module imports and setup to ease hot-patching of runtime extensions
import os, re
from lxml import etree
from xmlclass import GenerateParserFromXSD
from CodeFileTreeNode import CodeFile
from PythonEditor import PythonEditor
class PythonFileCTNMixin(CodeFile):
    """CodeFile-based confnode mixin holding Python code split in sections.

    The code file is made of the sections listed in ``SECTIONS_NAMES``.
    ``CTNGenerate_C`` turns them into a ``runtime_<location>.py`` file and
    a ``PyCFile_<location>.c`` file giving guarded access to the variables
    declared in the code file.
    """

    CODEFILE_NAME = "PyFile"
    SECTIONS_NAMES = [
        "globals",
        "init",
        "cleanup",
        "start",
        "stop"]
    EditorType = PythonEditor

    # Text placed by GetSection() before / after the user code of a section,
    # keyed by section name. Both are empty here.
    # NOTE(review): presumably meant to be overridden by classes using this
    # mixin -- confirm against its users.
    PreSectionsTexts = {}
    PostSectionsTexts = {}

    def __init__(self):
        """Initialise the code file and import an old-format file if any.

        When the file named by ``PythonFileName()`` exists, its content is
        adapted (XML namespace declaration replaced, CDATA sections wrapped
        in ``<xhtml:p>`` elements), parsed with the ``py_ext_xsd.xsd``
        schema and, on success, copied into the ``globals`` section; the old
        file is then removed and the confnode saved. Any failure is reported
        through the project logger instead of being raised.
        """
        CodeFile.__init__(self)

        filepath = self.PythonFileName()
        if not os.path.isfile(filepath):
            # Nothing to import.
            return

        PythonParser = GenerateParserFromXSD(
            os.path.join(os.path.dirname(__file__), "py_ext_xsd.xsd"))

        # The context manager closes the handle even if read() raises.
        with open(filepath, 'r') as xmlfile:
            pythonfile_xml = xmlfile.read()

        pythonfile_xml = pythonfile_xml.replace(
            'xmlns="http://www.w3.org/2001/XMLSchema"',
            'xmlns:xhtml="http://www.w3.org/1999/xhtml"')
        # Wrap every CDATA section that is not already enclosed in an
        # <xhtml:p> element: first the opening marker, then the closing one.
        for cre, repl in [
                (re.compile(r"(?<!<xhtml:p>)(?:<!\[CDATA\[)"),
                 "<xhtml:p><![CDATA["),
                (re.compile(r"(?:]]>)(?!</xhtml:p>)"),
                 "]]></xhtml:p>")]:
            pythonfile_xml = cre.sub(repl, pythonfile_xml)

        try:
            python_code, error = PythonParser.LoadXMLString(pythonfile_xml)
            if error is None:
                self.CodeFile.globals.setanyText(python_code.getanyText())
                # The old file is only deleted once its content has been
                # transferred to the code file.
                os.remove(filepath)
                self.CreateCodeFileBuffer(False)
                self.OnCTNSave()
        except Exception as exc:
            # Best-effort import: any failure is turned into an error value
            # and reported below.
            error = unicode(exc)

        if error is not None:
            self.GetCTRoot().logger.write_error(
                _("Couldn't import old %s file.") % self.CTNName())

    def CodeFileName(self):
        """Return the path of ``pyfile.xml`` inside the confnode folder."""
        return os.path.join(self.CTNPath(), "pyfile.xml")

    def PythonFileName(self):
        """Return the path of the old ``py_ext.xml`` file of the confnode."""
        return os.path.join(self.CTNPath(), "py_ext.xml")

    def GetSection(self, section):
        """Return the text of ``section`` between its pre and post texts.

        The three parts (``PreSectionsTexts`` entry, section text from the
        code file, ``PostSectionsTexts`` entry) are separated by newlines;
        a missing pre/post entry counts as an empty string.
        """
        return "\n".join([
            self.PreSectionsTexts.get(section, ""),
            getattr(self.CodeFile, section).getanyText(),
            self.PostSectionsTexts.get(section, "")])

    def _GetVariablesInfo(self, configname):
        """Return one template substitution dict per code file variable."""
        return [{
            "name": variable.getname(),
            "configname": configname.upper(),
            "uppername": variable.getname().upper(),
            "IECtype": variable.gettype()}
            for variable in self.CodeFile.variables.variable]

    def _GenerateRuntimePython(self, location_str, varinfos):
        """Return the text of the generated runtime Python file."""
        # python side PLC global variables access stub
        globalstubs = "\n".join(["""\
_%(name)s_ctype, _%(name)s_unpack, _%(name)s_pack = \\
TypeTranslator["%(IECtype)s"]
_PySafeGetPLCGlob_%(name)s = PLCBinary.__SafeGetPLCGlob_%(name)s
_PySafeGetPLCGlob_%(name)s.restype = None
_PySafeGetPLCGlob_%(name)s.argtypes = [ctypes.POINTER(_%(name)s_ctype)]
_PySafeSetPLCGlob_%(name)s = PLCBinary.__SafeSetPLCGlob_%(name)s
_PySafeSetPLCGlob_%(name)s.restype = None
_PySafeSetPLCGlob_%(name)s.argtypes = [ctypes.POINTER(_%(name)s_ctype)]
_PySafePLCGlobals.append(("%(name)s","%(IECtype)s"))
""" % varinfo for varinfo in varinfos])

        # Runtime calls (start, stop, init, and cleanup): one function per
        # section, "globals" excepted since it is emitted at module level.
        rtcalls_parts = []
        for section in self.SECTIONS_NAMES:
            if section == "globals":
                continue
            rtcalls_parts.append(
                "def _runtime_%s_%s():\n" % (location_str, section))
            sectiontext = self.GetSection(section).strip()
            if sectiontext:
                # Indent every line of the user code so that it forms the
                # body of the function.
                rtcalls_parts.append(
                    ' ' + sectiontext.replace('\n', '\n ') + "\n\n")
            else:
                # An empty section still needs a valid function body.
                rtcalls_parts.append(" pass\n\n")
        rtcalls = "".join(rtcalls_parts)

        globalsection = self.GetSection("globals")
        pyextname = self.CTNName()

        return """\
#!/usr/bin/env python
# -*- coding: utf-8 -*-
## Code generated by Beremiz python mixin confnode
##
## Code for PLC global variable access
from targets.typemapping import TypeTranslator
import ctypes
_PyExtName = "%(pyextname)s"
_PySafePLCGlobals = []
%(globalstubs)s
## User code in "global" scope
%(globalsection)s
## Beremiz python runtime calls
%(rtcalls)s
""" % {"pyextname": pyextname,
       "globalstubs": globalstubs,
       "globalsection": globalsection,
       "rtcalls": rtcalls}

    def _GenerateSafeAccessC(self, location_str, varinfos):
        """Return the text of the generated C file.

        For each variable the file declares read/write buffers with their
        lock words, the ``__SafeGetPLCGlob_*`` / ``__SafeSetPLCGlob_*``
        accessors referenced by the generated Python stubs, and the
        statements placed in the ``__retrieve_*`` / ``__publish_*``
        functions.
        """
        # C code for safe global variables access
        vardecfmt = """\
extern __IEC_%(IECtype)s_t %(configname)s__%(uppername)s;
IEC_%(IECtype)s __%(name)s_rbuffer = __INIT_%(IECtype)s;
IEC_%(IECtype)s __%(name)s_wbuffer;
long __%(name)s_rlock = 0;
long __%(name)s_wlock = 0;
int __%(name)s_wbuffer_written = 0;
void __SafeGetPLCGlob_%(name)s(IEC_%(IECtype)s *pvalue){
while(AtomicCompareExchange(&__%(name)s_rlock, 0, 1));
*pvalue = __%(name)s_rbuffer;
AtomicCompareExchange((long*)&__%(name)s_rlock, 1, 0);
}
void __SafeSetPLCGlob_%(name)s(IEC_%(IECtype)s *value){
while(AtomicCompareExchange(&__%(name)s_wlock, 0, 1));
__%(name)s_wbuffer = *value;
__%(name)s_wbuffer_written = 1;
AtomicCompareExchange((long*)&__%(name)s_wlock, 1, 0);
}
"""
        varretfmt = """\
if(!AtomicCompareExchange(&__%(name)s_wlock, 0, 1)){
if(__%(name)s_wbuffer_written == 1){
%(configname)s__%(uppername)s.value = __%(name)s_wbuffer;
__%(name)s_wbuffer_written = 0;
}
AtomicCompareExchange((long*)&__%(name)s_wlock, 1, 0);
}
"""
        varpubfmt = """\
if(!AtomicCompareExchange(&__%(name)s_rlock, 0, 1)){
__%(name)s_rbuffer = %(configname)s__%(uppername)s.value;
AtomicCompareExchange((long*)&__%(name)s_rlock, 1, 0);
}
"""

        # Joining an empty list gives "", so a code file without variables
        # simply produces empty declaration/retrieve/publish parts.
        vardec = "\n".join([vardecfmt % varinfo for varinfo in varinfos])
        varret = "\n".join([varretfmt % varinfo for varinfo in varinfos])
        varpub = "\n".join([varpubfmt % varinfo for varinfo in varinfos])

        return """\
/*
* Code generated by Beremiz py_ext confnode
* for safe global variables access
*/
#include "iec_types_all.h"
#include "beremiz.h"
/* User variables reference */
%(vardec)s
/* Beremiz confnode functions */
int __init_%(location_str)s(int argc,char **argv){
return 0;
}
void __cleanup_%(location_str)s(void){
}
void __retrieve_%(location_str)s(void){
%(varret)s
}
void __publish_%(location_str)s(void){
%(varpub)s
}
""" % {"vardec": vardec,
       "varret": varret,
       "varpub": varpub,
       "location_str": location_str}

    def CTNGenerate_C(self, buildpath, locations):
        """Write the runtime Python file and the C access file.

        ``runtime_<location>.py`` and ``PyCFile_<location>.c`` are written
        into ``buildpath``, ``<location>`` being the current location joined
        with underscores. ``locations`` is not used by this implementation.

        Returns a 4-tuple made of: a list holding the
        ``(C file path, compiler flags)`` pair, an empty string, ``True``,
        and a ``(file name, file object)`` pair where the file object is the
        runtime Python file opened for binary reading. That file object is
        left open on purpose; it is presumably consumed and closed by the
        caller -- verify against the build code.
        """
        # location string for that CTN
        location_str = "_".join(map(str, self.GetCurrentLocation()))
        configname = self.GetCTRoot().GetProjectConfigNames()[0]
        varinfos = self._GetVariablesInfo(configname)

        PyFileContent = self._GenerateRuntimePython(location_str, varinfos)

        # write generated content to python file
        runtimefile_path = os.path.join(buildpath,
                                        "runtime_%s.py" % location_str)
        # Context managers guarantee the files are closed even when
        # encoding or writing fails.
        with open(runtimefile_path, 'w') as runtimefile:
            runtimefile.write(PyFileContent.encode('utf-8'))

        PyCFileContent = self._GenerateSafeAccessC(location_str, varinfos)

        Gen_PyCfile_path = os.path.join(buildpath,
                                        "PyCFile_%s.c" % location_str)
        with open(Gen_PyCfile_path, 'w') as pycfile:
            pycfile.write(PyCFileContent)

        matiec_flags = '"-I%s"' % os.path.abspath(
            self.GetCTRoot().GetIECLibPath())

        return ([(Gen_PyCfile_path, matiec_flags)],
                "",
                True,
                ("runtime_%s.py" % location_str,
                 open(runtimefile_path, "rb")))