# Source code for amari.cache

import asyncio
import json
import time
from collections import OrderedDict
from typing import Any, Optional, Tuple


class CacheEntry:
    """A single cached value together with its bookkeeping fields.

    Attributes
    ----------
    data: Any
        The cached payload.
    timestamp: float
        Time value recorded for the entry when it was created.
    size: int
        Size accounted to the entry.
    """

    # Fixed attribute set: instances carry no per-instance ``__dict__``.
    __slots__ = ("data", "timestamp", "size")

    def __init__(self, data: Any, timestamp: float, size: int):
        self.data, self.timestamp, self.size = data, timestamp, size


class Cache:
    """
    A simple LRU cache with TTL and size limit.

    Entries are stored in an ``OrderedDict`` ordered from least to most
    recently used. Expired entries are purged on every ``get`` and ``set``;
    when the accounted size exceeds ``maxbytes`` the least recently used
    entries are evicted first. Public operations are serialized through an
    ``asyncio.Lock``.

    Attributes
    ----------
    ttl: int
        Time to live for cache entries, in seconds.
    maxbytes: int
        Maximum total size of cached data in bytes.
    cache: OrderedDict
        Mapping of key tuple to ``CacheEntry``, oldest use first.
    total_size: int
        Sum of the ``size`` of every entry currently in ``cache``.
    lock: asyncio.Lock
        Lock held for the duration of ``get`` and ``set``.
    """

    def __init__(self, ttl: int, maxbytes: int = 25 * 1024 * 1024):  # 25 MiB
        """
        Create an empty cache.

        Parameters
        ----------
        ttl: int
            Time to live for cache entries, in seconds.
        maxbytes: int
            Maximum total size of cached data in bytes. Defaults to 25 MiB.
        """
        self.ttl = ttl
        self.maxbytes = maxbytes
        self.cache: OrderedDict[Tuple, CacheEntry] = OrderedDict()
        self.total_size = 0
        self.lock = asyncio.Lock()

    async def get(self, key: Tuple) -> Optional[Any]:
        """
        Return the data cached under ``key``, or ``None`` if absent or expired.

        A successful lookup marks the entry as most recently used.
        """
        async with self.lock:
            self._remove_expired_entries()
            entry = self.cache.get(key)
            if entry is None:
                return None
            # Re-check the age: the purge above used an earlier clock reading.
            if time.time() - entry.timestamp >= self.ttl:
                self._remove_entry(key)
                return None
            self.cache.move_to_end(key)
            return entry.data

    async def set(self, key: Tuple, data: Any):
        """
        Store ``data`` under ``key``, replacing any existing entry.

        The entry's size is the length in bytes of the UTF-8 encoded JSON
        serialization of ``data``; ``json.dumps`` raises ``TypeError`` if
        ``data`` is not JSON serializable, in which case the cache is left
        unchanged.
        """
        async with self.lock:
            size = len(json.dumps(data).encode("utf-8"))
            # Drop any previous entry for this key first so its size is
            # subtracted from total_size; otherwise every overwrite would
            # inflate the accounted size and trigger premature evictions.
            self._remove_entry(key)
            # A freshly inserted key lands at the most-recently-used end.
            self.cache[key] = CacheEntry(data, time.time(), size)
            self.total_size += size
            self._remove_expired_entries()
            await self._enforce_size_limit()

    def _remove_entry(self, key: Tuple):
        """Remove ``key`` if present and subtract its size from the total."""
        entry = self.cache.pop(key, None)
        if entry is not None:
            self.total_size -= entry.size

    def _remove_expired_entries(self):
        """Remove every entry whose age is at least ``ttl`` seconds."""
        current_time = time.time()
        # Collect first: the dict must not change size while being iterated.
        keys_to_remove = [
            key
            for key, entry in self.cache.items()
            if current_time - entry.timestamp >= self.ttl
        ]
        for key in keys_to_remove:
            self._remove_entry(key)

    async def _enforce_size_limit(self):
        """Evict least recently used entries until the size limit is met."""
        # Guard on a non-empty cache so popitem() can never raise KeyError,
        # even if total_size was modified from outside.
        while self.cache and self.total_size > self.maxbytes:
            _, entry = self.cache.popitem(last=False)
            self.total_size -= entry.size
            # Yield to the event loop between evictions.
            await asyncio.sleep(0)