#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of Beremiz, a Integrated Development Environment for
# programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
#
# Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
# Copyright (C) 2017: Andrey Skvortsov
#
# See COPYING file for copyrights details.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
import sys
import getopt
import threading
import shlex
import traceback
import threading
from threading import Thread, Semaphore, Lock, current_thread
import builtins
from functools import partial
import runtime
from runtime.PyroServer import PyroServer
from runtime.xenomai import TryPreloadXenomai
from runtime import LogMessageAndException
from runtime import PlcStatus
from runtime import default_evaluator
from runtime.Stunnel import ensurePSK
import util.paths as paths
# In case system time is ajusted, it is better to use
# monotonic timers for timers and other timeout based operations.
# hot-patch threading module to force using monitonic time for all
# Thread/Timer/Event/Condition
from runtime.monotonic_time import monotonic
threading._time = monotonic
try:
from runtime.spawn_subprocess import Popen
except ImportError:
from subprocess import Popen
def version():
from version import app_version
print("Beremiz_service: ", app_version)
def usage():
version()
print("""
Usage of Beremiz PLC execution service :\n
%s {[-n servicename] [-i IP] [-p port] [-x enabletaskbar] [-a autostart]|-h|--help} working_dir
-n service name (default:None, zeroconf discovery disabled)
-i IP address of interface to bind to (default:localhost)
-p port number default:3000
-h print this help text and quit
-a autostart PLC (0:disable 1:enable) (default:0)
-x enable/disable wxTaskbarIcon (0:disable 1:enable) (default:1)
-t enable/disable Twisted web interface (0:disable 1:enable) (default:1)
-w web server port or "off" to disable web server (default:8009)
-c WAMP client config file (can be overriden by wampconf.json in project)
-s PSK secret path (default:PSK disabled)
-e python extension (absolute path .py)
working_dir - directory where are stored PLC files
""" % sys.argv[0])
try:
opts, argv = getopt.getopt(sys.argv[1:], "i:p:n:x:t:a:w:c:e:s:h", ["help", "version", "status-change=", "on-plc-start=", "on-plc-stop="])
except getopt.GetoptError as err:
# print help information and exit:
print(str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
# default values
interface = ''
port = 3000
webport = 8009
PSKpath = None
wampconf = None
servicename = None
autostart = False
enablewx = True
havewx = False
enabletwisted = True
havetwisted = False
extensions = []
statuschange = []
def status_change_call_factory(wanted, args):
def status_change_call(status):
if wanted is None or status is wanted:
cmd = shlex.split(args.format(status))
Popen(cmd)
return status_change_call
for o, a in opts:
if o == "-h" or o == "--help":
usage()
sys.exit()
if o == "--version":
version()
sys.exit()
if o == "--on-plc-start":
statuschange.append(status_change_call_factory(PlcStatus.Started, a))
elif o == "--on-plc-stop":
statuschange.append(status_change_call_factory(PlcStatus.Stopped, a))
elif o == "--status-change":
statuschange.append(status_change_call_factory(None, a))
elif o == "-i":
if len(a.split(".")) == 4:
interface = a
elif a == "localhost":
interface = '127.0.0.1'
else:
usage()
sys.exit()
elif o == "-p":
# port: port that the service runs on
port = int(a)
elif o == "-n":
servicename = a
elif o == "-x":
enablewx = int(a)
elif o == "-t":
enabletwisted = int(a)
elif o == "-a":
autostart = int(a)
elif o == "-w":
webport = None if a == "off" else int(a)
elif o == "-c":
wampconf = None if a == "off" else a
elif o == "-s":
PSKpath = None if a == "off" else a
elif o == "-e":
fnameanddirname = list(os.path.split(os.path.realpath(a)))
fnameanddirname.reverse()
extensions.append(fnameanddirname)
else:
usage()
sys.exit()
beremiz_dir = paths.AbsDir(__file__)
if len(argv) > 1:
usage()
sys.exit()
elif len(argv) == 1:
WorkingDir = argv[0]
os.chdir(WorkingDir)
elif len(argv) == 0:
WorkingDir = os.getcwd()
argv = [WorkingDir]
builtins.__dict__['_'] = lambda x: x
# TODO: add a cmdline parameter if Trying Preloading Xenomai makes problem
TryPreloadXenomai()
version()
def Bpath(*args):
return os.path.join(beremiz_dir, *args)
import locale
# Matiec's standard library relies on libC's locale-dependent
# string to/from number convertions, but IEC-61131 counts
# on '.' for decimal point. Therefore locale is reset to "C" */
locale.setlocale(locale.LC_NUMERIC, "C")
def SetupI18n():
# Get folder containing translation files
localedir = os.path.join(beremiz_dir, "locale")
# Get the default language
langid = wx.LANGUAGE_DEFAULT
# Define translation domain (name of translation files)
domain = "Beremiz"
# Define locale for wx
loc = builtins.__dict__.get('loc', None)
if loc is None:
wx.LogGui.EnableLogging(False)
loc = wx.Locale(langid)
wx.LogGui.EnableLogging(True)
builtins.__dict__['loc'] = loc
# Define location for searching translation files
loc.AddCatalogLookupPathPrefix(localedir)
# Define locale domain
loc.AddCatalog(domain)
builtins.__dict__['_'] = wx.GetTranslation
# Life is hard... have a candy.
# pylint: disable=wrong-import-position,wrong-import-order
if enablewx:
try:
import wx
havewx = True
except ImportError:
print("Wx unavailable !")
havewx = False
if havewx:
import re
import wx.adv
app = wx.App(redirect=False)
app.SetTopWindow(wx.Frame(None, -1))
default_locale = None
SetupI18n()
defaulticon = wx.Image(Bpath("images", "brz.png"))
starticon = wx.Image(Bpath("images", "icoplay24.png"))
stopicon = wx.Image(Bpath("images", "icostop24.png"))
class ParamsEntryDialog(wx.TextEntryDialog):
def __init__(self, parent, message, caption=_("Please enter text"), defaultValue="",
style=wx.OK | wx.CANCEL | wx.CENTRE, pos=wx.DefaultPosition):
wx.TextEntryDialog.__init__(self, parent, message, caption, defaultValue, style, pos)
self.Tests = []
self.Bind(wx.EVT_BUTTON, self.OnOK, id=self.GetAffirmativeId())
def OnOK(self, event):
value = self.GetValue()
texts = {"value": value}
for function, message in self.Tests:
if not function(value):
message = wx.MessageDialog(self, message % texts, _("Error"), wx.OK | wx.ICON_ERROR)
message.ShowModal()
message.Destroy()
return
self.EndModal(wx.ID_OK)
event.Skip()
def GetValue(self):
return self.GetSizer().GetItem(1).GetWindow().GetValue()
def SetTests(self, tests):
self.Tests = tests
class BeremizTaskBarIcon(wx.adv.TaskBarIcon):
TBMENU_START = wx.NewIdRef()
TBMENU_STOP = wx.NewIdRef()
TBMENU_CHANGE_NAME = wx.NewIdRef()
TBMENU_CHANGE_PORT = wx.NewIdRef()
TBMENU_CHANGE_INTERFACE = wx.NewIdRef()
TBMENU_LIVE_SHELL = wx.NewIdRef()
TBMENU_WXINSPECTOR = wx.NewIdRef()
TBMENU_CHANGE_WD = wx.NewIdRef()
TBMENU_QUIT = wx.NewIdRef()
def __init__(self, pyroserver):
wx.adv.TaskBarIcon.__init__(self)
self.pyroserver = pyroserver
# Set the image
self.UpdateIcon(None)
# bind some events
self.Bind(wx.EVT_MENU, self.OnTaskBarStartPLC, id=self.TBMENU_START)
self.Bind(wx.EVT_MENU, self.OnTaskBarStopPLC, id=self.TBMENU_STOP)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeName, id=self.TBMENU_CHANGE_NAME)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeInterface, id=self.TBMENU_CHANGE_INTERFACE)
self.Bind(wx.EVT_MENU, self.OnTaskBarLiveShell, id=self.TBMENU_LIVE_SHELL)
self.Bind(wx.EVT_MENU, self.OnTaskBarWXInspector, id=self.TBMENU_WXINSPECTOR)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangePort, id=self.TBMENU_CHANGE_PORT)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeWorkingDir, id=self.TBMENU_CHANGE_WD)
self.Bind(wx.EVT_MENU, self.OnTaskBarQuit, id=self.TBMENU_QUIT)
def CreatePopupMenu(self):
"""
This method is called by the base class when it needs to popup
the menu for the default EVT_RIGHT_DOWN event. Just create
the menu how you want it and return it from this function,
the base class takes care of the rest.
"""
menu = wx.Menu()
menu.Append(self.TBMENU_START, _("Start PLC"))
menu.Append(self.TBMENU_STOP, _("Stop PLC"))
menu.AppendSeparator()
menu.Append(self.TBMENU_CHANGE_NAME, _("Change Name"))
menu.Append(self.TBMENU_CHANGE_INTERFACE, _("Change IP of interface to bind"))
menu.Append(self.TBMENU_CHANGE_PORT, _("Change Port Number"))
menu.Append(self.TBMENU_CHANGE_WD, _("Change working directory"))
menu.AppendSeparator()
menu.Append(self.TBMENU_LIVE_SHELL, _("Launch a live Python shell"))
menu.Append(self.TBMENU_WXINSPECTOR, _("Launch WX GUI inspector"))
menu.AppendSeparator()
menu.Append(self.TBMENU_QUIT, _("Quit"))
return menu
def MakeIcon(self, img):
"""
The various platforms have different requirements for the
icon size...
"""
if "wxMSW" in wx.PlatformInfo:
img = img.Scale(16, 16)
elif "wxGTK" in wx.PlatformInfo:
img = img.Scale(22, 22)
# wxMac can be any size upto 128x128, so leave the source img alone....
icon = wx.Icon(img.ConvertToBitmap())
return icon
def OnTaskBarStartPLC(self, evt):
runtime.GetPLCObjectSingleton().StartPLC()
def OnTaskBarStopPLC(self, evt):
runtime.GetPLCObjectSingleton().StopPLC()
def OnTaskBarChangeInterface(self, evt):
ip_addr = self.pyroserver.ip_addr
ip_addr = '' if ip_addr is None else ip_addr
dlg = ParamsEntryDialog(None, _("Enter the IP of the interface to bind"), defaultValue=ip_addr)
dlg.SetTests([(re.compile(r'\d{1,3}(?:\.\d{1,3}){3}$').match, _("IP is not valid!")),
(lambda x:len([x for x in x.split(".") if 0 <= int(x) <= 255]) == 4,
_("IP is not valid!"))])
if dlg.ShowModal() == wx.ID_OK:
self.pyroserver.ip_addr = dlg.GetValue()
self.pyroserver.Restart()
def OnTaskBarChangePort(self, evt):
dlg = ParamsEntryDialog(None, _("Enter a port number "), defaultValue=str(self.pyroserver.port))
dlg.SetTests([(str.isdigit, _("Port number must be an integer!")), (lambda port: 0 <= int(port) <= 65535, _("Port number must be 0 <= port <= 65535!"))])
if dlg.ShowModal() == wx.ID_OK:
self.pyroserver.port = int(dlg.GetValue())
self.pyroserver.Restart()
def OnTaskBarChangeWorkingDir(self, evt):
dlg = wx.DirDialog(None, _("Choose a working directory "), self.pyroserver.workdir, wx.DD_NEW_DIR_BUTTON)
if dlg.ShowModal() == wx.ID_OK:
self.pyroserver.workdir = dlg.GetPath()
self.pyroserver.Restart()
def OnTaskBarChangeName(self, evt):
_servicename = self.pyroserver.servicename
_servicename = '' if _servicename is None else _servicename
dlg = ParamsEntryDialog(None, _("Enter a name "), defaultValue=_servicename)
dlg.SetTests([(lambda name: len(name) != 0, _("Name must not be null!"))])
if dlg.ShowModal() == wx.ID_OK:
self.pyroserver.servicename = dlg.GetValue()
self.pyroserver.Restart()
def _LiveShellLocals(self):
return {"locals": runtime.GetPLCObjectSingleton().python_runtime_vars}
def OnTaskBarLiveShell(self, evt):
from wx import py
frame = py.crust.CrustFrame(**self._LiveShellLocals())
frame.Show()
def OnTaskBarWXInspector(self, evt):
# Activate the widget inspection tool
from wx.lib.inspection import InspectionTool
if not InspectionTool().initialized:
InspectionTool().Init(**self._LiveShellLocals())
wnd = wx.GetApp()
InspectionTool().Show(wnd, True)
def OnTaskBarQuit(self, evt):
if wx.Platform == '__WXMSW__':
Thread(target=self.pyroserver.Quit).start()
self.RemoveIcon()
wx.CallAfter(wx.GetApp().ExitMainLoop)
def UpdateIcon(self, plcstatus):
if plcstatus is PlcStatus.Started:
currenticon = self.MakeIcon(starticon)
elif plcstatus is PlcStatus.Stopped:
currenticon = self.MakeIcon(stopicon)
else:
currenticon = self.MakeIcon(defaulticon)
self.SetIcon(currenticon, "Beremiz Service")
if not os.path.isdir(WorkingDir):
os.mkdir(WorkingDir)
if enabletwisted:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
if havewx:
from twisted.internet import wxreactor
wxreactor.install()
from twisted.internet import reactor
reactor.registerWxApp(app)
else:
# from twisted.internet import pollreactor
# pollreactor.install()
from twisted.internet import reactor
havetwisted = True
except ImportError:
print(_("Twisted unavailable."))
havetwisted = False
pyruntimevars = {}
if havewx:
wx_eval_lock = Semaphore(0)
def statuschangeTskBar(status):
wx.CallAfter(taskbar_instance.UpdateIcon, status)
statuschange.append(statuschangeTskBar)
def wx_evaluator(obj, *args, **kwargs):
tocall, args, kwargs = obj.call
obj.res = default_evaluator(tocall, *args, **kwargs)
wx_eval_lock.release()
main_thread_id = current_thread().ident
def evaluator(tocall, *args, **kwargs):
# To prevent deadlocks, check if current thread is not one already main
current_id = current_thread().ident
if main_thread_id != current_id:
o = type('', (object,), dict(call=(tocall, args, kwargs), res=None))
wx.CallAfter(wx_evaluator, o)
wx_eval_lock.acquire()
return o.res
else:
# avoid dead lock if called from main : do job immediately
return default_evaluator(tocall, *args, **kwargs)
else:
evaluator = default_evaluator
# Exception hooks
def LogException(*exp):
LogMessageAndException("", exp)
sys.excepthook = LogException
def installThreadExcepthook():
init_old = threading.Thread.__init__
def init(self, *args, **kwargs):
init_old(self, *args, **kwargs)
run_old = self.run
def run_with_except_hook(*args, **kw):
try:
run_old(*args, **kw)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
sys.excepthook(*sys.exc_info())
self.run = run_with_except_hook
threading.Thread.__init__ = init
installThreadExcepthook()
havewamp = False
if havetwisted:
if webport is not None:
try:
import runtime.NevowServer as NS # pylint: disable=ungrouped-imports
NS.WorkingDir = WorkingDir
except Exception:
LogMessageAndException(_("Nevow/Athena import failed :"))
webport = None
try:
import runtime.WampClient as WC # pylint: disable=ungrouped-imports
WC.WorkingDir = WorkingDir
havewamp = True
except Exception:
LogMessageAndException(_("WAMP import failed :"))
# Load extensions
for extention_file, extension_folder in extensions:
sys.path.append(extension_folder)
exec(compile(open(os.path.join(extension_folder, extention_file), "rb").read(), os.path.join(extension_folder, extention_file), 'exec'), locals())
# Service name is used as an ID for stunnel's PSK
# Some extension may set 'servicename' to a computed ID or Serial Number
# instead of using commandline '-n'
if servicename is not None and PSKpath is not None:
ensurePSK(servicename, PSKpath)
runtime.CreatePLCObjectSingleton(
WorkingDir, argv, statuschange, evaluator, pyruntimevars)
pyroserver = PyroServer(servicename, interface, port)
if havewx:
taskbar_instance = BeremizTaskBarIcon(pyroserver)
if havetwisted:
if webport is not None:
try:
website = NS.RegisterWebsite(interface, webport)
pyruntimevars["website"] = website
except Exception:
LogMessageAndException(_("Nevow Web service failed. "))
if havewamp:
try:
WC.RegisterWampClient(wampconf, PSKpath)
WC.RegisterWebSettings(NS)
except Exception:
LogMessageAndException(_("WAMP client startup failed. "))
pyro_thread = None
def FirstWorkerJob():
"""
RPC through pyro/wamp/UI may lead to delegation to Worker,
then this function ensures that Worker is already
created when pyro starts
"""
global pyro_thread, pyroserver
pyro_thread_started = Lock()
pyro_thread_started.acquire()
pyro_thread = Thread(target=pyroserver.PyroLoop,
kwargs=dict(when_ready=pyro_thread_started.release),
name="PyroThread")
pyro_thread.start()
# Wait for pyro thread to be effective
pyro_thread_started.acquire()
pyroserver.PrintServerInfo()
# Beremiz IDE detects LOCAL:// runtime is ready by looking
# for self.workdir in the daemon's stdout.
if sys.stdout:
sys.stdout.write(_("Current working directory :") + WorkingDir + "\n")
sys.stdout.flush()
runtime.GetPLCObjectSingleton().AutoLoad(autostart)
if havetwisted and havewx:
waker_func = wx.CallAfter
# This orders ui loop to signal when ready on Stdout
waker_func(print,"UI thread started successfully.")
# interleaved worker copes with wxreactor by delegating all asynchronous
# calls to wx's mainloop
runtime.MainWorker.interleave(waker_func, reactor.stop, FirstWorkerJob)
try:
reactor.run(installSignalHandlers=False)
except KeyboardInterrupt:
pass
runtime.MainWorker.stop()
elif havewx:
try:
app.MainLoop
except KeyboardInterrupt:
pass
elif havetwisted:
ui_thread_started = Lock()
ui_thread_started.acquire()
reactor.callLater(0, ui_thread_started.release)
ui_thread = Thread(
target=partial(reactor.run, installSignalHandlers=False),
name="UIThread")
ui_thread.start()
ui_thread_started.acquire()
print("UI thread started successfully.")
try:
# blocking worker loop
runtime.MainWorker.runloop(FirstWorkerJob)
except KeyboardInterrupt:
pass
else:
try:
# blocking worker loop
runtime.MainWorker.runloop(FirstWorkerJob)
except KeyboardInterrupt:
pass
pyroserver.Quit()
pyro_thread.join()
plcobj = runtime.GetPLCObjectSingleton()
try:
plcobj.StopPLC()
plcobj.UnLoadPLC()
except:
print(traceback.format_exc())
if havetwisted:
reactor.stop()
if not havewx:
ui_thread.join()
elif havewx:
app.ExitMainLoop()
sys.exit(0)