Source code for toast.cache

# Copyright (c) 2015-2020 by the parties listed in the AUTHORS file.
# All rights reserved.  Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.

import re
import ctypes
import numpy as np

from .utils import (
    Logger,
    AlignedI8,
    AlignedU8,
    AlignedI16,
    AlignedU16,
    AlignedI32,
    AlignedU32,
    AlignedI64,
    AlignedU64,
    AlignedF32,
    AlignedF64,
)


[docs]class Cache(object): """Data cache with explicit memory management. This class acts as a dictionary of named arrays. Each array may be multi-dimensional. Args: pymem (bool): if True, use python memory rather than external allocations in C. Only used for testing. """ def __init__(self, pymem=False): self._pymem = pymem self._buffers = dict() self._dtypes = dict() self._shapes = dict() self._aliases = dict() def __getitem__(self, key): return self.reference(key) def __setitem__(self, key, value): self.put(key, value, replace=True) def __delitem__(self, key): self.destroy(key) def __contains__(self, key): return self.exists(key) def __len__(self): return len(self.keys()) def __iter__(self): class CacheIterator: def __init__(self, cache): self.cache = cache self.keys = cache.keys() def __iter__(self): return self def __next__(self): if len(self.keys) == 0: raise StopIteration key = self.keys.pop() return self.cache[key] return CacheIterator(self)
[docs] def clear(self, pattern=None): """Clear one or more buffers. Args: pattern (str): a regular expression to match against the buffer names when determining what should be cleared. If None, then all buffers are cleared. Returns: None """ if pattern is None: # free all buffers self._aliases.clear() self._buffers.clear() self._dtypes.clear() self._shapes.clear() else: pat = re.compile(pattern) names = list(self._buffers.keys()) matching = list() for n in names: mat = pat.match(n) if mat is not None: matching.append(n) for n in matching: self.destroy(n) return
[docs] def create(self, name, type, shape): """Create a named data buffer of the given type and shape. Args: name (str): the name to assign to the buffer. type (numpy.dtype): one of the supported numpy types. shape (tuple): a tuple containing the shape of the buffer. Returns: (array): a reference to the allocated array. """ log = Logger.get() if name is None: raise ValueError("Cache name cannot be None") if type is None: raise ValueError("Cache type cannot be None") if shape is None: raise ValueError("Cache shape cannot be None") if self.exists(name): raise RuntimeError("Data buffer or alias {} already exists".format(name)) ttype = np.dtype(type) flatshape = 1 for dim in shape: flatshape *= dim if self._pymem: self._buffers[name] = np.zeros(flatshape, dtype=ttype) else: if ttype.char == "b": self._buffers[name] = AlignedI8.zeros(flatshape) elif ttype.char == "B": self._buffers[name] = AlignedU8.zeros(flatshape) elif ttype.char == "h": self._buffers[name] = AlignedI16.zeros(flatshape) elif ttype.char == "H": self._buffers[name] = AlignedU16.zeros(flatshape) elif ttype.char == "i": self._buffers[name] = AlignedI32.zeros(flatshape) elif ttype.char == "I": self._buffers[name] = AlignedU32.zeros(flatshape) elif (ttype.char == "q") or (ttype.char == "l"): self._buffers[name] = AlignedI64.zeros(flatshape) elif (ttype.char == "Q") or (ttype.char == "L"): self._buffers[name] = AlignedU64.zeros(flatshape) elif ttype.char == "f": self._buffers[name] = AlignedF32.zeros(flatshape) elif ttype.char == "d": self._buffers[name] = AlignedF64.zeros(flatshape) else: msg = "Unsupported data typecode '{}'".format(ttype.char) log.error(msg) raise ValueError(msg) self._dtypes[name] = ttype self._shapes[name] = shape if self._pymem: return self._buffers[name].reshape(self._shapes[name]) else: return self._buffers[name].array().reshape(self._shapes[name]) return self._buffers[name]
[docs] def put(self, name, data, replace=False): """Create a named data buffer to hold the provided data. If replace is True, existing buffer of the same name is first destroyed. If replace is True and the name is an alias, it is promoted to a new data buffer. Args: name (str): the name to assign to the buffer. data (numpy.ndarray): Numpy array replace (bool): Overwrite any existing keys Returns: (array): a numpy array wrapping the raw data buffer. """ if name is None: raise ValueError("Cache name cannot be None") indata = data if self.exists(name): # This buffer already exists. Is the input data buffer actually # the same memory as the buffer already stored? If so, just # return a new reference. realname = name if name in self._aliases: realname = self._aliases[name] addr = None if self._pymem: p_ref = self._buffers[realname].ctypes.data_as(ctypes.c_void_p).value else: p_ref = self._buffers[realname].address() p_data = data.ctypes.data_as(ctypes.c_void_p).value # print("p_ref = {}, p_data = {}".format(p_ref, p_data), flush=True) if ( (p_ref == p_data) and (self._shapes[realname] == data.shape) and (self._dtypes[realname] == data.dtype) ): return self.reference(realname) if not replace: raise RuntimeError( "Cache buffer named {} already exists " "and replace is False.".format(name) ) # At this point we have an existing memory buffer or alias with # the same name, and which is not identical to the input. If this # is an alias, just delete it. if name in self._aliases: del self._aliases[name] else: # This existing data is not an alias. However, the input # might be a view into this existing memory. Before deleting # the existing data, we copy the input just in case. indata = np.array(data) self.destroy(name) # Now create the new buffer and copy in the data. ref = self.create(name, indata.dtype, indata.shape) np.copyto(ref, indata) return ref
[docs] def add_alias(self, alias, name): """Add an alias to a name that already exists in the cache. Args: alias (str): alias to create name (str): an existing key in the cache Returns: None """ if alias is None or name is None: raise ValueError("Cache name or alias cannot be None") names = list(self._buffers.keys()) if name not in names: raise RuntimeError( "Data buffer {} does not exist for alias {}".format(name, alias) ) if alias in names: raise RuntimeError( "Proposed alias {} would shadow existing buffer.".format(alias) ) self._aliases[alias] = name return
[docs] def destroy(self, name): """Deallocate the specified buffer. Only call this if all numpy arrays that reference the memory are out of use. If the specified name is an alias, then the alias is simply deleted. If the specified name is an actual buffer, then all aliases pointing to that buffer are also deleted. Args: name (str): the name of the buffer or alias to destroy. Returns: None """ if name in self._aliases.keys(): # Name is an alias. Do not remove the buffer del self._aliases[name] return names = list(self._buffers.keys()) if name not in names: raise KeyError("Data buffer {} does not exist".format(name)) # Remove aliases to the buffer aliases_to_remove = [] for key, value in self._aliases.items(): if value == name: aliases_to_remove.append(key) for key in aliases_to_remove: del self._aliases[key] # Forcibly resize this buffer to length zero if not self._pymem: self._buffers[name].clear() # Remove actual buffer del self._buffers[name] del self._dtypes[name] del self._shapes[name] return
[docs] def exists(self, name): """Check whether a buffer exists. Args: name (str): the name of the buffer to search for. Returns: (bool): True if a buffer or alias exists with the given name. """ if name in self._aliases: # We have an alias with this name, so it exists. return True names = list(self._buffers.keys()) if name in names: return True return False
[docs] def reference(self, name): """Return a numpy array pointing to the buffer. The returned array will wrap a pointer to the raw buffer, but will not claim ownership. When the numpy array is garbage collected, it will NOT attempt to free the memory (you must manually use the destroy method). Args: name (str): the name of the buffer to return. Returns: (array): a numpy array wrapping the raw data buffer. """ # First check that it exists if not self.exists(name): raise KeyError("Data buffer (nor alias) {} does not exist".format(name)) realname = name if name in self._aliases: # This is an alias realname = self._aliases[name] if self._pymem: return self._buffers[realname].reshape(self._shapes[realname]) else: return self._buffers[realname].array().reshape(self._shapes[realname])
[docs] def keys(self): """Return a list of all the keys in the cache. Returns: (list): List of key strings. """ return sorted(list(self._buffers.keys()))
[docs] def aliases(self): """Return a dictionary of all the aliases to keys in the cache. Returns: (dict): Dictionary of aliases. """ return self._aliases.copy()
[docs] def report(self, silent=False): """Report memory usage. Args: silent (bool): Count and return the memory without printing. Returns: (int): Amount of allocated memory in bytes """ log = Logger.get() if not silent: log.info("Cache memory usage:") tot = 0 for key in self.keys(): ref = self.reference(key) sz = ref.nbytes del ref tot += sz if not silent: log.info(" - {:25} {:5.2f} MB".format(key, sz / 2 ** 20)) if not silent: log.info(" {:27} {:5.2f} MB".format("TOTAL", tot / 2 ** 20)) return tot