# MQTT: WIP, now connects to broker. Added MQTT 3 support and protocol version selection in IDE.
#
# Generated by erpcgen 1.12.0 on Mon May 20 17:53:37 2024.
#
# AUTOGENERATED - DO NOT EDIT
#
import erpc
from . import common, interface
# Server-side service for BeremizPLCObjectService
class BeremizPLCObjectServiceService(erpc.server.Service):
    """Server-side eRPC service wrapper for ``IBeremizPLCObjectService``.

    Every ``_handle_*`` method follows the same sequence: decode the request
    arguments from the codec, call the same-named method on the user-supplied
    handler, reset the codec, write a reply header, then encode the out
    parameters (if any) followed by the handler's return value as a uint32.

    Out parameters are handed to the handler as ``erpc.Reference`` objects;
    the handler is expected to set their ``value`` attribute. If a value is
    still ``None`` after the call, ``ValueError`` is raised (at that point the
    reply header has already been written to the codec).
    """

    def __init__(self, handler):
        """Register the service id and build the method-id dispatch table.

        Args:
            handler: Object providing the remote functions invoked by the
                ``_handle_*`` methods (``AppendChunkToBlob``, ``GetPLCID``,
                ``StartPLC``, ...).
        """
        iface = interface.IBeremizPLCObjectService
        super(BeremizPLCObjectServiceService, self).__init__(iface.SERVICE_ID)
        self._handler = handler
        # Method id -> bound handler; presumably consulted by the
        # erpc.server.Service base class when dispatching a request.
        self._methods = {
            iface.APPENDCHUNKTOBLOB_ID: self._handle_AppendChunkToBlob,
            iface.GETLOGMESSAGE_ID: self._handle_GetLogMessage,
            iface.GETPLCID_ID: self._handle_GetPLCID,
            iface.GETPLCSTATUS_ID: self._handle_GetPLCstatus,
            iface.GETTRACEVARIABLES_ID: self._handle_GetTraceVariables,
            iface.MATCHMD5_ID: self._handle_MatchMD5,
            iface.NEWPLC_ID: self._handle_NewPLC,
            iface.PURGEBLOBS_ID: self._handle_PurgeBlobs,
            iface.REPAIRPLC_ID: self._handle_RepairPLC,
            iface.RESETLOGCOUNT_ID: self._handle_ResetLogCount,
            iface.SEEDBLOB_ID: self._handle_SeedBlob,
            iface.SETTRACEVARIABLESLIST_ID: self._handle_SetTraceVariablesList,
            iface.STARTPLC_ID: self._handle_StartPLC,
            iface.STOPPLC_ID: self._handle_StopPLC,
        }

    def _begin_reply(self, codec, request_id, sequence):
        """Reset *codec* and write the reply message header for *request_id*.

        Args:
            codec: Codec that held the request and will now carry the reply.
            request_id: Method id of the request being answered.
            sequence: Sequence number of the request being answered.
        """
        codec.reset()
        header = erpc.codec.MessageInfo(
            type=erpc.codec.MessageType.kReplyMessage,
            service=interface.IBeremizPLCObjectService.SERVICE_ID,
            request=request_id,
            sequence=sequence)
        codec.start_write_message(header)

    def _handle_AppendChunkToBlob(self, sequence, codec):
        """Serve AppendChunkToBlob: two binary inputs, one binary out parameter.

        Raises:
            ValueError: If the handler did not set the ``newBlobID`` out value.
        """
        # Holder for the out parameter, filled in by the user handler.
        new_blob_ref = erpc.Reference()
        # Decode arguments in wire order.
        chunk = codec.read_binary()
        blob_id = codec.read_binary()
        result = self._handler.AppendChunkToBlob(chunk, blob_id, new_blob_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.APPENDCHUNKTOBLOB_ID,
            sequence)
        if new_blob_ref.value is None:
            raise ValueError("newBlobID.value is None")
        codec.write_binary(new_blob_ref.value)
        codec.write_uint32(result)

    def _handle_GetLogMessage(self, sequence, codec):
        """Serve GetLogMessage: uint8 level and uint32 message id in, message out.

        The out value serializes itself through its own ``_write(codec)``.

        Raises:
            ValueError: If the handler did not set the ``message`` out value.
        """
        message_ref = erpc.Reference()
        log_level = codec.read_uint8()
        msg_id = codec.read_uint32()
        result = self._handler.GetLogMessage(log_level, msg_id, message_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.GETLOGMESSAGE_ID,
            sequence)
        if message_ref.value is None:
            raise ValueError("message.value is None")
        message_ref.value._write(codec)
        codec.write_uint32(result)

    def _handle_GetPLCID(self, sequence, codec):
        """Serve GetPLCID: no inputs, one self-serializing out parameter.

        Raises:
            ValueError: If the handler did not set the ``plcID`` out value.
        """
        plc_id_ref = erpc.Reference()
        result = self._handler.GetPLCID(plc_id_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.GETPLCID_ID,
            sequence)
        if plc_id_ref.value is None:
            raise ValueError("plcID.value is None")
        plc_id_ref.value._write(codec)
        codec.write_uint32(result)

    def _handle_GetPLCstatus(self, sequence, codec):
        """Serve GetPLCstatus: no inputs, one self-serializing out parameter.

        Raises:
            ValueError: If the handler did not set the ``status`` out value.
        """
        status_ref = erpc.Reference()
        result = self._handler.GetPLCstatus(status_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.GETPLCSTATUS_ID,
            sequence)
        if status_ref.value is None:
            raise ValueError("status.value is None")
        status_ref.value._write(codec)
        codec.write_uint32(result)

    def _handle_GetTraceVariables(self, sequence, codec):
        """Serve GetTraceVariables: uint32 debug token in, traces out.

        Raises:
            ValueError: If the handler did not set the ``traces`` out value.
        """
        traces_ref = erpc.Reference()
        debug_token = codec.read_uint32()
        result = self._handler.GetTraceVariables(debug_token, traces_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.GETTRACEVARIABLES_ID,
            sequence)
        if traces_ref.value is None:
            raise ValueError("traces.value is None")
        traces_ref.value._write(codec)
        codec.write_uint32(result)

    def _handle_MatchMD5(self, sequence, codec):
        """Serve MatchMD5: string in, boolean out parameter.

        Raises:
            ValueError: If the handler did not set the ``match`` out value.
        """
        match_ref = erpc.Reference()
        md5_text = codec.read_string()
        result = self._handler.MatchMD5(md5_text, match_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.MATCHMD5_ID,
            sequence)
        if match_ref.value is None:
            raise ValueError("match.value is None")
        codec.write_bool(match_ref.value)
        codec.write_uint32(result)

    def _handle_NewPLC(self, sequence, codec):
        """Serve NewPLC: string, binary and a list of extra files in, bool out.

        Raises:
            ValueError: If the handler did not set the ``success`` out value.
        """
        success_ref = erpc.Reference()
        md5sum = codec.read_string()
        plc_blob_id = codec.read_binary()
        # The list is length-prefixed; each element decodes itself.
        file_count = codec.start_read_list()
        extra_files = [
            common.extra_file()._read(codec) for _ in range(file_count)
        ]
        result = self._handler.NewPLC(
            md5sum, plc_blob_id, extra_files, success_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.NEWPLC_ID,
            sequence)
        if success_ref.value is None:
            raise ValueError("success.value is None")
        codec.write_bool(success_ref.value)
        codec.write_uint32(result)

    def _handle_PurgeBlobs(self, sequence, codec):
        """Serve PurgeBlobs: no inputs, reply carries only the uint32 result."""
        result = self._handler.PurgeBlobs()
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.PURGEBLOBS_ID,
            sequence)
        codec.write_uint32(result)

    def _handle_RepairPLC(self, sequence, codec):
        """Serve RepairPLC: no inputs, reply carries only the uint32 result."""
        result = self._handler.RepairPLC()
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.REPAIRPLC_ID,
            sequence)
        codec.write_uint32(result)

    def _handle_ResetLogCount(self, sequence, codec):
        """Serve ResetLogCount: no inputs, reply carries only the uint32 result."""
        result = self._handler.ResetLogCount()
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.RESETLOGCOUNT_ID,
            sequence)
        codec.write_uint32(result)

    def _handle_SeedBlob(self, sequence, codec):
        """Serve SeedBlob: binary seed in, binary blob id out.

        Raises:
            ValueError: If the handler did not set the ``blobID`` out value.
        """
        blob_id_ref = erpc.Reference()
        seed_bytes = codec.read_binary()
        result = self._handler.SeedBlob(seed_bytes, blob_id_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.SEEDBLOB_ID,
            sequence)
        if blob_id_ref.value is None:
            raise ValueError("blobID.value is None")
        codec.write_binary(blob_id_ref.value)
        codec.write_uint32(result)

    def _handle_SetTraceVariablesList(self, sequence, codec):
        """Serve SetTraceVariablesList: list of trace orders in, int32 token out.

        Raises:
            ValueError: If the handler did not set the ``debugtoken`` out value.
        """
        token_ref = erpc.Reference()
        # The list is length-prefixed; each element decodes itself.
        order_count = codec.start_read_list()
        trace_orders = [
            common.trace_order()._read(codec) for _ in range(order_count)
        ]
        result = self._handler.SetTraceVariablesList(trace_orders, token_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.SETTRACEVARIABLESLIST_ID,
            sequence)
        if token_ref.value is None:
            raise ValueError("debugtoken.value is None")
        codec.write_int32(token_ref.value)
        codec.write_uint32(result)

    def _handle_StartPLC(self, sequence, codec):
        """Serve StartPLC: no inputs, reply carries only the uint32 result."""
        result = self._handler.StartPLC()
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.STARTPLC_ID,
            sequence)
        codec.write_uint32(result)

    def _handle_StopPLC(self, sequence, codec):
        """Serve StopPLC: no inputs, boolean out parameter.

        Raises:
            ValueError: If the handler did not set the ``success`` out value.
        """
        success_ref = erpc.Reference()
        result = self._handler.StopPLC(success_ref)
        self._begin_reply(
            codec,
            interface.IBeremizPLCObjectService.STOPPLC_ID,
            sequence)
        if success_ref.value is None:
            raise ValueError("success.value is None")
        codec.write_bool(success_ref.value)
        codec.write_uint32(result)