|
1 #!/usr/bin/env python |
|
2 # -*- coding: utf-8 -*- |
|
3 |
|
4 #This file is part of PLCOpenEditor, a library implementing an IEC 61131-3 editor |
|
5 #based on the plcopen standard. |
|
6 # |
|
7 #Copyright (C): Edouard TISSERANT and Laurent BESSARD |
|
8 # |
|
9 #See COPYING file for copyrights details. |
|
10 # |
|
11 #This library is free software; you can redistribute it and/or |
|
12 #modify it under the terms of the GNU Lesser General Public |
|
13 #License as published by the Free Software Foundation; either |
|
14 #version 2.1 of the License, or (at your option) any later version. |
|
15 # |
|
16 #This library is distributed in the hope that it will be useful, |
|
17 #but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
18 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
19 #Lesser General Public License for more details. |
|
20 # |
|
21 #You should have received a copy of the GNU Lesser General Public |
|
22 #License along with this library; if not, write to the Free Software |
|
23 #Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
|
24 |
|
25 from plcopen import plcopen |
|
26 from plcopen.structures import * |
|
27 from types import * |
|
28 |
|
# Interface section names (the "name" of each entry returned by
# interface.getContent()) mapped to the keyword that opens the matching
# variable declaration section in the generated ST text.
varTypeNames = {
    "localVars": "VAR",
    "tempVars": "VAR_TEMP",
    "inputVars": "VAR_INPUT",
    "outputVars": "VAR_OUTPUT",
    "inOutVars": "VAR_IN_OUT",
    "externalVars": "VAR_EXTERNAL",
    "globalVars": "VAR_GLOBAL",
    "accessVars": "VAR_ACCESS",
}
"""
Module implementing methods for generating PLC programs in ST or IL
"""
|
35 |
|
class PouProgram:
    """
    Build the Structured Text source of a single PLCopen POU.

    Attributes:
        Name: POU name written in the generated header.
        Type: keyword written in the header and footer of the generated POU
            (GenerateCurrentProgram passes "FUNCTION", "FUNCTION_BLOCK" or
            "PROGRAM").
        Interface: variable sections keyed by (keyword, retain, constant);
            each value maps a type name to the list of variable names of that
            type. A FUNCTION also stores its return type under "returnType".
        Steps: SFC step infos keyed by step name.
        Transitions: SFC transition infos keyed by transition instance.
        Order: SFC step names in the order used to index the "Steps" array.
        Program: ST text generated so far for the POU body.
    """

    def __init__(self, name, type):
        self.Name = name
        self.Type = type
        self.Interface = {}
        self.Steps = {}
        self.Transitions = {}
        self.Order = []
        self.Program = ""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _LocalVars(self):
        """Return the plain VAR section of the interface, creating it on first use."""
        return self.Interface.setdefault(("VAR", False, False), {})

    def _NewTriggerName(self, edge):
        """Declare a new "<edge><n>" instance in the VAR section and return its name."""
        instances = self._LocalVars().setdefault(edge, [])
        index = 1
        name = "%s%d" % (edge, index)
        while name in instances:
            index += 1
            name = "%s%d" % (edge, index)
        instances.append(name)
        return name

    def _InstanceAssignment(self, body_type, body, instance):
        """
        Return (target, expression) when instance drives an assignment, else None.

        Only wired FBD output variables and LD coils produce an assignment.
        """
        if body_type == "FBD" and isinstance(instance, plcopen.outVariable):
            target = instance.getExpression()
            connections = instance.connectionPointIn.getConnections()
            if connections and len(connections) == 1:
                return target, self.ComputeFBDExpression(body, connections[0])
        elif body_type == "LD" and isinstance(instance, plcopen.coil):
            paths = self.GenerateLDPaths(instance, body)
            target = self.ExtractModifier(instance, instance.getVariable())
            return target, self.ComputeLDExpression(paths, True)
        return None

    def _BodyAssignments(self, body_type, body):
        """
        Compute the assignments described by a graphical FBD or LD body.

        Returns a list of (prelude, target, expression) tuples. prelude is the
        text that computing the expression appended to self.Program (function
        block and trigger calls); it is captured here and self.Program is left
        as it was, so each caller decides where that text belongs.
        """
        assignments = []
        for instance in body.getContentInstances():
            saved, self.Program = self.Program, ""
            try:
                assignment = self._InstanceAssignment(body_type, body, instance)
                prelude = self.Program
            finally:
                self.Program = saved
            if assignment is not None:
                assignments.append((prelude,) + assignment)
        return assignments

    def _LinkIncomingTransitions(self, element, pou, target):
        """
        Register the transitions leading into element and add target to them.

        element is a step or a jump; the transitions are reached either
        directly or through a selection convergence.
        """
        if not element.connectionPointIn:
            return
        transitions = []
        connections = element.connectionPointIn.getConnections()
        if len(connections) == 1:
            source = pou.body.getContentInstance(connections[0].getRefLocalId())
            if isinstance(source, plcopen.transition):
                self.GenerateSFCTransitions(source, pou)
                transitions.append(source)
            elif isinstance(source, plcopen.selectionConvergence):
                for connectionPointIn in source.getConnectionPointIn():
                    branch_connections = connectionPointIn.getConnections()
                    if len(branch_connections) == 1:
                        # Follow the branch's own connection (the step variant
                        # used to follow the convergence connection instead).
                        branch_id = branch_connections[0].getRefLocalId()
                        transition = pou.body.getContentInstance(branch_id)
                        self.GenerateSFCTransitions(transition, pou)
                        transitions.append(transition)
        for transition in transitions:
            # GenerateSFCTransitions registers nothing when it cannot find the
            # source step, so skip transitions that were not recorded.
            if transition in self.Transitions:
                self.Transitions[transition]["targets"].append(target)

    def _GenerateSFCProgram(self, body, pou):
        """Collect the SFC network of body and append its ST translation to self.Program."""
        for instance in body.getContentInstances():
            if isinstance(instance, plcopen.step):
                self.GenerateSFCSteps(instance, pou)
            elif isinstance(instance, plcopen.actionBlock):
                self.GenerateSFCActions(instance, pou)
            elif isinstance(instance, plcopen.transition):
                self.GenerateSFCTransitions(instance, pou)
            elif isinstance(instance, plcopen.jumpStep):
                self.GenerateSFCJump(instance, pou)
        for name, values in self.Steps.items():
            if values['initial']:
                self.GenerateSFCStepOrder(name, [])
        steps_type = "ARRAY [1..%d] OF BOOL" % len(self.Order)
        self._LocalVars().setdefault(steps_type, []).append("Steps")
        # NOTE(review): the IF blocks opened below are never closed with
        # END_IF, and "Steps" is declared with bounds [1..N] but indexed from
        # 0. Both are kept as found - confirm the intended output.
        lines = []
        for index, name in enumerate(self.Order):
            values = self.Steps[name]
            lines.append(" IF Steps[%d] THEN\n" % index)
            for action in values["actions"]:
                if action["qualifier"] == "N":
                    for line in action["content"].splitlines():
                        lines.append(" %s\n" % line)
                elif action["qualifier"] == "S":
                    trigger = self._NewTriggerName("R_TRIG")
                    lines.append(" IF %s(CLK := Steps[%d]) THEN\n" % (trigger, index))
                    lines.append(" %s := TRUE;\n" % action["content"])
            for transition in values["transitions"]:
                if transition["compute"] != '':
                    # Calls needed by the condition run before it is assigned.
                    lines.append(transition.get("prelude", ""))
                    lines.append(" %s %s" % (transition["condition"], transition["compute"]))
                lines.append(" IF %s THEN\n" % transition["condition"])
                for target in transition["targets"]:
                    lines.append(" Steps[%d] := TRUE;\n" % self.Order.index(target))
                lines.append(" Steps[%d] := FALSE;\n" % index)
        self.Program += "".join(lines)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def GenerateInterface(self, interface):
        """
        Fill self.Interface from the POU interface.

        Variables are grouped by type inside a section keyed by
        (keyword, retain, constant). Sections sharing the same key are merged
        rather than overwriting one another.
        """
        if self.Type == "FUNCTION":
            self.Interface["returnType"] = interface.getReturnType().getValue()
        for varlist in interface.getContent():
            key = (varTypeNames[varlist["name"]], varlist["value"].getRetain(),
                   varlist["value"].getConstant())
            section = self.Interface.setdefault(key, {})
            for var in varlist["value"].getVariable():
                section.setdefault(var.getType().getValue(), []).append(var.getName())

    def GenerateProgram(self, pou):
        """
        Translate the body of pou to ST and store the result in self.Program.

        IL and ST bodies are copied as text; FBD and LD bodies become one
        assignment per wired output variable or coil; SFC bodies are turned
        into tests on a "Steps" boolean array.
        """
        body = pou.getBody()
        body_content = body.getContent()
        body_type = body_content["name"]
        if body_type in ["IL", "ST"]:
            self.Program = "%s\n" % body_content["value"].getText()
        elif body_type in ("FBD", "LD"):
            for prelude, target, expression in self._BodyAssignments(body_type, body):
                self.Program += "%s %s := %s;\n" % (prelude, target, expression)
        elif body_type == "SFC":
            self._GenerateSFCProgram(body, pou)

    def ComputeFBDExpression(self, body, link):
        """
        Return the ST expression feeding link in an FBD body.

        An input variable yields its expression, a function yields a call
        expression, and a function block yields "<instance>.<output>" for the
        output pin located at the end of the link. The function block call is
        appended to self.Program, and the instance declared, the first time
        the instance is met.

        Raises:
            ValueError: no output pin of the function block matches the link.
        """
        instance = body.getContentInstance(link.getRefLocalId())
        if isinstance(instance, plcopen.inVariable):
            return instance.getExpression()
        elif isinstance(instance, plcopen.block):
            name = instance.getInstanceName()
            type_name = instance.getTypeName()
            block_infos = GetBlockType(type_name)
            if block_infos["type"] == "function":
                arguments = []
                for variable in instance.inputVariables.getVariable():
                    connections = variable.connectionPointIn.getConnections()
                    if connections and len(connections) == 1:
                        value = self.ComputeFBDExpression(body, connections[0])
                        arguments.append(self.ExtractModifier(variable, value))
                output = instance.outputVariables.getVariable()[0]
                call = "%s(%s)" % (type_name, ", ".join(arguments))
                return self.ExtractModifier(output, call)
            elif block_infos["type"] == "functionBlock":
                instances = self._LocalVars().setdefault(type_name, [])
                if name not in instances:
                    # First time this instance is met: declare it and emit
                    # its call once, however many outputs are read later.
                    instances.append(name)
                    arguments = []
                    for variable in instance.inputVariables.getVariable():
                        connections = variable.connectionPointIn.getConnections()
                        if connections and len(connections) == 1:
                            parameter = variable.getFormalParameter()
                            value = self.ComputeFBDExpression(body, connections[0])
                            arguments.append(self.ExtractModifier(
                                variable, "%s := %s" % (parameter, value)))
                    self.Program += " %s(%s);\n" % (name, ", ".join(arguments))
                # The output pin is the one whose absolute position matches
                # the last point of the link.
                connectionPoint = link.getPosition()[-1]
                for variable in instance.outputVariables.getVariable():
                    blockPointx, blockPointy = variable.connectionPointOut.getRelPosition()
                    if (instance.getX() + blockPointx == connectionPoint.getX()
                            and instance.getY() + blockPointy == connectionPoint.getY()):
                        return self.ExtractModifier(
                            variable, "%s.%s" % (name, variable.getFormalParameter()))
                raise ValueError("No output variable found")

    def GenerateLDPaths(self, instance, body):
        """
        Walk an LD rung backwards from instance towards the left power rail.

        The result is a nested structure read by ComputeLDExpression: a list
        means the elements are in series, a tuple means parallel branches, a
        string is a single element and None stands for the left power rail.
        """
        is_coil = isinstance(instance, plcopen.coil)
        if not is_coil:
            # The coil's own variable is handled by the caller; evaluating it
            # here as well would only declare an unused trigger.
            variable = self.ExtractModifier(instance, instance.getVariable())
        paths = []
        for connection in instance.connectionPointIn.getConnections():
            upstream = body.getContentInstance(connection.getRefLocalId())
            if isinstance(upstream, plcopen.leftPowerRail):
                paths.append(None)
            else:
                paths.append(self.GenerateLDPaths(upstream, body))
        if is_coil:
            if len(paths) > 1:
                return tuple(paths)
            return paths
        # NOTE(review): assumes every non-coil element has at least one
        # incoming connection; paths[0] raises IndexError otherwise.
        if len(paths) > 1:
            return [variable, tuple(paths)]
        if isinstance(paths[0], list):
            return [variable] + paths[0]
        if paths[0]:
            return [variable, paths[0]]
        return variable

    def GenerateSFCSteps(self, step, pou):
        """Register step in self.Steps and make its incoming transitions target it."""
        step_name = step.getName()
        if step_name in self.Steps:
            return
        self.Steps[step_name] = {"initial": step.getInitialStep(),
                                 "transitions": [], "actions": []}
        self._LinkIncomingTransitions(step, pou, step_name)

    def GenerateSFCJump(self, jump, pou):
        """Make the transitions leading into jump target the step the jump names."""
        self._LinkIncomingTransitions(jump, pou, jump.getTargetName())

    def GenerateSFCActions(self, actionBlock, pou):
        """
        Attach the actions of actionBlock to the step it is connected to.

        Each action is stored as {"qualifier", "content"}. content is the
        inline text, the ST translation of the referenced POU action, or the
        referenced name itself when the POU has no action of that name.
        """
        connections = actionBlock.connectionPointIn.getConnections()
        if len(connections) != 1:
            return
        step = pou.body.getContentInstance(connections[0].getRefLocalId())
        step_name = step.getName()
        if step_name not in self.Steps:
            self.GenerateSFCSteps(step, pou)
        if step_name not in self.Steps:
            return
        for action in actionBlock.getActions():
            action_infos = {"qualifier": action["qualifier"], "content": ""}
            if action["type"] == "inline":
                action_infos["content"] = action["value"]
            elif action["type"] == "reference":
                actionContent = pou.getAction(action["value"])
                if actionContent:
                    actionType = actionContent.getBodyType()
                    actionBody = actionContent.getBody()
                    if actionType in ["ST", "IL"]:
                        action_infos["content"] = actionContent.getText()
                    elif actionType in ("FBD", "LD"):
                        # Keep every assignment of the body, one per line.
                        statements = ["%s %s := %s;" % assignment for assignment
                                      in self._BodyAssignments(actionType, actionBody)]
                        action_infos["content"] = "\n".join(statements)
                else:
                    action_infos["content"] = action["value"]
            self.Steps[step_name]["actions"].append(action_infos)

    def GenerateSFCTransitions(self, transition, pou):
        """
        Register transition on the step it leaves.

        The source step is reached directly or through a selection
        divergence. Nothing is registered when no source step is found. The
        stored infos hold "targets", "condition", "compute" (text written
        after the condition name to compute it, empty for inline conditions)
        and "prelude" (calls to run before that computation).
        """
        if transition in self.Transitions:
            return
        connections = transition.connectionPointIn.getConnections()
        if len(connections) != 1:
            return
        source = pou.body.getContentInstance(connections[0].getRefLocalId())
        step = None
        if isinstance(source, plcopen.step):
            step = source
        elif isinstance(source, plcopen.selectionDivergence):
            divergence_connections = source.connectionPointIn.getConnections()
            if len(divergence_connections) == 1:
                upstream = pou.body.getContentInstance(
                    divergence_connections[0].getRefLocalId())
                if isinstance(upstream, plcopen.step):
                    step = upstream
        if step is None:
            return
        step_name = step.getName()
        if step_name not in self.Steps:
            self.GenerateSFCSteps(step, pou)
        if step_name not in self.Steps:
            return
        transitionValues = transition.getConditionContent()
        transition_infos = {"targets": [], "condition": transitionValues["value"],
                            "compute": "", "prelude": ""}
        if transitionValues["type"] != "inline":
            transitionContent = pou.getTransition(transitionValues["value"])
            if transitionContent:
                transitionType = transitionContent.getBodyType()
                transitionBody = transitionContent.getBody()
                if transitionType in ["ST", "IL"]:
                    transition_infos["compute"] = "%s\n" % transitionContent.getText()
                elif transitionType in ("FBD", "LD"):
                    # The condition name is written before "compute" by the
                    # emitter, so only the right-hand side is stored here.
                    for prelude, _target, expression in self._BodyAssignments(
                            transitionType, transitionBody):
                        transition_infos["prelude"] = prelude
                        transition_infos["compute"] = ":= %s;\n" % expression
        self.Steps[step_name]["transitions"].append(transition_infos)
        self.Transitions[transition] = transition_infos

    def GenerateSFCStepOrder(self, name, path):
        """
        Append step name and the steps reachable from it to self.Order.

        path lists the steps already on the current walk; a target is skipped
        only when it is both ordered and on that walk, which ends loops. A
        target ordered earlier through another walk is moved after name.
        """
        self.Order.append(name)
        for transition in self.Steps[name]["transitions"]:
            for target in transition["targets"]:
                if target in self.Order and target in path:
                    continue
                if target in self.Order:
                    self.Order.remove(target)
                self.GenerateSFCStepOrder(target, path + [name])

    def ComputeLDExpression(self, paths, first = False):
        """
        Turn the structure built by GenerateLDPaths into an ST boolean expression.

        Tuples are OR-ed (parenthesised unless first is true), lists are
        AND-ed, and a direct connection to the left power rail (None) is
        "TRUE".
        """
        if isinstance(paths, tuple):
            if None in paths:
                # One branch is a bare wire to the rail: the OR is always true.
                return "TRUE"
            joined = " OR ".join([self.ComputeLDExpression(path) for path in paths])
            if first:
                return joined
            return "(%s)" % joined
        if isinstance(paths, list):
            return " AND ".join([self.ComputeLDExpression(path) for path in paths])
        if paths is None:
            # A coil wired straight to the left rail arrives here as [None].
            return "TRUE"
        return paths

    def ExtractModifier(self, variable, text):
        """
        Apply the negation or edge modifier of variable to text.

        A negated variable gives "NOT(text)"; a rising or falling edge gives
        the ".Q" output of a new R_TRIG or F_TRIG instance; otherwise text is
        returned unchanged.
        """
        if variable.getNegated():
            return "NOT(%s)" % text
        edge = variable.getEdge()
        if edge:
            if edge.getValue() == "rising":
                return self.AddTrigger("R_TRIG", text)
            elif edge.getValue() == "falling":
                return self.AddTrigger("F_TRIG", text)
        return text

    def AddTrigger(self, edge, text):
        """
        Declare a new edge-detection instance clocked by text.

        edge is the trigger block type name. The call is appended to
        self.Program and the instance's "<name>.Q" output is returned.
        """
        name = self._NewTriggerName(edge)
        self.Program += " %s(CLK := %s);\n" % (name, text)
        return "%s.Q" % name

    def GenerateSTProgram(self):
        """
        Return the complete ST text of the POU.

        The text is the header (with the return type for a function), one
        declaration section per interface entry, the generated body and the
        closing END_<Type> line.
        """
        parts = []
        if "returnType" in self.Interface:
            parts.append("%s %s : %s\n" % (self.Type, self.Name, self.Interface["returnType"]))
        else:
            parts.append("%s %s\n" % (self.Type, self.Name))
        for values, variables in self.Interface.items():
            if values == "returnType":
                continue
            parts.append(" %s" % values[0])
            if values[1]:
                parts.append(" RETAIN")
            if values[2]:
                parts.append(" CONSTANT")
            parts.append("\n")
            for vartype, names in variables.items():
                parts.append(" %s : %s;\n" % (", ".join(names), vartype))
            # Every declaration section closes with END_VAR, whatever
            # keyword opened it.
            parts.append(" END_VAR\n")
        parts.append("\n")
        parts.append(self.Program)
        parts.append("END_%s\n\n" % self.Type)
        return "".join(parts)
|
405 |
|
def GenerateCurrentProgram(project):
    """
    Return the ST source of every POU of project, concatenated in order.

    Each POU is translated by a PouProgram built with the POU name and the
    keyword matching its type.

    Raises:
        ValueError: a POU type is not "function", "functionBlock" or "program".
    """
    # POU type as stored in the project -> keyword passed to PouProgram.
    pou_keywords = {
        "function": "FUNCTION",
        "functionBlock": "FUNCTION_BLOCK",
        "program": "PROGRAM",
    }
    programs = []
    for pou in project.getPous():
        pou_type = pou.getPouType().getValue()
        if pou_type not in pou_keywords:
            raise ValueError("Undefined pou type")
        pou_program = PouProgram(pou.getName(), pou_keywords[pou_type])
        pou_program.GenerateInterface(pou.getInterface())
        pou_program.GenerateProgram(pou)
        programs.append(pou_program.GenerateSTProgram())
    return "".join(programs)
|
422 |