erpc_interface/erpc_PLCObject/server.py
author Edouard Tisserant <edouard@beremiz.fr>
Fri, 12 Jul 2024 11:24:06 +0200
changeset 3986 98bd0bb33ce4
parent 3946 8815b44be31e
child 4032 1ffff67678ad
permissions -rw-r--r--
MQTT: WIP, now connects to broker. Added MQTT 3 support and protocol version selection in IDE.
#
# Generated by erpcgen 1.12.0 on Mon May 20 17:53:37 2024.
#
# AUTOGENERATED - DO NOT EDIT
#

import erpc
from . import common, interface

# Client for BeremizPLCObjectService
class BeremizPLCObjectServiceService(erpc.server.Service):
    def __init__(self, handler):
        super(BeremizPLCObjectServiceService, self).__init__(interface.IBeremizPLCObjectService.SERVICE_ID)
        self._handler = handler
        self._methods = {
                interface.IBeremizPLCObjectService.APPENDCHUNKTOBLOB_ID: self._handle_AppendChunkToBlob,
                interface.IBeremizPLCObjectService.GETLOGMESSAGE_ID: self._handle_GetLogMessage,
                interface.IBeremizPLCObjectService.GETPLCID_ID: self._handle_GetPLCID,
                interface.IBeremizPLCObjectService.GETPLCSTATUS_ID: self._handle_GetPLCstatus,
                interface.IBeremizPLCObjectService.GETTRACEVARIABLES_ID: self._handle_GetTraceVariables,
                interface.IBeremizPLCObjectService.MATCHMD5_ID: self._handle_MatchMD5,
                interface.IBeremizPLCObjectService.NEWPLC_ID: self._handle_NewPLC,
                interface.IBeremizPLCObjectService.PURGEBLOBS_ID: self._handle_PurgeBlobs,
                interface.IBeremizPLCObjectService.REPAIRPLC_ID: self._handle_RepairPLC,
                interface.IBeremizPLCObjectService.RESETLOGCOUNT_ID: self._handle_ResetLogCount,
                interface.IBeremizPLCObjectService.SEEDBLOB_ID: self._handle_SeedBlob,
                interface.IBeremizPLCObjectService.SETTRACEVARIABLESLIST_ID: self._handle_SetTraceVariablesList,
                interface.IBeremizPLCObjectService.STARTPLC_ID: self._handle_StartPLC,
                interface.IBeremizPLCObjectService.STOPPLC_ID: self._handle_StopPLC,
            }

    def _handle_AppendChunkToBlob(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        newBlobID = erpc.Reference()

        # Read incoming parameters.
        data = codec.read_binary()
        blobID = codec.read_binary()

        # Invoke user implementation of remote function.
        _result = self._handler.AppendChunkToBlob(data, blobID, newBlobID)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.APPENDCHUNKTOBLOB_ID,
            sequence=sequence))
        if newBlobID.value is None:
            raise ValueError("newBlobID.value is None")
        codec.write_binary(newBlobID.value)
        codec.write_uint32(_result)

    def _handle_GetLogMessage(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        message = erpc.Reference()

        # Read incoming parameters.
        level = codec.read_uint8()
        msgID = codec.read_uint32()

        # Invoke user implementation of remote function.
        _result = self._handler.GetLogMessage(level, msgID, message)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.GETLOGMESSAGE_ID,
            sequence=sequence))
        if message.value is None:
            raise ValueError("message.value is None")
        message.value._write(codec)
        codec.write_uint32(_result)

    def _handle_GetPLCID(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        plcID = erpc.Reference()

        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.GetPLCID(plcID)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.GETPLCID_ID,
            sequence=sequence))
        if plcID.value is None:
            raise ValueError("plcID.value is None")
        plcID.value._write(codec)
        codec.write_uint32(_result)

    def _handle_GetPLCstatus(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        status = erpc.Reference()

        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.GetPLCstatus(status)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.GETPLCSTATUS_ID,
            sequence=sequence))
        if status.value is None:
            raise ValueError("status.value is None")
        status.value._write(codec)
        codec.write_uint32(_result)

    def _handle_GetTraceVariables(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        traces = erpc.Reference()

        # Read incoming parameters.
        debugToken = codec.read_uint32()

        # Invoke user implementation of remote function.
        _result = self._handler.GetTraceVariables(debugToken, traces)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.GETTRACEVARIABLES_ID,
            sequence=sequence))
        if traces.value is None:
            raise ValueError("traces.value is None")
        traces.value._write(codec)
        codec.write_uint32(_result)

    def _handle_MatchMD5(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        match = erpc.Reference()

        # Read incoming parameters.
        MD5 = codec.read_string()

        # Invoke user implementation of remote function.
        _result = self._handler.MatchMD5(MD5, match)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.MATCHMD5_ID,
            sequence=sequence))
        if match.value is None:
            raise ValueError("match.value is None")
        codec.write_bool(match.value)
        codec.write_uint32(_result)

    def _handle_NewPLC(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        success = erpc.Reference()

        # Read incoming parameters.
        md5sum = codec.read_string()
        plcObjectBlobID = codec.read_binary()
        _n0 = codec.start_read_list()
        extrafiles = []
        for _i0 in range(_n0):
            _v0 = common.extra_file()._read(codec)
            extrafiles.append(_v0)


        # Invoke user implementation of remote function.
        _result = self._handler.NewPLC(md5sum, plcObjectBlobID, extrafiles, success)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.NEWPLC_ID,
            sequence=sequence))
        if success.value is None:
            raise ValueError("success.value is None")
        codec.write_bool(success.value)
        codec.write_uint32(_result)

    def _handle_PurgeBlobs(self, sequence, codec):
        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.PurgeBlobs()

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.PURGEBLOBS_ID,
            sequence=sequence))
        codec.write_uint32(_result)

    def _handle_RepairPLC(self, sequence, codec):
        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.RepairPLC()

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.REPAIRPLC_ID,
            sequence=sequence))
        codec.write_uint32(_result)

    def _handle_ResetLogCount(self, sequence, codec):
        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.ResetLogCount()

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.RESETLOGCOUNT_ID,
            sequence=sequence))
        codec.write_uint32(_result)

    def _handle_SeedBlob(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        blobID = erpc.Reference()

        # Read incoming parameters.
        seed = codec.read_binary()

        # Invoke user implementation of remote function.
        _result = self._handler.SeedBlob(seed, blobID)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.SEEDBLOB_ID,
            sequence=sequence))
        if blobID.value is None:
            raise ValueError("blobID.value is None")
        codec.write_binary(blobID.value)
        codec.write_uint32(_result)

    def _handle_SetTraceVariablesList(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        debugtoken = erpc.Reference()

        # Read incoming parameters.
        _n0 = codec.start_read_list()
        orders = []
        for _i0 in range(_n0):
            _v0 = common.trace_order()._read(codec)
            orders.append(_v0)


        # Invoke user implementation of remote function.
        _result = self._handler.SetTraceVariablesList(orders, debugtoken)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.SETTRACEVARIABLESLIST_ID,
            sequence=sequence))
        if debugtoken.value is None:
            raise ValueError("debugtoken.value is None")
        codec.write_int32(debugtoken.value)
        codec.write_uint32(_result)

    def _handle_StartPLC(self, sequence, codec):
        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.StartPLC()

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.STARTPLC_ID,
            sequence=sequence))
        codec.write_uint32(_result)

    def _handle_StopPLC(self, sequence, codec):
        # Create reference objects to pass into handler for out/inout parameters.
        success = erpc.Reference()

        # Read incoming parameters.

        # Invoke user implementation of remote function.
        _result = self._handler.StopPLC(success)

        # Prepare codec for reply message.
        codec.reset()

        # Construct reply message.
        codec.start_write_message(erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=interface.IBeremizPLCObjectService.STOPPLC_ID,
            sequence=sequence))
        if success.value is None:
            raise ValueError("success.value is None")
        codec.write_bool(success.value)
        codec.write_uint32(_result)