#!/usr/bin/env python
# -*- coding: utf-8 -*-
#This file is part of Beremiz, an Integrated Development Environment for
#programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
#
#Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
#
#See COPYING file for copyrights details.
#
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#General Public License for more details.
#
#You should have received a copy of the GNU General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os, sys, getopt
from threading import Thread
def usage():
    """Print the command line help of the Beremiz PLC execution service.

    The text is written to standard output; ``sys.argv[0]`` is substituted
    for the program name.  The synopsis lists exactly the short options that
    the option parser of this script accepts ("i:p:n:x:t:a:h"): ``-t`` was
    missing from it, and ``--help`` was advertised although no long option
    is declared to getopt.
    """
    # The single-argument print(...) form behaves the same as the print
    # statement under Python 2.
    print("""
Usage of Beremiz PLC execution service :\n
%s {[-n servicename] [-i IP] [-p port] [-x enabletaskbar] [-t enabletwisted] [-a autostart]|-h} working_dir
-n - zeroconf service name (default:disabled)
-i - IP address of interface to bind to (default:localhost)
-p - port number default:3000
-h - print this help text and quit
-a - autostart PLC (0:disable 1:enable) (default:0)
-x - enable/disable wxTaskbarIcon (0:disable 1:enable) (default:1)
-t - enable/disable Twisted web interface (0:disable 1:enable) (default:1)
working_dir - directory where are stored PLC files
""" % sys.argv[0])
# ---------------------------------------------------------------------------
# Command line parsing.
#
# Only short options are declared to getopt.  Every invalid invocation
# (unknown option, malformed value, too many positional arguments) prints the
# usage text and exits with status 2; "-h" is the only path exiting with 0.
# ---------------------------------------------------------------------------
def _int_option(option, value):
    """Return ``value`` converted to int, or report bad usage and exit(2).

    Without this, a value such as "-p abc" ended the service with a bare
    ValueError traceback instead of the help text.
    """
    try:
        return int(value)
    except ValueError:
        print("option %s requires an integer argument, not %r" % (option, value))
        usage()
        sys.exit(2)

try:
    opts, argv = getopt.getopt(sys.argv[1:], "i:p:n:x:t:a:h")
except getopt.GetoptError as err:
    # print help information and exit:
    print(str(err))  # will print something like "option -a not recognized"
    usage()
    sys.exit(2)

# default values
given_ip = None
port = 3000
servicename = None
autostart = False
enablewx = True
havewx = False
enabletwisted = True
havetwisted = False

for o, a in opts:
    if o == "-h":
        usage()
        sys.exit()
    elif o == "-i":
        # Accept "localhost" or anything made of four dot-separated fields.
        if len(a.split(".")) == 4 or a == "localhost":
            given_ip = a
        else:
            usage()
            sys.exit(2)
    elif o == "-p":
        # port: port that the service runs on
        port = _int_option(o, a)
    elif o == "-n":
        servicename = a
    elif o == "-x":
        # Kept as an int (not a bool): the value is later handed to the
        # task bar icon as its "level".
        enablewx = _int_option(o, a)
    elif o == "-t":
        enabletwisted = _int_option(o, a)
    elif o == "-a":
        autostart = _int_option(o, a)
    else:
        usage()
        sys.exit(2)

# At most one positional argument: the working directory.
if len(argv) > 1:
    usage()
    sys.exit(2)
elif len(argv) == 1:
    WorkingDir = argv[0]
    os.chdir(WorkingDir)
else:
    # No directory given: work in the current one and record it in argv.
    WorkingDir = os.getcwd()
    argv = [WorkingDir]
import __builtin__
# When run as a script, publish an identity "_" function through the
# builtins so that _("text") can be called from any module.
if __name__ == '__main__':
    setattr(__builtin__, '_', lambda message: message)
# Optional wx front-end.  The imports are attempted only when wx support was
# requested; when they succeed the wx application object, the translation
# catalog and the three task bar images are created.
# NOTE(review): nesting reconstructed from a source whose indentation was
# stripped - confirm against the repository copy.
if enablewx:
    try:
        import wx, re
        from threading import Thread, currentThread
        from types import *
        havewx = True
    except:
        print("Wx unavailable !")
        havewx = False

    if havewx:
        app = wx.App(redirect=False)

        # Import module for internationalization
        import gettext

        # Directory containing this script; resources are located from it.
        CWD = os.path.dirname(os.path.realpath(__file__))

        def Bpath(*args):
            """Return the path built from ``args`` below the script directory."""
            return os.path.join(CWD, *args)

        # Folder containing translation files
        localedir = Bpath("locale")
        # Default language
        langid = wx.LANGUAGE_DEFAULT
        # Translation domain (name of translation files)
        domain = "Beremiz"

        # Reuse the wx locale published in the builtins when there is one,
        # otherwise create it and publish it.
        loc = __builtin__.__dict__.get('loc', None)
        if loc is None:
            loc = wx.Locale(langid)
            __builtin__.__dict__['loc'] = loc
        # Location searched for translation files, then the catalog itself
        loc.AddCatalogLookupPathPrefix(localedir)
        loc.AddCatalog(domain)

        def unicode_translation(message):
            """Return the wx translation of ``message`` encoded as UTF-8."""
            return wx.GetTranslation(message).encode("utf-8")

        # When run as a script, "_" becomes the wx translation function
        # (unicode_translation above is defined but not installed).
        if __name__ == '__main__':
            __builtin__.__dict__['_'] = wx.GetTranslation

        # Images used by the task bar icon
        defaulticon = wx.Image(Bpath("images", "brz.png"))
        starticon = wx.Image(Bpath("images", "icoplay24.png"))
        stopicon = wx.Image(Bpath("images", "icostop24.png"))
class ParamsEntryDialog(wx.TextEntryDialog):
    """Text entry dialog that validates its value before accepting it.

    Validation rules are installed with SetTests() as a list of
    ``(predicate, message)`` pairs.  When OK is pressed the predicates are
    applied in order to the entered value; the first one that fails shows
    its message in an error box and keeps the dialog open.
    """

    if wx.VERSION < (2, 6, 0):
        def Bind(self, event, function, id = None):
            # Fallback for wx releases older than 2.6: the event binder is
            # called directly.
            if id is None:
                event(self, function)
            else:
                event(self, id, function)

    def __init__(self, parent, message, caption = "Please enter text", defaultValue = "",
                       style = wx.OK|wx.CANCEL|wx.CENTRE, pos = wx.DefaultPosition):
        wx.TextEntryDialog.__init__(self, parent, message, caption, defaultValue, style, pos)

        self.Tests = []

        # The way to reach the id of the OK button depends on the wx version.
        if wx.VERSION >= (2, 8, 0):
            ok_button_id = self.GetAffirmativeId()
        elif wx.VERSION >= (2, 6, 0):
            ok_button_id = self.GetSizer().GetItem(3).GetSizer().GetAffirmativeButton().GetId()
        else:
            ok_button_id = self.GetSizer().GetItem(3).GetSizer().GetChildren()[0].GetSizer().GetChildren()[0].GetWindow().GetId()
        self.Bind(wx.EVT_BUTTON, self.OnOK, id=ok_button_id)

    def OnOK(self, event):
        """Run the tests on the entered value; close the dialog if all pass."""
        value = self.GetValue()
        substitutions = {"value" : value}
        for check, error_text in self.Tests:
            if check(value):
                continue
            # First failing test: report it and leave the dialog open.
            error_box = wx.MessageDialog(self, error_text%substitutions, _("Error"), wx.OK|wx.ICON_ERROR)
            error_box.ShowModal()
            error_box.Destroy()
            return
        self.EndModal(wx.ID_OK)
        event.Skip()

    def GetValue(self):
        """Return the text of the window held by the dialog sizer's item 1."""
        entry = self.GetSizer().GetItem(1).GetWindow()
        return entry.GetValue()

    def SetTests(self, tests):
        """Install the list of ``(predicate, message)`` validation pairs."""
        self.Tests = tests
class BeremizTaskBarIcon(wx.TaskBarIcon):
    """Task bar icon giving access to the PLC service from the desktop.

    The icon reflects the PLC status (see UpdateIcon) and its popup menu
    starts/stops the PLC, quits the service and, when ``level`` equals 1,
    also edits the server parameters and opens debugging tools.

    ``pyroserver`` is the server object driven by the menu; this class uses
    its ``plcobj``, ``servicename``, ``ip_addr``, ``port`` and ``workdir``
    attributes and its ``Stop``/``Restart``/``Quit`` methods.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation had been stripped - confirm against the repository copy.
    """

    TBMENU_START = wx.NewId()
    TBMENU_STOP = wx.NewId()
    TBMENU_CHANGE_NAME = wx.NewId()
    TBMENU_CHANGE_PORT = wx.NewId()
    TBMENU_CHANGE_INTERFACE = wx.NewId()
    TBMENU_LIVE_SHELL = wx.NewId()
    TBMENU_WXINSPECTOR = wx.NewId()
    TBMENU_CHANGE_WD = wx.NewId()
    TBMENU_QUIT = wx.NewId()

    def __init__(self, pyroserver, level):
        wx.TaskBarIcon.__init__(self)
        self.pyroserver = pyroserver
        # Set the image
        self.UpdateIcon(None)
        self.level = level

        # bind some events
        self.Bind(wx.EVT_MENU, self.OnTaskBarStartPLC, id=self.TBMENU_START)
        self.Bind(wx.EVT_MENU, self.OnTaskBarStopPLC, id=self.TBMENU_STOP)
        self.Bind(wx.EVT_MENU, self.OnTaskBarChangeName, id=self.TBMENU_CHANGE_NAME)
        self.Bind(wx.EVT_MENU, self.OnTaskBarChangeInterface, id=self.TBMENU_CHANGE_INTERFACE)
        self.Bind(wx.EVT_MENU, self.OnTaskBarLiveShell, id=self.TBMENU_LIVE_SHELL)
        self.Bind(wx.EVT_MENU, self.OnTaskBarWXInspector, id=self.TBMENU_WXINSPECTOR)
        self.Bind(wx.EVT_MENU, self.OnTaskBarChangePort, id=self.TBMENU_CHANGE_PORT)
        self.Bind(wx.EVT_MENU, self.OnTaskBarChangeWorkingDir, id=self.TBMENU_CHANGE_WD)
        self.Bind(wx.EVT_MENU, self.OnTaskBarQuit, id=self.TBMENU_QUIT)

    def CreatePopupMenu(self):
        """
        This method is called by the base class when it needs to popup
        the menu for the default EVT_RIGHT_DOWN event.  Just create
        the menu how you want it and return it from this function,
        the base class takes care of the rest.
        """
        menu = wx.Menu()
        menu.Append(self.TBMENU_START, _("Start PLC"))
        menu.Append(self.TBMENU_STOP, _("Stop PLC"))
        # NOTE(review): extent of this "level 1" section inferred - confirm.
        if self.level == 1:
            menu.AppendSeparator()
            menu.Append(self.TBMENU_CHANGE_NAME, _("Change Name"))
            menu.Append(self.TBMENU_CHANGE_INTERFACE, _("Change IP of interface to bind"))
            menu.Append(self.TBMENU_CHANGE_PORT, _("Change Port Number"))
            menu.Append(self.TBMENU_CHANGE_WD, _("Change working directory"))
            menu.AppendSeparator()
            menu.Append(self.TBMENU_LIVE_SHELL, _("Launch a live Python shell"))
            menu.Append(self.TBMENU_WXINSPECTOR, _("Launch WX GUI inspector"))
        menu.AppendSeparator()
        menu.Append(self.TBMENU_QUIT, _("Quit"))
        return menu

    def MakeIcon(self, img):
        """
        The various platforms have different requirements for the
        icon size...
        """
        if "wxMSW" in wx.PlatformInfo:
            img = img.Scale(16, 16)
        elif "wxGTK" in wx.PlatformInfo:
            img = img.Scale(22, 22)
        # wxMac can be any size upto 128x128, so leave the source img alone....
        return wx.IconFromBitmap(img.ConvertToBitmap())

    def OnTaskBarStartPLC(self, evt):
        """Start the PLC, if a PLC object currently exists."""
        if self.pyroserver.plcobj is not None:
            self.pyroserver.plcobj.StartPLC()

    def OnTaskBarStopPLC(self, evt):
        """Stop the PLC from a separate thread, if a PLC object exists."""
        if self.pyroserver.plcobj is not None:
            Thread(target=self.pyroserver.plcobj.StopPLC).start()

    def OnTaskBarChangeInterface(self, evt):
        """Ask for a new bind address, store it and stop the server."""
        # ip_addr is None when no -i option was given: show an empty field
        # rather than handing None to the dialog.
        dlg = ParamsEntryDialog(None, _("Enter the IP of the interface to bind"),
                                defaultValue=self.pyroserver.ip_addr or "")
        # The tests run in order and stop at the first failure, so int() in
        # the second one only ever sees digit groups accepted by the first.
        dlg.SetTests([
            (re.compile(r'\d{1,3}(?:\.\d{1,3}){3}$').match, _("IP is not valid!")),
            (lambda ip: len([part for part in ip.split(".") if 0 <= int(part) <= 255]) == 4,
             _("IP is not valid!")),
        ])
        if dlg.ShowModal() == wx.ID_OK:
            self.pyroserver.ip_addr = dlg.GetValue()
            self.pyroserver.Stop()

    def OnTaskBarChangePort(self, evt):
        """Ask for a new port number, store it and stop the server."""
        dlg = ParamsEntryDialog(None, _("Enter a port number "), defaultValue=str(self.pyroserver.port))
        # value.isdigit() works for byte and unicode strings alike, whereas
        # the unbound UnicodeType.isdigit rejected byte strings.
        dlg.SetTests([
            (lambda value: value.isdigit(), _("Port number must be an integer!")),
            (lambda port: 0 <= int(port) <= 65535, _("Port number must be 0 <= port <= 65535!")),
        ])
        if dlg.ShowModal() == wx.ID_OK:
            self.pyroserver.port = int(dlg.GetValue())
            self.pyroserver.Stop()

    def OnTaskBarChangeWorkingDir(self, evt):
        """Ask for a new working directory, store it and stop the server."""
        dlg = wx.DirDialog(None, _("Choose a working directory "), self.pyroserver.workdir, wx.DD_NEW_DIR_BUTTON)
        if dlg.ShowModal() == wx.ID_OK:
            self.pyroserver.workdir = dlg.GetPath()
            self.pyroserver.Stop()

    def OnTaskBarChangeName(self, evt):
        """Ask for a new service name, store it and restart the server."""
        # The server keeps its name in "servicename"; the former
        # "pyroserver.name" does not exist on Server and raised
        # AttributeError as soon as this menu entry was used.
        dlg = ParamsEntryDialog(None, _("Enter a name "),
                                defaultValue=self.pyroserver.servicename or "")
        dlg.SetTests([(lambda name: len(name) != 0, _("Name must not be null!"))])
        if dlg.ShowModal() == wx.ID_OK:
            self.pyroserver.servicename = dlg.GetValue()
            self.pyroserver.Restart()

    def _LiveShellLocals(self):
        """Return keyword arguments exposing the PLC python variables, if any."""
        if self.pyroserver.plcobj is not None:
            return {"locals": self.pyroserver.plcobj.python_runtime_vars}
        else:
            return {}

    def OnTaskBarLiveShell(self, evt):
        """Open a wx.py crust frame."""
        from wx import py
        frame = py.crust.CrustFrame(**self._LiveShellLocals())
        frame.Show()

    def OnTaskBarWXInspector(self, evt):
        """Show the wx widget inspection tool, initialising it on first use."""
        # Activate the widget inspection tool
        from wx.lib.inspection import InspectionTool
        if not InspectionTool().initialized:
            InspectionTool().Init(**self._LiveShellLocals())

        wnd = wx.GetApp()
        InspectionTool().Show(wnd, True)

    def OnTaskBarQuit(self, evt):
        """Remove the icon and ask the wx main loop to exit."""
        # NOTE(review): only the Quit thread is taken to be Windows-specific;
        # nesting inferred from a source without indentation - confirm.
        if wx.Platform == '__WXMSW__':
            Thread(target=self.pyroserver.Quit).start()
        self.RemoveIcon()
        wx.CallAfter(wx.GetApp().ExitMainLoop)

    def UpdateIcon(self, plcstatus):
        """Show the icon matching ``plcstatus``.

        "Started" and "Stopped" have dedicated images; any other value
        (including None) shows the default one.
        """
        # Compare by value: "is" tests object identity and only matched
        # equal strings by accident of interning.
        if plcstatus == "Started":
            currenticon = self.MakeIcon(starticon)
        elif plcstatus == "Stopped":
            currenticon = self.MakeIcon(stopicon)
        else:
            currenticon = self.MakeIcon(defaulticon)
        self.SetIcon(currenticon, "Beremiz Service")
from runtime import PLCObject, PLCprint, ServicePublisher
import Pyro.core as pyro
# Create the working directory when it is not already an existing directory.
working_dir_exists = os.path.isdir(WorkingDir)
if not working_dir_exists:
    os.mkdir(WorkingDir)
def default_evaluator(tocall, *args, **kwargs):
    """Call ``tocall(*args, **kwargs)`` and return the outcome as a pair.

    Returns ``(result, None)`` when the call succeeds and
    ``(None, sys.exc_info())`` when it raises an ``Exception``; exceptions
    that do not derive from ``Exception`` are not caught.
    """
    try:
        return (tocall(*args, **kwargs), None)
    except Exception:
        return (None, sys.exc_info())
class Server():
    """Pyro server publishing the PLC object of one working directory.

    Loop() calls Start() again and again while ``continueloop`` is true.
    Start() creates a new Pyro daemon and PLCObject and then blocks in the
    daemon request loop, so shutting the daemon down (Stop/Restart) makes
    Loop() start a fresh one with the current attribute values, while
    Quit() clears ``continueloop`` first so that the loop ends.

    Attributes set from the constructor arguments: servicename, ip_addr,
    port, workdir, argv, autostart, statuschange, evaluator, website.
    ``daemon``, ``plcobj`` and ``servicepublisher`` are None until Start()
    creates them.
    """

    def __init__(self, servicename, ip_addr, port, workdir, argv, autostart=False, statuschange=None, evaluator=default_evaluator, website=None):
        self.continueloop = True
        self.daemon = None
        self.servicename = servicename
        self.ip_addr = ip_addr
        self.port = port
        self.workdir = workdir
        self.argv = argv
        self.plcobj = None
        self.servicepublisher = None
        self.autostart = autostart
        self.statuschange = statuschange
        self.evaluator = evaluator
        self.website = website

    def Loop(self):
        """Run Start() repeatedly until ``continueloop`` is cleared."""
        while self.continueloop:
            self.Start()

    def Restart(self):
        """Stop the running daemon; Loop() then starts a new one."""
        self.Stop()

    def Quit(self):
        """End the Loop(): unload the PLC, if any, and stop the daemon."""
        self.continueloop = False
        if self.plcobj is not None:
            self.plcobj.UnLoadPLC()
        self.Stop()

    def Start(self):
        """Create daemon and PLC object, publish them and serve requests.

        Blocks in the Pyro daemon request loop until the daemon is shut
        down.
        """
        pyro.initServer()
        self.daemon = pyro.Daemon(host=self.ip_addr, port=self.port)
        self.plcobj = PLCObject(self.workdir, self.daemon, self.argv, self.statuschange, self.evaluator, self.website)
        uri = self.daemon.connect(self.plcobj, "PLCObject")

        # Single-argument print(...) gives the same output as the former
        # print statements under Python 2.
        print("Pyro port : %s" % (self.port,))
        print("Pyro object's uri : %s" % (uri,))
        print("Current working directory : %s" % (self.workdir,))

        # Configure and publish service
        # Not publish service if localhost in address params
        if (self.servicename is not None and
            self.ip_addr is not None and
            self.ip_addr != "localhost" and
            self.ip_addr != "127.0.0.1"):
            print("Publishing service on local network")
            self.servicepublisher = ServicePublisher.ServicePublisher()
            self.servicepublisher.RegisterService(self.servicename, self.ip_addr, self.port)

        # Autostart only when the PLC object does not report "Empty".
        if self.autostart and self.plcobj.GetPLCstatus()[0] != "Empty":
            self.plcobj.StartPLC()

        sys.stdout.flush()

        self.daemon.requestLoop()

    def Stop(self):
        """Stop the PLC, unregister the service and shut the daemon down."""
        if self.plcobj is not None:
            self.plcobj.StopPLC()
        if self.servicepublisher is not None:
            self.servicepublisher.UnRegisterService()
            self.servicepublisher = None
        # The daemon only exists once Start() has run; Stop()/Quit() called
        # before that used to fail with AttributeError on None.
        if self.daemon is not None:
            self.daemon.shutdown(True)
# Optional web front-end.  When it was requested, try to import Twisted and
# Nevow with warnings silenced; "havetwisted" records whether that worked.
# NOTE(review): nesting reconstructed from a source whose indentation was
# stripped - confirm against the repository copy.
if enabletwisted:
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        try:
            from threading import Thread, currentThread
            # With wx in use, the wx reactor is installed before
            # twisted.internet.reactor is imported below.
            if havewx:
                from twisted.internet import wxreactor
                wxreactor.install()
            from twisted.internet import reactor, task
            from twisted.python import log, util
            from nevow import rend, appserver, inevow, tags, loaders, athena
            from nevow.page import renderer

            havetwisted = True
        except:
            print("Twisted unavailable !")
            havetwisted = False
# Everything from here up to the matching "else:" is only set up when the
# Twisted/Nevow imports succeeded.
if havetwisted:
# XML declaration and XHTML 1.1 doctype emitted in front of the web page.
xhtml_header = '''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
'''
class PLCHMI(athena.LiveElement):
    """Base live element for HMI fragments shown by the web interface.

    ``initialised`` is False until HMIinitialised() is called.
    """

    initialised = False

    def HMIinitialised(self, result):
        """Mark the element as initialised; ``result`` is not used."""
        self.initialised = True

    def HMIinitialisation(self):
        """Default initialisation: immediately report completion."""
        self.HMIinitialised(None)
class DefaultPLCStartedHMI(PLCHMI):
    """HMI displayed for a started PLC when no other HMI class was set."""

    docFactory = loaders.stan(
        tags.div(render=tags.directive('liveElement'))[
            tags.h1["PLC IS NOW STARTED"],
        ]
    )
class PLCStoppedHMI(PLCHMI):
    """HMI displayed while the PLC is stopped."""

    docFactory = loaders.stan(
        tags.div(render=tags.directive('liveElement'))[
            tags.h1["PLC IS STOPPED"],
        ]
    )
class MainPage(athena.LiveElement):
    """Live element that holds and swaps the HMI fragment of the page.

    ``pcl_state`` is the last PLC state given to setPLCState(); ``HMI`` is
    the fragment created by the latest getPLCElement() call (None before
    the first call and after resetHMI()); ``PLCStartedHMIClass`` is the
    class instantiated when the PLC state is true.
    """

    jsClass = u"WebInterface.PLC"
    docFactory = loaders.stan(tags.div(render=tags.directive('liveElement'))[
                                            tags.div(id='content')[
                                            tags.div(render = tags.directive('PLCElement')),
                                            ]])

    def __init__(self, *a, **kw):
        athena.LiveElement.__init__(self, *a, **kw)
        self.pcl_state = False
        self.HMI = None
        self.resetPLCStartedHMI()

    def setPLCState(self, state):
        """Record the PLC state and, if an HMI exists, call 'updateHMI' remotely."""
        self.pcl_state = state
        if self.HMI is not None:
            self.callRemote('updateHMI')

    def setPLCStartedHMI(self, hmi):
        """Set the class instantiated as HMI when the PLC state is true."""
        self.PLCStartedHMIClass = hmi

    def resetPLCStartedHMI(self):
        """Go back to DefaultPLCStartedHMI as the started-PLC HMI class."""
        self.PLCStartedHMIClass = DefaultPLCStartedHMI

    def getHMI(self):
        """Return the current HMI fragment, or None."""
        return self.HMI

    def HMIexec(self, function, *args, **kwargs):
        """Call the method named ``function`` on the current HMI, if any.

        Nothing happens when there is no HMI or when the HMI has no such
        attribute.  The fallback accepts any arguments: the former
        ``lambda:None`` raised TypeError as soon as arguments were passed
        for a missing method.
        """
        # NOTE(review): this method is exposed through athena.expose and
        # resolves whatever attribute name it is given on the HMI object -
        # consider restricting the callable names.
        if self.HMI is not None:
            getattr(self.HMI, function, lambda *a, **kw: None)(*args, **kwargs)
    athena.expose(HMIexec)

    def resetHMI(self):
        """Forget the current HMI fragment."""
        self.HMI = None

    def PLCElement(self, ctx, data):
        """Renderer returning a freshly created HMI fragment."""
        return self.getPLCElement()
    renderer(PLCElement)

    def getPLCElement(self):
        """Detach the child fragments and create the HMI for the current state."""
        self.detachFragmentChildren()
        if self.pcl_state:
            f = self.PLCStartedHMIClass()
        else:
            f = PLCStoppedHMI()
        f.setFragmentParent(self)
        self.HMI = f
        return f
    athena.expose(getPLCElement)

    def detachFragmentChildren(self):
        """Detach every live fragment child of this element."""
        # Iterate over a copy of the list of children.
        for child in self.liveFragmentChildren[:]:
            child.detach()
class WebInterface(athena.LivePage):
    """Live page of the web interface, wrapping the MainPage element.

    ``MainPage`` is a single MainPage instance created in the class body
    and therefore shared by all WebInterface instances; ``plcState`` is the
    PLC state last given to this page.
    """

    docFactory = loaders.stan([tags.raw(xhtml_header),
                                tags.html(xmlns="http://www.w3.org/1999/xhtml")[
                                    tags.head(render=tags.directive('liveglue')),
                                    tags.body[
                                        tags.div[
                                                tags.div( render = tags.directive( "MainPage" ))
                                                ]]]])
    MainPage = MainPage()
    PLCHMI = PLCHMI

    def __init__(self, plcState=False, *a, **kw):
        super(WebInterface, self).__init__(*a, **kw)
        # Javascript module of the interface, located next to this file.
        js_file = os.path.join('runtime', 'webinterface.js')
        self.jsModules.mapping[u'WebInterface'] = util.sibpath(__file__, js_file)
        self.plcState = plcState
        self.MainPage.setPLCState(plcState)

    def getHMI(self):
        """Return the HMI fragment currently held by the main page."""
        return self.MainPage.getHMI()

    def LoadHMI(self, hmi, jsmodules):
        """Register javascript modules and set the started-PLC HMI class.

        Each path in ``jsmodules`` (name -> path) is joined to the working
        directory.
        """
        for module_name, module_path in jsmodules.iteritems():
            self.jsModules.mapping[module_name] = os.path.join(WorkingDir, module_path)
        self.MainPage.setPLCStartedHMI(hmi)

    def UnLoadHMI(self):
        """Restore the default started-PLC HMI class."""
        self.MainPage.resetPLCStartedHMI()

    def PLCStarted(self):
        """Record the started state and forward it to the main page."""
        self.plcState = True
        self.MainPage.setPLCState(True)

    def PLCStopped(self):
        """Record the stopped state and forward it to the main page."""
        self.plcState = False
        self.MainPage.setPLCState(False)

    def renderHTTP(self, ctx):
        """
        Force content type to fit with SVG
        """
        request = inevow.IRequest(ctx)
        request.setHeader('Content-type', 'application/xhtml+xml')
        return super(WebInterface, self).renderHTTP(ctx)

    def render_MainPage(self, ctx, data):
        """Attach the shared main page to this page and render it in the tag."""
        main_page = self.MainPage
        main_page.setFragmentParent(self)
        return ctx.tag[main_page]

    def child_(self, ctx):
        """Detach the main page children and return a new page in the same state."""
        self.MainPage.detachFragmentChildren()
        return WebInterface(plcState=self.plcState)

    def beforeRender(self, ctx):
        """Register disconnected() as errback of the disconnect notification."""
        self.notifyOnDisconnect().addErrback(self.disconnected)

    def disconnected(self, reason):
        """Drop the HMI held by the main page; ``reason`` is not used."""
        self.MainPage.resetHMI()
# Register the wx application with the reactor when wx is in use.
if havewx:
    reactor.registerWxApp(app)

website = WebInterface()
site = appserver.NevowSite(website)

# Probe upwards from 8009 for the first port the site can listen on.
# Only Exception subclasses trigger a retry, so KeyboardInterrupt and
# SystemExit are no longer swallowed, and the search gives up (re-raising
# the last error) after port 65535 instead of looping forever.
website_port = 8009
listening = False
while not listening:
    try:
        reactor.listenTCP(website_port, site)
        listening = True
    except Exception:
        if website_port >= 65535:
            raise
        website_port += 1
print("Http interface port : %s" % (website_port,))
# Twisted/Nevow not available or not requested ("havetwisted" is false):
# there is no web site object to hand to the server.
else:
website = None
# Create the server.  With wx available, calls requested from other threads
# are executed in the wx main loop and PLC status changes update the task
# bar icon; without wx the server uses the default evaluator.
if havewx:
    from threading import Semaphore
    # Kept for code that still relies on it; evaluator() below now waits on
    # a semaphore of its own for each call.
    wx_eval_lock = Semaphore(0)
    main_thread = currentThread()

    def statuschange(status):
        """Ask the wx main loop to update the task bar icon for ``status``."""
        wx.CallAfter(taskbar_instance.UpdateIcon, status)

    def wx_evaluator(obj, *args, **kwargs):
        """Run the call stored in ``obj.call`` and store the outcome in ``obj.res``.

        The positional/keyword arguments of this function itself are
        ignored; those packed in ``obj.call`` are used.  The waiter is
        released through ``obj.done`` when present, else through the shared
        ``wx_eval_lock``.
        """
        tocall, args, kwargs = obj.call
        obj.res = default_evaluator(tocall, *args, **kwargs)
        getattr(obj, "done", wx_eval_lock).release()

    def evaluator(tocall, *args, **kwargs):
        """Evaluate ``tocall(*args, **kwargs)`` in the wx main loop.

        Returns the ``(result, exc_info)`` pair built by default_evaluator.
        """
        global main_thread
        if main_thread == currentThread():
            # avoid dead lock if called from the wx mainloop
            return default_evaluator(tocall, *args, **kwargs)
        # One semaphore per call: with a single shared semaphore, two threads
        # waiting at the same time could be woken by each other's release and
        # return before their own result had been stored.
        o = type('', (object,), dict(call=(tocall, args, kwargs), res=None, done=Semaphore(0)))
        wx.CallAfter(wx_evaluator, o)
        o.done.acquire()
        return o.res

    pyroserver = Server(servicename, given_ip, port, WorkingDir, argv, autostart, statuschange, evaluator, website)
    taskbar_instance = BeremizTaskBarIcon(pyroserver, enablewx)
else:
    pyroserver = Server(servicename, given_ip, port, WorkingDir, argv, autostart, website=website)
# Exception hooks
import threading, traceback
def LogException(*exp):
    """Report an exception given as ``(type, value, traceback)``.

    The formatted traceback goes to the PLC object log (level 0) when the
    server has a PLC object, and is printed with the traceback module
    otherwise.
    """
    if pyroserver.plcobj is None:
        traceback.print_exception(*exp)
    else:
        report = '\n'.join(traceback.format_exception(*exp))
        pyroserver.plcobj.LogMessage(0, report)

# Route uncaught exceptions through LogException.
sys.excepthook = LogException
def installThreadExcepthook():
    """Make exceptions escaping from threads go through ``sys.excepthook``.

    ``threading.Thread.__init__`` is replaced by a version that, after the
    normal initialisation, wraps the ``run`` method of the new thread:
    KeyboardInterrupt and SystemExit are re-raised, anything else is handed
    to whatever ``sys.excepthook`` is at that moment.
    """
    original_init = threading.Thread.__init__

    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        original_run = self.run

        def guarded_run(*run_args, **run_kwargs):
            try:
                original_run(*run_args, **run_kwargs)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                sys.excepthook(*sys.exc_info())

        # Shadow the method on the instance with the guarded version.
        self.run = guarded_run

    threading.Thread.__init__ = patched_init

installThreadExcepthook()
# Run the service.  With a GUI or web front-end the Pyro loop runs in its
# own thread while the main thread runs the Twisted reactor (preferred) or
# the wx main loop; otherwise the Pyro loop runs in the main thread until
# it ends or is interrupted from the keyboard.
if not (havetwisted or havewx):
    try:
        pyroserver.Loop()
    except KeyboardInterrupt:
        pass
else:
    pyro_thread = Thread(target=pyroserver.Loop)
    pyro_thread.start()

    if havetwisted:
        reactor.run()
    else:
        # Not Twisted, so wx is the front-end that is available here.
        app.MainLoop()

# Whatever loop was running has returned: shut the server down and exit.
pyroserver.Quit()
sys.exit(0)