"""Serial transaction protocol used to talk to an LPC target.

A transaction is: send one command byte, read a two byte acknowledgement
(echoed command + current PLC status) and, for commands that carry data,
exchange length-prefixed payloads in both directions.
"""

import struct
import sys
from threading import Lock

import serial

# Command bytes sent to the LPC. The LPC returns the command itself as the
# acknowledgement for that command.
LPC_CMDS = dict(IDLE=0x00,
                START=0x01,
                STOP=0x02,
                SET_TRACE_VARIABLE=0x04,
                GET_TRACE_VARIABLES=0x05,
                SET_FORCED_VARIABLE=0x06,
                GET_PLCID=0x07)

# Bit tested against the command byte: commands that have it set (0x04-0x07)
# are followed by a data exchange.
WAIT_DATA = 0x04

# PLC status values.
LPC_STATUS = dict(STARTED=0x01,
                  STOPPED=0x02,
                  DEBUG=0x03)

# Payload lengths travel as a 4 byte signed int in host byte order
# (we presuppose endianness of the LPC is the same as the PC).
_LENGTH_FORMAT = "=i"
_LENGTH_SIZE = struct.calcsize(_LENGTH_FORMAT)


class LPCError(Exception):
    """Raised when a transaction with the LPC fails.

    The original message is kept in ``msg``.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        return "LPC communication error ! " + str(self.msg)


class LPCProto:
    """Owns the serial port and serializes transactions on it."""

    def __init__(self, port, rate, timeout):
        """Open the serial port and perform an IDLE handshake.

        port, rate and timeout are passed straight to ``serial.Serial``.
        Raises LPCError when the handshake is not acknowledged; the port is
        closed again in that case.
        """
        # serialize access lock -- must exist before the first transaction,
        # since HandleTransaction acquires it
        self.TransactionLock = Lock()
        # open serial port
        self.serialPort = serial.Serial(port, rate, timeout=timeout)
        try:
            self.serialPort.flush()
            # handshake
            self.HandleTransaction(LPCTransaction("IDLE"))
        except Exception:
            # do not leak the port when the target does not answer
            self.serialPort.close()
            raise

    def HandleTransaction(self, transaction):
        """Run one transaction on the serial port.

        Returns a ``(current_plc_status, data)`` tuple, where ``data`` is
        whatever ``transaction.ExchangeData()`` returned (None for commands
        without a data phase).
        Raises LPCError when the command is not acknowledged.
        """
        with self.TransactionLock:
            transaction.SetPseudoFile(self.serialPort)
            # send command, wait ack (timeout)
            transaction.SendCommand()
            current_plc_status = transaction.GetCommandAck()
            if current_plc_status is None:
                raise LPCError("LPC transaction error - controller did not answer as expected")
            res = transaction.ExchangeData()
        return current_plc_status, res


class LPCTransaction:
    """One command (plus optional payload) to be exchanged with the LPC."""

    def __init__(self, command, optdata=b""):
        """Prepare a transaction.

        command -- a key of LPC_CMDS (KeyError if unknown).
        optdata -- payload sent during the data phase; a byte string.
                   Defaults to empty so data-less commands such as "IDLE"
                   can be built from the command name alone.
        """
        self.Command = LPC_CMDS[command]
        self.OptData = optdata[:]
        # file-like object with read()/write(), set by SetPseudoFile
        self.pseudofile = None
        # never read in this module; kept so the attribute still exists
        self.serialPort = None

    def SetPseudoFile(self, pseudofile):
        """Set the file-like object the transaction reads from and writes to."""
        self.pseudofile = pseudofile

    def SendCommand(self):
        """Write the single command byte."""
        # struct.pack yields the same single byte as chr() did on Python 2,
        # and a real bytes object on Python 3
        self.pseudofile.write(struct.pack("B", self.Command))

    def GetCommandAck(self):
        """Read the two byte acknowledgement.

        Returns the current PLC status byte (an int) when the LPC echoed the
        command, None when it echoed something else or when fewer than two
        bytes were read.
        """
        # bytearray gives integer items on both Python 2 and Python 3
        ack = bytearray(self.pseudofile.read(2))
        if len(ack) != 2:
            return None
        comm_status, current_plc_status = ack
        # LPC returns command itself as an ack for command
        if comm_status == self.Command:
            return current_plc_status
        return None

    def ExchangeData(self):
        """Run the data phase of the transaction, if the command has one.

        Sends the length-prefixed payload, then reads a length-prefixed
        answer. Returns the answer bytes, or None for commands that do not
        have the WAIT_DATA bit set.
        Raises LPCError when the answer's length header is incomplete or
        negative.
        """
        if not self.Command & WAIT_DATA:
            return None

        # length prefix followed by the payload, in a single write
        header = struct.pack(_LENGTH_FORMAT, len(self.OptData))
        self.pseudofile.write(header + self.OptData)

        header = self.pseudofile.read(_LENGTH_SIZE)
        if len(header) != _LENGTH_SIZE:
            # short read: decoding it would use bytes that were never received
            raise LPCError("LPC transaction error - incomplete data length header")
        (length,) = struct.unpack(_LENGTH_FORMAT, header)
        if length < 0:
            raise LPCError("LPC transaction error - invalid data length %d" % length)
        return self.pseudofile.read(length)


if __name__ == "__main__":
    # LPCProto needs a port, a baud rate and a timeout; take them from the
    # command line instead of calling it without arguments.
    if len(sys.argv) != 4:
        sys.exit("usage: %s PORT BAUDRATE TIMEOUT" % sys.argv[0])
    TestConnection = LPCProto(sys.argv[1], int(sys.argv[2]), float(sys.argv[3]))