controls/DebugVariablePanel/RingBuffer.py
author Edouard Tisserant <edouard@beremiz.fr>
Mon, 30 Sep 2024 16:21:11 +0200
changeset 4023 b344393859df
parent 3916 6ca1adad3f0e
permissions -rw-r--r--
MQTT: Add status global variable MQTT_STATUS_n, one per MQTT client
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of Beremiz
# Copyright (C) 2021: Edouard TISSERANT
#
# See COPYING file for copyrights details.

# Based on Eelco Hoogendoorn's Stack Overflow answer about a ring buffer with NumPy

import numpy as np


class RingBuffer(object):
    """
    Sliding window over the most recently appended samples, stored in a
    NumPy array.

    The backing array has ``size + padding`` rows. Samples are written one
    after another; when the free rows run out, the newest samples are moved
    back to the start of the array (see ``compact``). ``view`` exposes at
    most ``size`` of the latest samples as a slice of the backing array.
    """

    def __init__(self, width=None, size=131072, padding=None):
        """
        width: number of columns per sample; when None (or 0) the buffer
               is one-dimensional
        size: maximum number of samples exposed by ``view``
        padding: extra rows available before a compaction is needed,
                 defaults to ``size``
        """
        self.size = size
        self.padding = size if padding is None else padding
        shape = (self.size + self.padding,)
        if width:
            shape += (width,)
        self.buffer = np.zeros(shape)
        # index of the next free row in self.buffer
        self.cursor = 0

    def append(self, data):
        """
        Append a sequence of samples; only the last ``size`` of them are kept.

        this is an O(n) operation, except when a compaction is triggered
        """
        # Samples older than the last `size` ones could never be visible.
        n = min(len(data), self.size)
        if n == 0:
            return
        data = data[len(data) - n:]

        capacity = self.size + self.padding
        if capacity - self.cursor < n:
            self.compact()
            if capacity - self.cursor < n:
                # Padding is smaller than this chunk: compaction alone did
                # not free enough rows. Keep only the samples that will
                # still be inside the window once `data` is written.
                keep = self.size - n
                self.buffer[:keep] = self.buffer[self.cursor - keep:self.cursor]
                self.cursor = keep

        self.buffer[self.cursor:self.cursor + n] = data
        self.cursor += n

    @property
    def count(self):
        """Number of samples currently visible through ``view``."""
        return min(self.size, self.cursor)

    @property
    def view(self):
        """
        Latest ``count`` samples, oldest first, as a slice of the buffer.

        this is always an O(1) operation
        """
        return self.buffer[self.cursor - self.count:self.cursor]

    def compact(self):
        """
        Move the visible samples to the start of the buffer, freeing the
        rows behind them.

        note: only when this function is called, is an O(size) performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        count = self.count
        start = self.cursor - count
        if start > 0:
            # Source and destination may overlap; the destination always
            # starts before the source.
            self.buffer[:count] = self.buffer[start:self.cursor]
        # The visible samples now occupy rows [0, count), so the next free
        # row is `count` (not `cursor - size`, which loses samples whenever
        # the buffer was not exactly full).
        self.cursor = count