Tests: fix the project edit Sikuli IDE test.
Background clicks based on bitmap matching do not work.
Grid dots are not good match candidates.
Rendering is probably affected by the virtual display's bpp or by rasterizer approximations.
from __future__ import print_function
from __future__ import absolute_import
import csv
from opcua import Client
from opcua import ua
import wx
import wx.lib.gizmos as gizmos # Formerly wx.gizmos in Classic
import wx.dataview as dv
# For each supported OPC-UA variant type name (as reported by python-opcua):
# the IEC 61131-3 type, the C storage type, the IEC location size prefix,
# the open62541 UA_TYPES index name and the open62541 C type name.
UA_IEC_types = dict(
    # pyopcua | IEC61131 | C type    | sz | open62541 enum    | open62541
    Boolean = ("BOOL" , "uint8_t" , "X", "UA_TYPES_BOOLEAN", "UA_Boolean"),
    SByte   = ("SINT" , "int8_t"  , "B", "UA_TYPES_SBYTE"  , "UA_SByte"  ),
    Byte    = ("USINT", "uint8_t" , "B", "UA_TYPES_BYTE"   , "UA_Byte"   ),
    Int16   = ("INT"  , "int16_t" , "W", "UA_TYPES_INT16"  , "UA_Int16"  ),
    UInt16  = ("UINT" , "uint16_t", "W", "UA_TYPES_UINT16" , "UA_UInt16" ),
    # Signed DINT must be stored in a signed C integer and unsigned UDINT in
    # an unsigned one (the two C types used to be swapped).
    Int32   = ("DINT" , "int32_t" , "D", "UA_TYPES_INT32"  , "UA_Int32"  ),
    UInt32  = ("UDINT", "uint32_t", "D", "UA_TYPES_UINT32" , "UA_UInt32" ),
    Int64   = ("LINT" , "int64_t" , "L", "UA_TYPES_INT64"  , "UA_Int64"  ),
    UInt64  = ("ULINT", "uint64_t", "L", "UA_TYPES_UINT64" , "UA_UInt64" ),
    Float   = ("REAL" , "float"   , "D", "UA_TYPES_FLOAT"  , "UA_Float"  ),
    Double  = ("LREAL", "double"  , "L", "UA_TYPES_DOUBLE" , "UA_Double" ),
)
# Keyed by the Python type name of a node identifier; each value is the
# node-id constructor name emitted in generated C code and the format string
# turning the identifier into its C argument.
# NOTE(review): confirm that the targeted open62541 version really provides
# UA_NODEID_UUID taking a string argument.
UA_NODE_ID_types = dict(
    int  = ("UA_NODEID_NUMERIC", "{}"),
    str  = ("UA_NODEID_STRING", '"{}"'),
    UUID = ("UA_NODEID_UUID", '"{}"'),
)
# One entry per column of the variable lists: (title, width, value type).
_lstcolumns = [
    ("Name",   100, str),
    ("NSIdx",   50, int),
    ("IdType", 100, str),
    ("Id",     100, str),
    ("Type",   100, str),
    ("IEC",     50, int),
]

# Parallel per-column lists derived from the table above.
lstcolnames, lstcolwidths, lstcoltypess = map(list, zip(*_lstcolumns))

# Names of the two variable lists handled throughout this module.
directions = ["input", "output"]
class OPCUASubListModel(dv.DataViewIndexListModel):
    """Index-list model presenting one list of variable rows to a DataViewCtrl.

    ``data`` is the row container that is displayed and edited in place;
    ``log`` is a callable taking a message string.
    """

    def __init__(self, data, log):
        dv.DataViewIndexListModel.__init__(self, len(data))
        self.log = log
        self.data = data

    def GetColumnType(self, col):
        """Report every column as text."""
        return "string"

    def GetValueByRow(self, row, col):
        """Return the cell at (row, col) rendered as a string."""
        cell = self.data[row][col]
        return str(cell)

    def SetValueByRow(self, value, row, col):
        """Store a value edited by the user in the view.

        The value is converted to the type declared for the column.
        Returns True when the cell was updated, False (after logging)
        when the value was rejected.
        """
        coltype = lstcoltypess[col]
        try:
            converted = coltype(value)
        except ValueError:
            self.log("String {} is invalid for type {}\n".format(value,coltype.__name__))
            return False

        # The IdType column only accepts the known node identifier types
        editing_idtype = col == lstcolnames.index("IdType")
        if editing_idtype and converted not in UA_NODE_ID_types:
            self.log("{} is invalid for IdType\n".format(value))
            return False

        self.data[row][col] = converted
        return True

    def GetColumnCount(self):
        """Number of columns this model provides data for."""
        return len(lstcolnames)

    def GetCount(self):
        """Number of rows currently held by the model."""
        return len(self.data)

    def GetAttrByRow(self, row, col, attr):
        """Apply non-standard cell attributes: column 5 is shown bold and blue.

        Returns True when ``attr`` was modified, False otherwise.
        """
        if col != 5:
            return False
        attr.SetColour('blue')
        attr.SetBold(True)
        return True

    def DeleteRows(self, rows):
        """Remove the given row indexes from the data and notify the views."""
        # Work on a sorted copy, highest index first, so that the remaining
        # indexes stay valid while rows are removed.
        for row in sorted(rows, reverse=True):
            del self.data[row]
            self.RowDeleted(row)

    def AddRow(self, value):
        """Append a row to the data; views are notified only on success.

        Success is whatever truthy value ``self.data.append`` returns.
        """
        appended = self.data.append(value)
        if appended:
            self.RowAppended()

    def ResetData(self):
        """Tell the views that the whole data set has changed."""
        self.Reset(len(self.data))
# Custom data format name shared by the drag source (node tree) and the
# drop targets (variable lists) of this module.
OPCUAClientDndMagicWord = "text/beremiz-opcuaclient"
class NodeDropTarget(wx.DropTarget):
    """Drop target accepting the OPC-UA client drag format.

    ``parent`` must provide an ``OnNodeDnD()`` method, called on each drop.
    """

    def __init__(self, parent):
        wx.DropTarget.__init__(self, wx.CustomDataObject(OPCUAClientDndMagicWord))
        self.ParentWindow = parent

    def OnDrop(self, x, y):
        """Notify the owning window of the drop and accept it."""
        owner = self.ParentWindow
        owner.OnNodeDnD()
        return True
class OPCUASubListPanel(wx.Panel):
    """Panel showing the variables selected for one direction.

    It contains a title, a "Delete Row(s)" button and an editable
    DataViewCtrl that also accepts nodes dropped from the server tree.

    Parameters:
        parent: parent window.
        log: callable taking a message string.
        model: data view model shown by the control; rows are added with
            its ``AddRow`` and removed with its ``DeleteRows``.
        direction: "input" or "output"; selects the access level a dropped
            node must have (read for input, write for output).
    """

    def __init__(self, parent, log, model, direction):
        self.log = log
        wx.Panel.__init__(self, parent, -1)

        self.dvc = dv.DataViewCtrl(self,
                                   style=wx.BORDER_THEME
                                   | dv.DV_ROW_LINES
                                   | dv.DV_HORIZ_RULES
                                   | dv.DV_VERT_RULES
                                   | dv.DV_MULTIPLE
                                   )

        self.model = model
        self.dvc.AssociateModel(self.model)

        for idx,(colname,width) in enumerate(zip(lstcolnames,lstcolwidths)):
            self.dvc.AppendTextColumn(colname, idx, width=width, mode=dv.DATAVIEW_CELL_EDITABLE)

        DropTarget = NodeDropTarget(self)
        self.dvc.SetDropTarget(DropTarget)

        self.Sizer = wx.BoxSizer(wx.VERTICAL)

        self.direction = direction
        titlestr = direction + " variables"

        title = wx.StaticText(self, label = titlestr)

        delbt = wx.Button(self, label="Delete Row(s)")
        self.Bind(wx.EVT_BUTTON, self.OnDeleteRows, delbt)

        topsizer = wx.BoxSizer(wx.HORIZONTAL)
        topsizer.Add(title, 1, wx.ALIGN_CENTER_VERTICAL|wx.LEFT|wx.RIGHT, 5)
        topsizer.Add(delbt, 0, wx.LEFT|wx.RIGHT, 5)
        self.Sizer.Add(topsizer, 0, wx.EXPAND|wx.TOP|wx.BOTTOM, 5)
        self.Sizer.Add(self.dvc, 1, wx.EXPAND)

    def OnDeleteRows(self, evt):
        """Delete the rows currently selected in the view."""
        items = self.dvc.GetSelections()
        rows = [self.model.GetRow(item) for item in items]
        self.model.DeleteRows(rows)

    def OnNodeDnD(self):
        """Add the nodes selected in the server tree to this list.

        Nodes that are not variables, have an unsupported data type or lack
        the access level required by this panel's direction are skipped,
        each with a logged message.
        """
        # Have to find OPC-UA client extension panel from here
        # in order to avoid keeping reference (otherwise __del__ isn't called)
        #             splitter.        panel.      splitter
        ClientPanel = self.GetParent().GetParent().GetParent()
        nodes = ClientPanel.GetSelectedNodes()

        # Same requirement for every dropped node, so look it up once
        required_access = {"input":ua.AccessLevel.CurrentRead,
                           "output":ua.AccessLevel.CurrentWrite}[self.direction]

        for node in nodes:
            cname = node.get_node_class().name
            dname = node.get_display_name().Text
            if cname != "Variable":
                # Messages end with a newline like every other log call here
                self.log("Node {} ignored (not a variable)\n".format(dname))
                continue

            tname = node.get_data_type_as_variant_type().name
            if tname not in UA_IEC_types:
                self.log("Node {} ignored (unsupported type)\n".format(dname))
                continue

            access = node.get_access_level()
            if required_access not in access:
                self.log("Node {} ignored because of insufficient access rights\n".format(dname))
                continue

            nsid = node.nodeid.NamespaceIndex
            nid = node.nodeid.Identifier
            nid_type = type(nid).__name__
            # The node identifier is also passed in the IEC column
            iecid = nid

            value = [dname,
                     nsid,
                     nid_type,
                     nid,
                     tname,
                     iecid]
            self.model.AddRow(value)
# Image list shared by the node trees and the indexes of its icons;
# all of them stay None until prepare_image_list() has filled them in.
il = fldridx = fldropenidx = fileidx = smileidx = None

# Icon size used for the image list
isz = (16, 16)

# Titles and widths of the node tree columns
treecolnames, treecolwidths = (
    ["Name", "Class", "NSIdx", "Id"],
    [250, 100, 50, 200],
)
def prepare_image_list():
    """Build the module-level image list and icon indexes on first use.

    Does nothing when the image list already exists.
    """
    global il, fldridx, fldropenidx, fileidx, smileidx

    if il is not None:
        return

    def stock_bitmap(art_id):
        # Stock bitmap of the shared icon size
        return wx.ArtProvider.GetBitmap(art_id, wx.ART_OTHER, isz)

    width, height = isz[0], isz[1]
    il = wx.ImageList(width, height)
    fldridx = il.Add(stock_bitmap(wx.ART_FOLDER))
    fldropenidx = il.Add(stock_bitmap(wx.ART_FILE_OPEN))
    fileidx = il.Add(stock_bitmap(wx.ART_NORMAL_FILE))
    smileidx = il.Add(stock_bitmap(wx.ART_ADD_BOOKMARK))
class OPCUAClientPanel(wx.SplitterWindow):
    """Splitter window combining an OPC-UA server browser and variable lists.

    The initial pane holds a "Browse Server" toggle button and one variable
    list per direction.  Toggling the button on connects to the server and
    shows a node tree in a second pane, from which nodes can be dragged to
    the lists; toggling it off disconnects and removes the tree.

    Parameters:
        parent: parent window.
        modeldata: mapping from each name in ``directions`` to the row
            container displayed for that direction.
        log: callable taking a message string.
        uri_getter: callable without arguments returning the server URI.
    """

    def __init__(self, parent, modeldata, log, uri_getter):
        self.log = log
        wx.SplitterWindow.__init__(self, parent, -1)

        # Selected tree nodes, in the order they were selected
        self.ordered_nodes = []

        self.inout_panel = wx.Panel(self)
        self.inout_sizer = wx.FlexGridSizer(cols=1, hgap=0, rows=2, vgap=0)
        self.inout_sizer.AddGrowableCol(0)
        self.inout_sizer.AddGrowableRow(1)

        self.client = None
        self.uri_getter = uri_getter

        # Only exists while the server is being browsed
        self.tree_panel = None

        self.connect_button = wx.ToggleButton(self.inout_panel, -1, "Browse Server")

        self.selected_splitter = wx.SplitterWindow(self.inout_panel, style=wx.SUNKEN_BORDER | wx.SP_3D)

        self.selected_datas = modeldata
        self.selected_models = { direction:OPCUASubListModel(self.selected_datas[direction], log) for direction in directions }
        self.selected_lists = { direction:OPCUASubListPanel(
                self.selected_splitter, log,
                self.selected_models[direction], direction)
            for direction in directions }

        self.selected_splitter.SplitHorizontally(*[self.selected_lists[direction] for direction in directions]+[300])

        self.inout_sizer.Add(self.connect_button, flag=wx.GROW)
        self.inout_sizer.Add(self.selected_splitter, flag=wx.GROW)
        self.inout_sizer.Layout()
        self.inout_panel.SetAutoLayout(True)
        self.inout_panel.SetSizer(self.inout_sizer)

        self.Initialize(self.inout_panel)

        self.Bind(wx.EVT_TOGGLEBUTTON, self.OnConnectButton, self.connect_button)

    def OnClose(self):
        """Disconnect from the server, if connected."""
        # Forget the client first so that a failing disconnect() is not
        # attempted again by a later call (e.g. from __del__).
        client, self.client = self.client, None
        if client is not None:
            client.disconnect()

    def __del__(self):
        self.OnClose()

    def OnConnectButton(self, event):
        """Start or stop browsing the server when the button is toggled."""
        if self.connect_button.GetValue():
            try:
                self._open_browser()
            except Exception as exc:
                # Report the failure rather than leaving a pressed button,
                # a half-built tree pane and a dangling client behind.
                self.log("Browsing OPC-UA server failed: {!r}\n".format(exc))
                try:
                    self._close_browser()
                except Exception as close_exc:
                    self.log("Closing OPC-UA connection failed: {!r}\n".format(close_exc))
                self.connect_button.SetValue(False)
        else:
            self._close_browser()

    def _open_browser(self):
        """Connect to the server and show its node tree in a new pane."""
        self.tree_panel = wx.Panel(self)
        self.tree_sizer = wx.FlexGridSizer(cols=1, hgap=0, rows=2, vgap=0)
        self.tree_sizer.AddGrowableCol(0)
        self.tree_sizer.AddGrowableRow(0)

        self.tree = gizmos.TreeListCtrl(self.tree_panel, -1, style=0, agwStyle=
                                        gizmos.TR_DEFAULT_STYLE
                                        | gizmos.TR_MULTIPLE
                                        | gizmos.TR_FULL_ROW_HIGHLIGHT
                                        )

        prepare_image_list()
        self.tree.SetImageList(il)

        for idx,(colname, width) in enumerate(zip(treecolnames, treecolwidths)):
            self.tree.AddColumn(colname)
            self.tree.SetColumnWidth(idx, width)

        self.tree.SetMainColumn(0)

        self.client = Client(self.uri_getter())
        self.client.connect()
        self.client.load_type_definitions()  # load definition of server specific structures/extension objects
        rootnode = self.client.get_root_node()

        rootitem = self.AddNodeItem(self.tree.AddRoot, rootnode)

        # Populate first level so that root can be expanded
        self.CreateSubItems(rootitem)

        self.tree.Bind(wx.EVT_TREE_ITEM_EXPANDED, self.OnExpand)
        self.tree.Bind(wx.EVT_TREE_SEL_CHANGED, self.OnTreeNodeSelection)
        self.tree.Bind(wx.EVT_TREE_BEGIN_DRAG, self.OnTreeBeginDrag)

        self.tree.Expand(rootitem)

        # The hint is managed by the tree panel's sizer, so it has to be a
        # child of that panel; this also gets it destroyed with the panel.
        hint = wx.StaticText(self.tree_panel, label = "Drag'n'drop desired variables from tree to Input or Output list")

        self.tree_sizer.Add(self.tree, flag=wx.GROW)
        self.tree_sizer.Add(hint, flag=wx.GROW)
        self.tree_sizer.Layout()
        self.tree_panel.SetAutoLayout(True)
        self.tree_panel.SetSizer(self.tree_sizer)

        self.SplitVertically(self.tree_panel, self.inout_panel, 500)

    def _close_browser(self):
        """Disconnect from the server and remove the node tree pane, if any."""
        try:
            self.OnClose()
        finally:
            # Remove the tree pane even when disconnecting raised
            panel, self.tree_panel = self.tree_panel, None
            if panel is not None:
                if self.IsSplit():
                    self.Unsplit(panel)
                panel.Destroy()

    def CreateSubItems(self, item):
        """Create the tree items for the children of ``item``'s node, once."""
        node, browsed = self.tree.GetPyData(item)
        if not browsed:
            for subnode in node.get_children():
                self.AddNodeItem(lambda n: self.tree.AppendItem(item, n), subnode)
            self.tree.SetPyData(item,(node, True))

    def AddNodeItem(self, item_creation_func, node):
        """Create and fill the tree item describing ``node``.

        ``item_creation_func`` receives the node's display name and must
        return the new tree item.  The item carries ``(node, False)`` as
        data, the flag telling that its children were not browsed yet.
        Returns the created item.
        """
        nsid = node.nodeid.NamespaceIndex
        nid = node.nodeid.Identifier
        dname = node.get_display_name().Text
        cname = node.get_node_class().name

        item = item_creation_func(dname)

        if cname == "Variable":
            access = node.get_access_level()
            normalidx = fileidx
            r = ua.AccessLevel.CurrentRead in access
            w = ua.AccessLevel.CurrentWrite in access
            if r and w:
                ext = "RW"
            elif r:
                ext = "RO"
            elif w:
                ext = "WO"  # not sure this one exist
            else:
                ext = "no access"  # not sure this one exist
            cname = "Var "+node.get_data_type_as_variant_type().name+" (" + ext + ")"
        else:
            normalidx = fldridx

        self.tree.SetPyData(item,(node, False))
        self.tree.SetItemText(item, cname, 1)
        self.tree.SetItemText(item, str(nsid), 2)
        self.tree.SetItemText(item, type(nid).__name__+": "+str(nid), 3)
        self.tree.SetItemImage(item, normalidx, which = wx.TreeItemIcon_Normal)
        self.tree.SetItemImage(item, fldropenidx, which = wx.TreeItemIcon_Expanded)

        return item

    def OnExpand(self, evt):
        """Browse one level further below the item that was just expanded."""
        for item in evt.GetItem().GetChildren():
            self.CreateSubItems(item)

    def OnTreeNodeSelection(self, event):
        """Keep ``ordered_nodes`` in sync with the tree selection."""
        items = self.tree.GetSelections()
        items_pydata = [self.tree.GetPyData(item) for item in items]

        nodes = [node for node, _unused in items_pydata]

        # append new nodes to ordered list
        for node in nodes:
            if node not in self.ordered_nodes:
                self.ordered_nodes.append(node)

        # filter out vanished items
        self.ordered_nodes = [
            node
            for node in self.ordered_nodes
            if node in nodes]

    def GetSelectedNodes(self):
        """Return the selected nodes, in selection order."""
        return self.ordered_nodes

    def OnTreeBeginDrag(self, event):
        """
        Called when a drag is started in tree
        @param event: wx.TreeEvent
        """
        if self.ordered_nodes:
            # Just send a recognizable mime-type, drop destination
            # will get python data from parent
            data = wx.CustomDataObject(OPCUAClientDndMagicWord)
            dragSource = wx.DropSource(self)
            dragSource.SetData(data)
            dragSource.DoDragDrop()

    def Reset(self):
        """Make both variable lists reload their data."""
        for direction in directions:
            self.selected_models[direction].ResetData()
class OPCUAClientList(list):
    """List of variable rows, each a list ordered like ``lstcolnames``.

    ``append`` validates and normalises a row before storing it.
    ``log`` is a callable taking a message string; it receives the reason
    whenever a row is rejected.
    """

    def __init__(self, log = lambda m:None):
        super(OPCUAClientList, self).__init__()
        self.log = log

    def _column(self, name):
        """Return the values stored in the column called ``name``."""
        idx = lstcolnames.index(name)
        return [row[idx] for row in self]

    def append(self, value):
        """Validate ``value`` and add it as a new row.

        ``value`` holds one item per column of ``lstcolnames``.  When its
        IEC item is not an int, the lowest IEC number not used yet is
        allocated.  Every item is converted to its column type.

        Returns True when the row was added.  Returns False, after logging
        the reason, when the IdType is unknown, an item cannot be converted
        or a row with the same Id is already present.
        """
        v = dict(zip(lstcolnames, value))

        # Exact type test on purpose: anything that is not a plain int
        # requests automatic numbering.
        if type(v["IEC"]) is not int:
            iecnums = set(self._column("IEC"))
            if not iecnums:
                v["IEC"] = 0
            else:
                greatest = max(iecnums)
                holes = set(range(greatest)) - iecnums
                v["IEC"] = min(holes) if holes else greatest+1

        if v["IdType"] not in UA_NODE_ID_types:
            self.log("Unknown IdType {}\n".format(v["IdType"]))
            return False

        try:
            for t,n in zip(lstcoltypess, lstcolnames):
                v[n] = t(v[n])
        except ValueError:
            self.log("Variable {} (Id={}) has invalid type\n".format(v["Name"],v["Id"]))
            return False

        if v["Id"] in self._column("Id"):
            self.log("Variable {} (Id={}) already in list\n".format(v["Name"],v["Id"]))
            return False

        list.append(self, [v[n] for n in lstcolnames])

        return True
class OPCUAClientModel(dict):
    """Variable lists of an OPC-UA client, keyed by direction.

    Holds one ``OPCUAClientList`` per name in ``directions`` and can load
    and save them as CSV and generate the matching C client code.
    ``log`` is passed on to every list.
    """

    def __init__(self, log = lambda m:None):
        super(OPCUAClientModel, self).__init__()
        for direction in directions:
            self[direction] = OPCUAClientList(log)

    def LoadCSV(self,path):
        """Replace the content of every list with the rows read from ``path``.

        Each CSV row is the direction followed by the columns of
        ``lstcolnames``.  Blank rows are skipped.
        """
        iec_idx = lstcolnames.index("IEC")
        with open(path, 'rb') as csvfile:
            reader = csv.reader(csvfile, delimiter=',', quotechar='"')
            # Lists are emptied in place: other objects hold references
            for model in self.values():
                model[:] = []
            for row in reader:
                if not row:
                    continue
                direction, fields = row[0], row[1:]
                # CSV items are text, but the list's append() only keeps an
                # IEC number given as an int and renumbers anything else,
                # which would lose the saved numbers.
                try:
                    fields[iec_idx] = int(fields[iec_idx])
                except (IndexError, ValueError):
                    # Left as read: append() deals with it as before
                    pass
                self[direction].append(fields)

    def SaveCSV(self,path):
        """Write every row to ``path`` as CSV, prefixed with its direction."""
        with open(path, 'wb') as csvfile:
            writer = csv.writer(csvfile, delimiter=',',
                                quotechar='"', quoting=csv.QUOTE_MINIMAL)
            for direction, data in self.items():
                for row in data:
                    writer.writerow([direction] + row)

    def GenerateC(self, path, locstr, server_uri):
        """Return the C source of an open62541 client for the variables.

        Parameters:
            path: not used by this method.
            locstr: text inserted in the generated function names and in
                the names of the located variables.
            server_uri: URI the generated code connects to.

        The generated code defines ``__init_<locstr>``,
        ``__cleanup_<locstr>``, ``__retrieve_<locstr>`` (reads the input
        variables) and ``__publish_<locstr>`` (writes the output variables).
        """
        template = """/* code generated by beremiz OPC-UA extension */

#include <open62541/client_config_default.h>
#include <open62541/client_highlevel.h>
#include <open62541/plugin/log_stdout.h>

static UA_Client *client;

#define DECL_VAR(ua_type, C_type, c_loc_name) \\
static UA_Variant c_loc_name##_variant; \\
static C_type c_loc_name##_buf = 0; \\
C_type *c_loc_name = &c_loc_name##_buf;

%(decl)s

void __cleanup_%(locstr)s(void)
{
    UA_Client_disconnect(client);
    UA_Client_delete(client);
}

#define INIT_READ_VARIANT(ua_type, c_loc_name) \\
    UA_Variant_init(&c_loc_name##_variant);

#define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name) \\
    UA_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &UA_TYPES[ua_type_enum]);

int __init_%(locstr)s(int argc,char **argv)
{
    UA_StatusCode retval;
    client = UA_Client_new();
    UA_ClientConfig_setDefault(UA_Client_getConfig(client));
%(init)s

    /* Connect to server */
    retval = UA_Client_connect(client, "%(uri)s");
    if(retval != UA_STATUSCODE_GOOD) {
        UA_Client_delete(client);
        return EXIT_FAILURE;
    }
    return 0;
}

#define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
    retval = UA_Client_readValueAttribute( \\
        client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); \\
    if(retval == UA_STATUSCODE_GOOD && UA_Variant_isScalar(&c_loc_name##_variant) && \\
       c_loc_name##_variant.type == &UA_TYPES[ua_type_enum]) { \\
        c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data; \\
        UA_Variant_clear(&c_loc_name##_variant); /* Unalloc requiered on each read ! */ \\
    }

void __retrieve_%(locstr)s(void)
{
    UA_StatusCode retval;
%(retrieve)s
}

#define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
    UA_Client_writeValueAttribute( \\
        client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);

void __publish_%(locstr)s(void)
{
%(publish)s
}

"""

        formatdict = dict(
            locstr = locstr,
            uri = server_uri,
            decl = "",
            init = "",
            retrieve = "",
            publish = ""
        )

        for direction, data in self.items():
            iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
            for row in data:
                _name, ua_nsidx, id_type_name, raw_node_id, type_name, iec_number = row
                _iec_type, C_type, iec_size_prefix, ua_type_enum, ua_type = UA_IEC_types[type_name]
                c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
                ua_nodeid_type, id_formating = UA_NODE_ID_types[id_type_name]
                ua_node_id = id_formating.format(raw_node_id)

                # Values substituted in the macro invocations below
                fields = dict(
                    ua_type = ua_type,
                    ua_type_enum = ua_type_enum,
                    C_type = C_type,
                    c_loc_name = c_loc_name,
                    ua_nodeid_type = ua_nodeid_type,
                    ua_nsidx = ua_nsidx,
                    ua_node_id = ua_node_id,
                )

                formatdict["decl"] += (
                    "\nDECL_VAR({ua_type}, {C_type}, {c_loc_name})".format(**fields))

                if direction == "input":
                    formatdict["init"] += (
                        "\nINIT_READ_VARIANT({ua_type}, {c_loc_name})".format(**fields))
                    formatdict["retrieve"] += (
                        "\nREAD_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, "
                        "{ua_nodeid_type}, {ua_nsidx}, {ua_node_id})".format(**fields))

                if direction == "output":
                    formatdict["init"] += (
                        "\nINIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})".format(**fields))
                    formatdict["publish"] += (
                        "\nWRITE_VALUE({ua_type}, {c_loc_name}, "
                        "{ua_nodeid_type}, {ua_nsidx}, {ua_node_id})".format(**fields))

        Ccode = template%formatdict

        return Ccode
# Stand-alone test application: shows an OPCUAClientPanel in a frame together
# with buttons to load/save the selected variables as CSV and to generate a
# C test program from them.
if __name__ == "__main__":

    import wx.lib.mixins.inspection as wit
    import sys,os

    app = wit.InspectableApp()

    frame = wx.Frame(None, -1, "OPCUA Client Test App", size=(800,600))

    # Server URI comes from the first command line argument, if any
    uri = sys.argv[1] if len(sys.argv)>1 else "opc.tcp://localhost:4840"

    test_panel = wx.Panel(frame)
    test_sizer = wx.FlexGridSizer(cols=1, hgap=0, rows=2, vgap=0)
    test_sizer.AddGrowableCol(0)
    test_sizer.AddGrowableRow(0)

    modeldata = OPCUAClientModel(print)

    opcuatestpanel = OPCUAClientPanel(test_panel, modeldata, print, lambda:uri)

    def OnGenerate(evt):
        """Ask for a file name and write the generated C test program to it."""
        dlg = wx.FileDialog(
            frame, message="Generate file as ...", defaultDir=os.getcwd(),
            defaultFile="",
            wildcard="C (*.c)|*.c", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT
            )

        if dlg.ShowModal() == wx.ID_OK:
            path = dlg.GetPath()
            # Executable name suggested in the build hint: the chosen path
            # without its extension, whatever that extension is
            exe_path = os.path.splitext(path)[0]
            Ccode = """
/*
In case open62541 was built just aside beremiz, you can build this test with:
gcc %s -o %s \\
    -I ../../open62541/plugins/include/ \\
    -I ../../open62541/build/src_generated/ \\
    -I ../../open62541/include/ \\
    -I ../../open62541/arch/ ../../open62541/build/bin/libopen62541.a
*/

"""%(path, exe_path) + modeldata.GenerateC(path, "test", uri) + """

int main(int argc, char *argv[]) {

    __init_test(argc,argv);

    __retrieve_test();

    __publish_test();

    __cleanup_test();

    return EXIT_SUCCESS;
}
"""

            with open(path, 'wb') as Cfile:
                Cfile.write(Ccode)

        dlg.Destroy()

    def OnLoad(evt):
        """Ask for a CSV file, load it and refresh the variable lists."""
        dlg = wx.FileDialog(
            frame, message="Choose a file",
            defaultDir=os.getcwd(),
            defaultFile="",
            wildcard="CSV (*.csv)|*.csv",
            style=wx.FD_OPEN | wx.FD_CHANGE_DIR | wx.FD_FILE_MUST_EXIST )

        if dlg.ShowModal() == wx.ID_OK:
            path = dlg.GetPath()
            modeldata.LoadCSV(path)
            opcuatestpanel.Reset()

        dlg.Destroy()

    def OnSave(evt):
        """Ask for a file name and save the variable lists to it as CSV."""
        dlg = wx.FileDialog(
            frame, message="Save file as ...", defaultDir=os.getcwd(),
            defaultFile="",
            wildcard="CSV (*.csv)|*.csv", style=wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT
            )

        if dlg.ShowModal() == wx.ID_OK:
            path = dlg.GetPath()
            modeldata.SaveCSV(path)

        dlg.Destroy()

    test_sizer.Add(opcuatestpanel, flag=wx.GROW)

    testbt_sizer = wx.BoxSizer(wx.HORIZONTAL)

    loadbt = wx.Button(test_panel, label="Load")
    test_panel.Bind(wx.EVT_BUTTON, OnLoad, loadbt)

    savebt = wx.Button(test_panel, label="Save")
    test_panel.Bind(wx.EVT_BUTTON, OnSave, savebt)

    genbt = wx.Button(test_panel, label="Generate")
    test_panel.Bind(wx.EVT_BUTTON, OnGenerate, genbt)

    testbt_sizer.Add(loadbt, 0, wx.LEFT|wx.RIGHT, 5)
    testbt_sizer.Add(savebt, 0, wx.LEFT|wx.RIGHT, 5)
    testbt_sizer.Add(genbt, 0, wx.LEFT|wx.RIGHT, 5)

    test_sizer.Add(testbt_sizer, flag=wx.GROW)
    test_sizer.Layout()
    test_panel.SetAutoLayout(True)
    test_panel.SetSizer(test_sizer)

    def OnClose(evt):
        """Disconnect from the server before the frame closes."""
        opcuatestpanel.OnClose()
        evt.Skip()

    frame.Bind(wx.EVT_CLOSE, OnClose)

    frame.Show()

    app.MainLoop()