diff -r af3399aca7b7 -r 8ef035de86de connectors/LPC/LPCProto.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/connectors/LPC/LPCProto.py Tue Dec 01 13:48:47 2009 +0100 @@ -0,0 +1,88 @@ +import serial +from threading import Lock + +LPC_CMDS=dict(IDLE = 0x00, + START = 0x01, + STOP = 0x02, + SET_TRACE_VARIABLE = 0x04, + GET_TRACE_VARIABLES = 0x05, + SET_FORCED_VARIABLE = 0x06, + GET_PLCID = 0x07) + +WAIT_DATA = 0x04 + +LPC_STATUS=dict(STARTED = 0x01, + STOPPED = 0x02, + DEBUG = 0x03) + +class LPCError(exceptions.Exception): + """Exception class""" + def __init__(self, msg): + self.msg = msg + return + + def __str__(self): + return "LPC communication error ! " + str(self.msg) + +class LPCProto: + def __init__(self, port, rate, timeout): + # open serial port + self.serialPort = serial.Serial( port, rate, timeout = timeout ) + self.serialPort.flush() + # handshake + self.HandleTransaction(LPCTransaction("IDLE")) + # serialize access lock + self.TransactionLock = Lock() + + def HandleTransaction(self, transaction): + self.TransactionLock.acquire() + try: + transaction.SetPseudoFile(self.serialPort) + # send command, wait ack (timeout) + transaction.SendCommand() + current_plc_status = transaction.GetCommandAck() + if current_plc_status is not None: + res = transaction.ExchangeData() + else: + raise LPCError("LPC transaction error - controller did not answer as expected") + finally: + self.TransactionLock.release() + return current_plc_status, res + +class LPCTransaction: + def __init__(self, command, optdata): + self.Command = LPC_CMDS[command] + self.OptData = optdata[:] + self.serialPort = None + + def SetPseudoFile(pseudofile): + self.pseudofile = pseudofile + + def SendCommand(self): + # send command thread + self.pseudofile.write(chr(self.Command)) + + def GetCommandAck(self): + comm_status, current_plc_status = map(ord, self.pseudofile.read(2)) + # LPC returns command itself as an ack for command + if(comm_status == self.Command): + return current_plc_status + return None + + def ExchangeData(self): + if self.Command & WAIT_DATA : + length = len(self.OptData) + # transform length into a byte string + # we presuppose endianess of LPC same as PC + lengthstr = ctypes.string_at(ctypes.pointer(ctypes.c_int(length)),4) + self.pseudofile.write(lengthstr + self.OptData) + + lengthstr = self.pseudofile.read(4) + # transform a byte string into length + length = ctypes.cast(ctypes.c_char_p(lengthstr), ctypes.POINTER(ctypes.c_int)).contents.value + return self.pseudofile.read(length) + return None + +if __name__ == "__main__": + TestConnection = LPCProto() + \ No newline at end of file