tests/cli_tests/opcua_test.bash
changeset 3820 46f3ca3f0157
parent 3718 7841b651d601
equal deleted inserted replaced
3819:4582f0fcf4c4 3820:46f3ca3f0157
    17 ) << EOF &
    17 ) << EOF &
    18 
    18 
    19 import sys
    19 import sys
    20 import os
    20 import os
    21 import time
    21 import time
       
    22 import asyncio
    22 
    23 
    23 from opcua import ua, Server
    24 from asyncua import Server
    24 
    25 
    25 server = Server()
    26 async def main():
    26 host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
    27     server = Server()
    27 endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
    28     host = os.environ.get("OPCUA_DEFAULT_HOST", "127.0.0.1")
    28 server.set_endpoint(endpoint)
    29     endpoint = "opc.tcp://"+host+":4840/freeopcua/server/"
       
    30     await server.init()
       
    31     server.set_endpoint(endpoint)
    29 
    32 
    30 uri = "http://beremiz.github.io"
    33     uri = "http://beremiz.github.io"
    31 idx = server.register_namespace(uri)
    34     idx = await server.register_namespace(uri)
    32 
    35 
    33 objects = server.get_objects_node()
    36     objects = server.get_objects_node()
    34 
    37 
    35 testobj = objects.add_object(idx, "TestObject")
    38     testobj = await objects.add_object(idx, "TestObject")
    36 testvarout = testobj.add_variable(idx, "TestOut", 1.2)
    39     testvarout = await testobj.add_variable(idx, "TestOut", 1.2)
    37 testvar = testobj.add_variable(idx, "TestIn", 5.6)
    40     testvar = await testobj.add_variable(idx, "TestIn", 5.6)
    38 testvar.set_writable()
    41     await testvar.set_writable()
    39 
    42 
    40 server.start()
    43     await server.start()
       
    44     try:
       
    45         while True:
       
    46             await asyncio.sleep(1)
       
    47             print(await testvar.get_value())
       
    48             sys.stdout.flush()
       
    49     finally:
       
    50         await server.stop()
    41 
    51 
    42 try:
    52 asyncio.run(main())
    43     while True:
    53 
    44         time.sleep(1)
       
    45         print testvar.get_value()
       
    46         sys.stdout.flush()
       
    47 finally:
       
    48     server.stop()
       
    49 EOF
    54 EOF
    50 SERVER_PID=$!
    55 SERVER_PID=$!
    51 
    56 
    52 # Start PLC with opcua test
    57 # Start PLC with opcua test
    53 setsid $BEREMIZPYTHONPATH $BEREMIZPATH/Beremiz_cli.py -k \
    58 setsid $BEREMIZPYTHONPATH $BEREMIZPATH/Beremiz_cli.py -k \