#!/usr/bin/env python
# -*- coding: utf-8 -*-

#This file is part of Beremiz, a Integrated Development Environment for
#programming IEC 61131-3 automates supporting plcopen standard and CanFestival. 
#
#Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
#
#See COPYING file for copyrights details.
#
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#General Public License for more details.
#
#You should have received a copy of the GNU General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from threading import Timer, Thread, Lock
import ctypes, os, commands, types, sys
import traceback

class LPCObject():
    """
    PLC control object: stores an uploaded PLC binary, starts/stops it and
    reads back traced debug variables.

    NOTE(review): the following names are used below but are not defined in
    this file; they are expected to be provided elsewhere (subclass, mixin or
    a later revision) - confirm before relying on this class as-is:
    ``workingdir``, ``PLClibraryLock``, ``PythonThreadProc``, ``_stopPLC``,
    ``_Reload``, ``_GetMD5FileName``, ``_suspendDebug``, ``_resumeDebug``,
    ``_ResetDebugVariables``, ``_RegisterDebugVariable``, ``_WaitDebugData``,
    ``_IterDebugData``, ``_FreeDebugData`` and the module-level ``lib_ext``.
    """
    # Indexes of the variables currently registered for tracing
    _Idxs = []
    # Name of the PLC binary stored in workingdir; None until NewPLC succeeds.
    # Declared here so that StartPLC/NewPLC can test it before any upload.
    CurrentPLCFilename = None

    def __init__(self, pluginsroot):
        """
        @param pluginsroot: object exposing a ``logger`` whose ``write``
                            method is used for all log output
        """
        self.PLCStatus = "Stopped"
        self.pluginsroot = pluginsroot
        self.PLCprint = pluginsroot.logger.write

    def StartPLC(self, debug=False):
        """
        Mark the PLC as started and spawn the python thread, provided a PLC
        binary is known. Does nothing otherwise.
        """
        self.PLCprint("StartPLC")
        if self.CurrentPLCFilename is not None:
            self.PLCStatus = "Started"
            self.PythonThread = Thread(target=self.PythonThreadProc, args=[debug])
            self.PythonThread.start()

    def StopPLC(self):
        """
        Stop the PLC if it is running.
        @return: True if a stop was requested, False if PLC was not started
        """
        self.PLCprint("StopPLC")
        if self.PLCStatus == "Started":
            self._stopPLC()
            return True
        return False

    def ForceReload(self):
        """
        Schedule self._Reload shortly after returning, so that the caller
        gets its answer first.
        @return: always True
        """
        # respawn python interpreter
        Timer(0.1, self._Reload).start()
        return True

    def GetPLCstatus(self):
        """Return the current PLC status string."""
        return self.PLCStatus

    def _RemoveOldPLCFiles(self, extra_files_log):
        """
        Best-effort removal of the previous PLC binary, of every file listed
        in extra_files_log, and of extra_files_log itself. Files that are
        missing or cannot be removed are silently skipped.
        """
        stale_paths = []
        if self.CurrentPLCFilename is not None:
            stale_paths.append(os.path.join(self.workingdir,
                                            self.CurrentPLCFilename))
        try:
            with open(extra_files_log, "r") as log:
                for line in log:
                    filename = line.strip()
                    if filename:
                        stale_paths.append(os.path.join(self.workingdir,
                                                        filename))
        except (IOError, OSError):
            # No readable log from a previous upload: nothing more to list
            pass
        # extra_files_log already includes workingdir, do not join it again
        stale_paths.append(extra_files_log)

        for path in stale_paths:
            try:
                os.remove(path)
            except OSError:
                pass

    def NewPLC(self, md5sum, data, extrafiles):
        """
        Replace the stored PLC with a new one.
        @param md5sum: key identifying the new PLC, also used as file name
        @param data: content of the PLC binary
        @param extrafiles: iterable of (file name, file content) pairs,
                           written next to the PLC binary
        @return: True on success, False if the PLC is not in a state that
                 allows replacement or if writing the files failed
        """
        self.PLCprint("NewPLC (%s)"%md5sum)
        if self.PLCStatus not in ["Stopped", "Empty", "Broken"]:
            return False

        # NOTE(review): lib_ext is not defined in this file - confirm where
        # the platform library extension is supposed to come from.
        NewFileName = md5sum + lib_ext
        extra_files_log = os.path.join(self.workingdir, "extra_files.txt")

        self._RemoveOldPLCFiles(extra_files_log)

        try:
            # Create new PLC file
            with open(os.path.join(self.workingdir, NewFileName),
                      'wb') as plc_file:
                plc_file.write(data)

            # Store new PLC filename based on md5 key
            with open(self._GetMD5FileName(), "w") as md5_file:
                md5_file.write(md5sum)

            # Then write the files
            # NOTE(review): fname is joined to workingdir unchecked; a name
            # containing ".." or an absolute path escapes workingdir.
            with open(extra_files_log, "w") as log:
                for fname, fdata in extrafiles:
                    fpath = os.path.join(self.workingdir, fname)
                    with open(fpath, "wb") as extra_file:
                        extra_file.write(fdata)
                    log.write(fname+'\n')

            # Store new PLC filename
            self.CurrentPLCFilename = NewFileName
        except Exception:
            # Report the failure to the caller instead of propagating it
            self.PLCprint(traceback.format_exc())
            return False
        if self.PLCStatus == "Empty":
            self.PLCStatus = "Stopped"
        return True

    def MatchMD5(self, MD5):
        """
        Tell whether MD5 equals the key stored by the last successful NewPLC.
        @return: False when keys differ or the stored key cannot be read
        """
        try:
            with open(self._GetMD5FileName(), "r") as md5_file:
                return md5_file.read() == MD5
        except Exception:
            # Any failure to obtain the stored key counts as a mismatch
            return False

    def SetTraceVariablesList(self, idxs):
        """
        Call ctype imported function to append 
        these indexes to registered variables in PLC debugger
        """
        self._suspendDebug()
        try:
            # keep a copy of requested idx
            self._Idxs = idxs[:]
            self._ResetDebugVariables()
            for idx in idxs:
                self._RegisterDebugVariable(idx)
        finally:
            # Never leave the debugger suspended, even if registration failed
            self._resumeDebug()

    class IEC_STRING(ctypes.Structure):
        """
        Must be changed according to changes in iec_types.h
        """
        _fields_ = [("len", ctypes.c_uint8),
                    ("body", ctypes.c_char * 127)]

    # IEC type name -> (ctypes type used to read the debug buffer,
    #                   function turning the ctypes object into a python value)
    TypeTranslator = {"BOOL" :       (ctypes.c_uint8,  lambda x:x.value!=0),
                      "STEP" :       (ctypes.c_uint8,  lambda x:x.value),
                      "TRANSITION" : (ctypes.c_uint8,  lambda x:x.value),
                      "ACTION" :     (ctypes.c_uint8,  lambda x:x.value),
                      "SINT" :       (ctypes.c_int8,   lambda x:x.value),
                      "USINT" :      (ctypes.c_uint8,  lambda x:x.value),
                      "BYTE" :       (ctypes.c_uint8,  lambda x:x.value),
                      "STRING" :     (IEC_STRING,      lambda x:x.body[:x.len]),
                      "INT" :        (ctypes.c_int16,  lambda x:x.value),
                      "UINT" :       (ctypes.c_uint16, lambda x:x.value),
                      "WORD" :       (ctypes.c_uint16, lambda x:x.value),
                      "WSTRING" :    (None,            None),#TODO
                      "DINT" :       (ctypes.c_int32,  lambda x:x.value),
                      "UDINT" :      (ctypes.c_uint32, lambda x:x.value),
                      "DWORD" :      (ctypes.c_uint32, lambda x:x.value),
                      "LINT" :       (ctypes.c_int64,  lambda x:x.value),
                      "ULINT" :      (ctypes.c_uint64, lambda x:x.value),
                      "LWORD" :      (ctypes.c_uint64, lambda x:x.value),
                      "REAL" :       (ctypes.c_float,  lambda x:x.value),
                      "LREAL" :      (ctypes.c_double, lambda x:x.value),
                      }

    def GetTraceVariables(self):
        """
        Return a list of variables, corresponding to the list of required idx
        @return: (tick, values) where values holds one entry per requested
                 idx (None for an entry that could not be decoded), or
                 (-1, None) when the PLC is not started or no debug data
                 could be obtained
        """
        if self.PLCStatus != "Started":
            return -1, None

        # The lock is released even if decoding raises
        with self.PLClibraryLock:
            tick = self._WaitDebugData()
            try:
                #PLCprint("Debug tick : %d"%tick)
                # presumably 2**32 - 1 is -1 seen as unsigned 32 bit,
                # i.e. "no data" - confirm against the C side
                if tick == 2**32 - 1:
                    tick = -1
                    res = None
                else:
                    idx = ctypes.c_int()
                    typename = ctypes.c_char_p()
                    res = []

                    for given_idx in self._Idxs:
                        data_ptr = self._IterDebugData(ctypes.byref(idx),
                                                       ctypes.byref(typename))
                        c_type, unpack_func = self.TypeTranslator.get(
                            typename.value, (None, None))
                        if c_type is not None and given_idx == idx.value:
                            res.append(unpack_func(
                                ctypes.cast(data_ptr,
                                            ctypes.POINTER(c_type)).contents))
                        else:
                            self.PLCprint("Debug error idx : %d, expected_idx %d, type : %s"%(idx.value, given_idx,typename.value))
                            res.append(None)
            finally:
                self._FreeDebugData()
        return tick, res