|
1 import serial |
|
2 from threading import Lock |
|
3 |
|
4 LPC_CMDS=dict(IDLE = 0x00, |
|
5 START = 0x01, |
|
6 STOP = 0x02, |
|
7 SET_TRACE_VARIABLE = 0x04, |
|
8 GET_TRACE_VARIABLES = 0x05, |
|
9 SET_FORCED_VARIABLE = 0x06, |
|
10 GET_PLCID = 0x07) |
|
11 |
|
12 WAIT_DATA = 0x04 |
|
13 |
|
14 LPC_STATUS=dict(STARTED = 0x01, |
|
15 STOPPED = 0x02, |
|
16 DEBUG = 0x03) |
|
17 |
|
18 class LPCError(exceptions.Exception): |
|
19 """Exception class""" |
|
20 def __init__(self, msg): |
|
21 self.msg = msg |
|
22 return |
|
23 |
|
24 def __str__(self): |
|
25 return "LPC communication error ! " + str(self.msg) |
|
26 |
|
27 class LPCProto: |
|
28 def __init__(self, port, rate, timeout): |
|
29 # open serial port |
|
30 self.serialPort = serial.Serial( port, rate, timeout = timeout ) |
|
31 self.serialPort.flush() |
|
32 # handshake |
|
33 self.HandleTransaction(LPCTransaction("IDLE")) |
|
34 # serialize access lock |
|
35 self.TransactionLock = Lock() |
|
36 |
|
37 def HandleTransaction(self, transaction): |
|
38 self.TransactionLock.acquire() |
|
39 try: |
|
40 transaction.SetPseudoFile(self.serialPort) |
|
41 # send command, wait ack (timeout) |
|
42 transaction.SendCommand() |
|
43 current_plc_status = transaction.GetCommandAck() |
|
44 if current_plc_status is not None: |
|
45 res = transaction.ExchangeData() |
|
46 else: |
|
47 raise LPCError("LPC transaction error - controller did not answer as expected") |
|
48 finally: |
|
49 self.TransactionLock.release() |
|
50 return current_plc_status, res |
|
51 |
|
52 class LPCTransaction: |
|
53 def __init__(self, command, optdata): |
|
54 self.Command = LPC_CMDS[command] |
|
55 self.OptData = optdata[:] |
|
56 self.serialPort = None |
|
57 |
|
58 def SetPseudoFile(pseudofile): |
|
59 self.pseudofile = pseudofile |
|
60 |
|
61 def SendCommand(self): |
|
62 # send command thread |
|
63 self.pseudofile.write(chr(self.Command)) |
|
64 |
|
65 def GetCommandAck(self): |
|
66 comm_status, current_plc_status = map(ord, self.pseudofile.read(2)) |
|
67 # LPC returns command itself as an ack for command |
|
68 if(comm_status == self.Command): |
|
69 return current_plc_status |
|
70 return None |
|
71 |
|
72 def ExchangeData(self): |
|
73 if self.Command & WAIT_DATA : |
|
74 length = len(self.OptData) |
|
75 # transform length into a byte string |
|
76 # we presuppose endianess of LPC same as PC |
|
77 lengthstr = ctypes.string_at(ctypes.pointer(ctypes.c_int(length)),4) |
|
78 self.pseudofile.write(lengthstr + self.OptData) |
|
79 |
|
80 lengthstr = self.pseudofile.read(4) |
|
81 # transform a byte string into length |
|
82 length = ctypes.cast(ctypes.c_char_p(lengthstr), ctypes.POINTER(ctypes.c_int)).contents.value |
|
83 return self.pseudofile.read(length) |
|
84 return None |
|
85 |
|
86 if __name__ == "__main__": |
|
87 TestConnection = LPCProto() |
|
88 |