#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of Beremiz
# Copyright (C) 2021: Edouard TISSERANT
#
# See COPYING file for copyrights details.

# Based on Eelco Hoogendoorn stackoverflow answer about RingBuffer with numpy

import numpy as np


class RingBuffer(object):
    """Sliding window over the most recently appended samples.

    The samples live in a single numpy array of ``size + padding`` rows.
    New samples are written after the current window and the window start
    (``counter``) moves forward; when the padding area is exhausted the
    window is copied back to the start of the array (see ``compact``).

    Attributes:
        size: number of rows exposed by ``view``.
        padding: number of rows that can be appended between two compactions;
            also the maximum number of rows kept from a single ``append``.
        buffer: backing array, zero-initialised, of shape ``(size + padding,)``
            or ``(size + padding, width)`` when ``width`` is given.
        counter: rows appended since the last compaction (window start offset).
        full: True once ``size`` samples are stored in the window.
    """

    def __init__(self, width=None, size=65536, padding=None):
        """Allocate the backing array.

        Args:
            width: when truthy, each sample is a row of ``width`` values and
                the buffer is 2-D; otherwise the buffer is 1-D.
            size: length of the window returned by ``view``.
            padding: extra rows available before a compaction is needed;
                defaults to ``size``.
        """
        self.size = size
        self.padding = size if padding is None else padding
        shape = (self.size + self.padding,)
        if width:
            shape += (width,)
        self.buffer = np.zeros(shape)
        self.counter = 0
        self.full = False
        # Number of real (appended) samples currently in the window, capped
        # at ``size``. Kept separately because ``counter`` is reset by
        # ``compact`` and may exceed ``size`` when ``padding > size``.
        self._filled = 0

    def append(self, data):
        """Append samples at the end of the window.

        Copies ``len(data)`` rows, so this is an O(n) operation; a compaction
        is triggered first when the padding area cannot hold them.

        Args:
            data: sliceable sequence of samples (list or array). Only the
                last ``padding`` entries are kept when it is longer.
        """
        data = data[-self.padding:]
        n = len(data)
        if n == 0:
            # Nothing to store; also avoids broadcasting an empty 1-D
            # sequence into an empty 2-D slice.
            return
        if self.remaining < n:
            self.compact()
        start = self.counter + self.size
        self.buffer[start:start + n] = data
        self.counter += n
        self._filled = min(self._filled + n, self.size)
        if self._filled >= self.size:
            self.full = True

    @property
    def count(self):
        """Number of appended samples currently held in ``view`` (<= size)."""
        return self._filled

    @property
    def remaining(self):
        """Rows that can still be appended before a compaction is required."""
        return self.padding - self.counter

    @property
    def view(self):
        """Current window of ``size`` rows, oldest first.

        This is a slice of the backing array (no copy), so it is always an
        O(1) operation. Rows not yet written are zero and sit at the start
        of the window. Fetch it again after each ``append``: the window
        offset moves and a compaction overwrites the underlying memory.
        """
        return self.buffer[self.counter:self.counter + self.size]

    def compact(self):
        """Move the current window back to the start of the backing array.

        note: only when this function is called, is an O(size) performance
        hit incurred, and this cost is amortized over the whole padding space
        """
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print('compacting')
        self.buffer[:self.size] = self.view
        self.counter = 0