--- a/connectors/LPC/LPCObject.py Thu Jun 03 12:56:21 2010 +0200
+++ b/connectors/LPC/LPCObject.py Thu Jun 03 12:57:28 2010 +0200
@@ -22,187 +22,51 @@
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import ctypes, os, commands, types, sys
from LPCProto import *
class LPCObject():
- def __init__(self,pluginsroot, location):
+ def __init__(self, pluginsroot, comportstr):
self.PLCStatus = "Disconnected"
self.pluginsroot = pluginsroot
self.PLCprint = pluginsroot.logger.write
self._Idxs = []
- self.UpdateLocation(location)
+ comport = int(comportstr[3:comportstr.index(':')]) - 1
+ try:
+ self.connect(comportstr)
+ except Exception,e:
+ self.pluginsroot.logger.write_error(str(e)+"\n")
+ self.SerialConnection = None
+ self.PLCStatus = "Disconnected"
- def UpdateLocation(self, location):
- # Is that a comport ?
- if len(location) == 5 and\
- location.startswith("COM") and \
- location[3].isdigit() and \
- location[4]==":" :
- self.StorageConnection = None
- try:
- comport = int(location[3]) - 1
- self.SerialConnection = LPCProto(comport,#number
- 115200, #speed
- 2) #timeout
- # This will update status
- self.HandleSerialTransaction(IDLETransaction())
- except Exception,e:
- self.pluginsroot.logger.write_error(str(e)+"\n")
- self.SerialConnection = None
- self.PLCStatus = "Disconnected"
- # or a drive unit ?
- elif len(location)==2 and \
- location[0].isalpha() and \
- location[1] == ':' :
- self.SerialConnection = None
- if os.path.exists(location):
- self.StorageConnection = location
- self.PLCStatus = "Stopped"
- else:
- self.pluginsroot.logger.write_error("Drive "+
- location+
- " do not exist !\n")
- self.StorageConnection = None
- self.PLCStatus = "Disconnected"
-
def HandleSerialTransaction(self, transaction):
if self.SerialConnection is not None:
try:
self.PLCStatus, res = self.SerialConnection.HandleTransaction(transaction)
return res
- except LPCError,e:
+ except LPCProtoError,e:
self.pluginsroot.logger.write_error(str(e)+"\n")
self.SerialConnection = None
self.PLCStatus = "Disconnected"
return None
-
+
def StartPLC(self, debug=False):
- PLCprint("StartPLC")
- self.HandleSerialTransaction(STARTTransaction())
+ raise LPCProtoError("Not implemented")
def StopPLC(self):
- PLCprint("StopPLC")
- self.HandleSerialTransaction(STOPTransaction())
-
- def ForceReload(self):
- pass
+ raise LPCProtoError("Not implemented")
def GetPLCstatus(self):
- self.HandleSerialTransaction(IDLETransaction())
- return self.PLCStatus
+ raise LPCProtoError("Not implemented")
def NewPLC(self, md5sum, data, extrafiles):
- if os.path.exists(self.StorageConnection):
- firmwarepath = os.path.join(
- self.StorageConnection,
- "firmware.bin")
- try:
- if os.path.exists(firmwarepath ):
- os.unlink(firmwarepath)
- f = open(firmwarepath, "wb")
- f.write(data)
- f.close()
- return True
- except LPCError,e:
- self.StorageConnection = None
- self.PLCStatus = "Disconnected"
- self.pluginsroot.logger.write_error(
- "LPC transfer error : "+
- str(e)+"\n")
+ raise LPCProtoError("Not implemented")
def MatchMD5(self, MD5):
- data = self.HandleSerialTransaction(GET_PLCIDTransaction())
- print "PLCINFO",data[32:]
- return data[:32] == MD5
-
- class IEC_STRING(ctypes.Structure):
- """
- Must be changed according to changes in iec_types.h
- """
- _fields_ = [("len", ctypes.c_uint8),
- ("body", ctypes.c_char * 126)]
-
- TypeTranslator = {"BOOL" : (ctypes.c_uint8, lambda x:x.value!=0, lambda t,x:t(x)),
- "STEP" : (ctypes.c_uint8, lambda x:x.value, lambda t,x:t(x)),
- "TRANSITION" : (ctypes.c_uint8, lambda x:x.value, lambda t,x:t(x)),
- "ACTION" : (ctypes.c_uint8, lambda x:x.value, lambda t,x:t(x)),
- "SINT" : (ctypes.c_int8, lambda x:x.value, lambda t,x:t(x)),
- "USINT" : (ctypes.c_uint8, lambda x:x.value, lambda t,x:t(x)),
- "BYTE" : (ctypes.c_uint8, lambda x:x.value, lambda t,x:t(x)),
- "STRING" : (IEC_STRING, lambda x:x.body[:x.len], lambda t,x:t(len(x),x)),
- "INT" : (ctypes.c_int16, lambda x:x.value, lambda t,x:t(x)),
- "UINT" : (ctypes.c_uint16, lambda x:x.value, lambda t,x:t(x)),
- "WORD" : (ctypes.c_uint16, lambda x:x.value, lambda t,x:t(x)),
- "WSTRING" : (None, None, None),#TODO
- "DINT" : (ctypes.c_int32, lambda x:x.value, lambda t,x:t(x)),
- "UDINT" : (ctypes.c_uint32, lambda x:x.value, lambda t,x:t(x)),
- "DWORD" : (ctypes.c_uint32, lambda x:x.value, lambda t,x:t(x)),
- "LINT" : (ctypes.c_int64, lambda x:x.value, lambda t,x:t(x)),
- "ULINT" : (ctypes.c_uint64, lambda x:x.value, lambda t,x:t(x)),
- "LWORD" : (ctypes.c_uint64, lambda x:x.value, lambda t,x:t(x)),
- "REAL" : (ctypes.c_float, lambda x:x.value, lambda t,x:t(x)),
- "LREAL" : (ctypes.c_double, lambda x:x.value, lambda t,x:t(x)),
- }
+ raise LPCProtoError("Not implemented")
def SetTraceVariablesList(self, idxs):
- """
- Call ctype imported function to append
- these indexes to registred variables in PLC debugger
- """
- if idxs:
- buff = ""
- # keep a copy of requested idx
- self._Idxs = idxs[:]
- for idx,iectype,force in idxs:
- idxstr = ctypes.string_at(
- ctypes.pointer(
- ctypes.c_uint32(length)),4)
- if force !=None:
- c_type,unpack_func, pack_func = self.TypeTranslator.get(iectype, (None,None,None))
- forcedsizestr = chr(ctypes.sizeof(c_type))
- forcestr = ctypes.string_at(
- ctypes.pointer(
- pack_func(c_type,force)),
- forced_type_size)
- buff += idxstr + forced_type_size_str + forcestr
- else:
- buff += idxstr + chr(0)
- data = self.HandleSerialTransaction(
- SET_TRACE_VARIABLETransaction(buff))
- else:
- self._Idxs = []
+ raise LPCProtoError("Not implemented")
def GetTraceVariables(self):
- """
- Return a list of variables, corresponding to the list of required idx
- """
- offset = 0
- strbuf = self.HandleSerialTransaction(
- GET_TRACE_VARIABLETransaction())
- size = len(strbuf) - 4
- if size > 0 and self.PLCStatus == "Started":
- tick = ctypes.cast(
- ctypes.c_char_p(strbuf[:4]),
- ctypes.POINTER(ctypes.c_int)).contents
- buffer = ctypes.cast(
- ctypes.c_char_p(strbuf[4:]),
- ctypes.c_void_p)
- for idx, iectype, forced in self._Idxs:
- cursor = ctypes.c_void_p(buffer.value + offset)
- c_type,unpack_func, pack_func = self.TypeTranslator.get(iectype, (None,None,None))
- if c_type is not None and offset < size:
- res.append(unpack_func(ctypes.cast(cursor,
- ctypes.POINTER(c_type)).contents))
- offset += ctypes.sizeof(c_type)
- else:
- if c_type is None:
- PLCprint("Debug error - " + iectype + " not supported !")
- if offset >= size:
- PLCprint("Debug error - buffer too small !")
- break
- if offset and offset == size:
- return self.PLCStatus, tick.value, res
- PLCprint("Debug error - wrong buffer unpack !")
- return self.PLCStatus, None, None
+ raise LPCProtoError("Not implemented")