# Source code for toast.tod.tod_math

# Copyright (c) 2015-2020 by the parties listed in the AUTHORS file.
# All rights reserved.  Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.

import numpy as np

import scipy.interpolate as si
from scipy.signal import fftconvolve

from ..op import Operator

from ..timing import function_timer

from ..utils import Logger, AlignedF64

from .. import rng as rng

from .._libtoast import tod_sim_noise_timestream


class OpCacheInit(Operator):
    """This operator initializes cache objects with the specified value"""

    def __init__(self, init_val=0, name=None):
        """
        Args:
            init_val (float) :  initial value to use
            name (str) :  cache prefix to use, can be None
        """
        # Call the parent class constructor, matching the other
        # operators in this module (e.g. OpCacheCopy, OpCacheClear).
        super().__init__()
        self.init_val = init_val
        self.name = name

    @function_timer
    def exec(self, data):
        """Create cache objects as <self.name>_<detector> and initialize
        them to self.init_val

        Args:
            data (toast.Data): The distributed data.
        """
        for obs in data.obs:
            tod = obs["tod"]
            offset, nsamp = tod.local_samples
            for det in tod.local_dets:
                try:
                    # Initialize an existing cache object
                    tod.local_signal(det, self.name)[:] = self.init_val
                except Exception:
                    # Object does not exist yet, create it.  Catch
                    # Exception rather than a bare "except:" so that
                    # KeyboardInterrupt / SystemExit still propagate.
                    cachename = "{}_{}".format(self.name, det)
                    tod.cache.put(
                        cachename,
                        np.full(nsamp, self.init_val, dtype=np.float64),
                        replace=True,
                    )
        return


class OpFlagsApply(Operator):
    """This operator sets the flagged signal values to zero"""

    def __init__(
        self, name=None, common_flags=None, flags=None, common_flag_mask=1, flag_mask=1
    ):
        """
        Args:
            name (str) : cache prefix to apply the flags
            common_flags (str) : cache_name to use for common flags
            flags (str) : cache prefix to use for detector flags
            common_flag_mask (uint8) : bit pattern to check the common
                flags against
            flag_mask (uint8) : bit pattern to check the detector flags
                against
        """
        # Call the parent class constructor, matching the other
        # operators in this module.
        super().__init__()
        self.name = name
        self.common_flags = common_flags
        self.flags = flags
        self.common_flag_mask = common_flag_mask
        self.flag_mask = flag_mask

    @function_timer
    def exec(self, data):
        """Zero out all flagged samples of the cached signal.

        Args:
            data (toast.Data): The distributed data.
        """
        for obs in data.obs:
            tod = obs["tod"]
            # Boolean mask of samples flagged for every detector
            common_flags = tod.local_common_flags(self.common_flags)
            common_flags = (common_flags & self.common_flag_mask) != 0
            for det in tod.local_dets:
                flags = tod.local_flags(det, self.flags)
                flags = (flags & self.flag_mask) != 0
                # A sample failing either the detector or the common
                # flag check is zeroed.
                flags[common_flags] = True
                signal = tod.local_signal(det, self.name)
                signal[flags] = 0
        return
@function_timer
def calibrate(toitimes, toi, gaintimes, gains, order=0, inplace=False):
    """Interpolate the gains to TOI samples and apply them.

    Args:
        toitimes (float):  Increasing TOI sample times in same units as
            gaintimes
        toi (float):  TOI samples to calibrate
        gaintimes (float):  Increasing timestamps of the gain values in
            same units as toitimes
        gains (float):  Multiplicative gains
        order (int):  Gain interpolation order. 0 means steps at the gain
            times, all other are polynomial interpolations.
        inplace (bool):  Overwrite input TOI.

    Returns:
        calibrated timestream.

    """
    if len(gaintimes) == 1:
        g = gains
    else:
        if order == 0:
            # Step function: each sample gets the most recent gain value.
            ind = np.searchsorted(gaintimes, toitimes, side="right") - 1
            g = gains[ind]
        else:
            # Polynomial interpolation; clamp the order to the number of
            # available gain values minus one.
            if len(gaintimes) <= order:
                order = len(gaintimes) - 1
            p = np.polyfit(gaintimes, gains, order)
            g = np.polyval(p, toitimes)

    if inplace:
        toi_out = toi
    else:
        toi_out = np.zeros_like(toi)

    toi_out[:] = toi * g

    return toi_out


@function_timer
def sim_noise_timestream(
    realization,
    telescope,
    component,
    obsindx,
    detindx,
    rate,
    firstsamp,
    samples,
    oversample,
    freq,
    psd,
    py=False,
):
    """Generate a noise timestream, given a starting RNG state.

    Use the RNG parameters to generate unit-variance Gaussian samples
    and then modify the Fourier domain amplitudes to match the desired
    PSD.

    The RNG (Threefry2x64 from Random123) takes a "key" and a "counter"
    which each consist of two unsigned 64bit integers.  These four
    numbers together uniquely identify a single sample.  We construct
    those four numbers in the following way:

    key1 = realization * 2^32 + telescope * 2^16 + component
    key2 = obsindx * 2^32 + detindx
    counter1 = currently unused (0)
    counter2 = sample in stream

    counter2 is incremented internally by the RNG function as it calls
    the underlying Random123 library for each sample.

    Args:
        realization (int): the Monte Carlo realization.
        telescope (int): a unique index assigned to a telescope.
        component (int): a number representing the type of timestream
            we are generating (detector noise, common mode noise,
            atmosphere, etc).
        obsindx (int): the global index of this observation.
        detindx (int): the global index of this detector.
        rate (float): the sample rate.
        firstsamp (int): the start sample in the stream.
        samples (int): the number of samples to generate.
        oversample (int): the factor by which to expand the FFT length
            beyond the number of samples.
        freq (array): the frequency points of the PSD.
        psd (array): the PSD values.
        py (bool): if True, use a pure-python implementation.  This is
            useful for testing.  If True, also return the interpolated
            PSD.

    Returns:
        (array):  the noise timestream.  If py=True, returns a tuple of
            timestream, interpolated frequencies, and interpolated PSD.

    """
    tdata = None
    if py:
        # Use the next power of two at least "oversample" times larger
        # than the requested number of samples.
        fftlen = 2
        while fftlen <= (oversample * samples):
            fftlen *= 2
        npsd = fftlen // 2 + 1
        norm = rate * float(npsd - 1)

        interp_freq = np.fft.rfftfreq(fftlen, 1 / rate)
        if interp_freq.size != npsd:
            raise RuntimeError(
                "interpolated PSD frequencies do not have expected length"
            )

        # Ensure that the input frequency range includes all the
        # frequencies we need.  Otherwise the extrapolation is not
        # well defined.

        if np.amin(freq) < 0.0:
            raise RuntimeError("input PSD frequencies should be >= zero")

        if np.amin(psd) < 0.0:
            raise RuntimeError("input PSD values should be >= zero")

        increment = rate / fftlen

        if freq[0] > increment:
            raise RuntimeError(
                "input PSD does not go to low enough frequency to "
                "allow for interpolation"
            )

        nyquist = rate / 2
        if np.abs((freq[-1] - nyquist) / nyquist) > 0.01:
            raise RuntimeError(
                "last frequency element does not match Nyquist "
                "frequency for given sample rate: {} != {}".format(freq[-1], nyquist)
            )

        # Perform a logarithmic interpolation.  In order to avoid zero
        # values, we shift the PSD by a fixed amount in frequency and
        # amplitude.

        psdshift = 0.01 * np.amin(psd[(psd > 0.0)])
        freqshift = increment

        loginterp_freq = np.log10(interp_freq + freqshift)
        logfreq = np.log10(freq + freqshift)
        logpsd = np.log10(psd + psdshift)

        interp = si.interp1d(logfreq, logpsd, kind="linear", fill_value="extrapolate")

        loginterp_psd = interp(loginterp_freq)
        interp_psd = np.power(10.0, loginterp_psd) - psdshift

        # Zero out DC value
        interp_psd[0] = 0.0

        scale = np.sqrt(interp_psd * norm)

        # gaussian Re/Im randoms, packed into a complex valued array

        key1 = realization * 4294967296 + telescope * 65536 + component
        key2 = obsindx * 4294967296 + detindx
        counter1 = 0
        counter2 = firstsamp * oversample

        rngdata = rng.random(
            fftlen, sampler="gaussian", key=(key1, key2), counter=(counter1, counter2)
        ).array()

        # NOTE: np.complex was deprecated in NumPy 1.20 and removed in
        # 1.24; use the explicit complex128 dtype instead.
        fdata = np.zeros(npsd, dtype=np.complex128)

        # Set the DC and Nyquist frequency imaginary part to zero
        fdata[0] = rngdata[0] + 0.0j
        fdata[-1] = rngdata[npsd - 1] + 0.0j

        # Repack the other values.
        fdata[1:-1] = rngdata[1 : npsd - 1] + 1j * rngdata[-1 : npsd - 1 : -1]

        # scale by PSD
        fdata *= scale

        # inverse FFT
        tdata = np.fft.irfft(fdata)

        # subtract the DC level- for just the samples that we are returning
        offset = (fftlen - samples) // 2

        DC = np.mean(tdata[offset : offset + samples])
        tdata[offset : offset + samples] -= DC
        return (tdata[offset : offset + samples], interp_freq, interp_psd)
    else:
        tdata = AlignedF64(samples)
        tod_sim_noise_timestream(
            realization,
            telescope,
            component,
            obsindx,
            detindx,
            rate,
            firstsamp,
            oversample,
            freq.astype(np.float64),
            psd.astype(np.float64),
            tdata,
        )
        return tdata.array()


class OpCacheCopy(Operator):
    """Operator which copies sets of timestreams between cache locations.

    This simply copies data from one set of per-detector cache objects to
    another set.  At some point we will likely move away from persistent
    caching of intermediate timestreams and this operator will become
    irrelevant.

    Args:
        input (str): use cache objects with name <input>_<detector>.
        output (str): copy data to the cache with name <output>_<detector>.
            If the named cache objects do not exist, then they are created.
        force (bool): force creating the target cache object.
    """

    def __init__(self, input, output, force=False):
        # Call the parent class constructor.
        super().__init__()
        self._in = input
        self._out = output
        self._force = force

    @function_timer
    def exec(self, data):
        """Copy timestreams.

        This iterates over all observations and detectors and copies cache
        objects whose names match the specified pattern.

        Args:
            data (toast.Data): The distributed data.
        """
        for obs in data.obs:
            tod = obs["tod"]
            for det in tod.local_dets:
                inref = tod.local_signal(det, self._in)
                outname = "{}_{}".format(self._out, det)
                outref = tod.cache.put(outname, inref, replace=self._force)
                del outref
                del inref
        return


class OpCacheClear(Operator):
    """Operator which destroys cache objects matching the given pattern.

    Args:
        name (str): use cache objects with name <name>_<detector>.
    """

    def __init__(self, name):
        # Call the parent class constructor.
        super().__init__()
        self._name = name

    @function_timer
    def exec(self, data):
        """Clear timestreams.

        This iterates over all observations and detectors and clears cache
        objects whose names match the specified pattern.

        Args:
            data (toast.Data): The distributed data.
        """
        for obs in data.obs:
            tod = obs["tod"]
            for det in tod.local_dets:
                # if the cache object exists, destroy it
                name = "{}_{}".format(self._name, det)
                if tod.cache.exists(name):
                    tod.cache.destroy(name)
        return


@function_timer
def flagged_running_average(
    signal, flag, wkernel, return_flags=False, downsample=False
):
    """Compute a running average considering only the unflagged samples.

    Args:
        signal (float)
        flag (bool)
        wkernel (int):  Running average width
        return_flags (bool):  If true, also return flags which are
            a subset of the input flags.
        downsample (bool):  If True, return a downsampled version of the
            filtered timestream.

    Returns:
        (array or tuple):  The filtered signal and optionally the flags.

    """
    if len(signal) != len(flag):
        raise Exception("Signal and flag lengths do not match.")

    bad = flag != 0
    # Zero out the flagged samples so they do not contribute to the sums.
    masked_signal = signal.copy()
    masked_signal[bad] = 0

    # "good" counts the number of unflagged samples within each window.
    good = np.ones(len(signal), dtype=np.float64)
    good[bad] = 0

    kernel = np.ones(wkernel, dtype=np.float64)

    filtered_signal = fftconvolve(masked_signal, kernel, mode="same")
    filtered_hits = fftconvolve(good, kernel, mode="same")

    # The 0.1 threshold guards against FFT round-off: a window with no
    # unflagged samples should yield an exact zero hit count.
    hit = filtered_hits > 0.1
    nothit = np.logical_not(hit)

    filtered_signal[hit] /= filtered_hits[hit]
    filtered_signal[nothit] = 0

    if return_flags or downsample:
        filtered_flags = np.zeros_like(flag)
        filtered_flags[nothit] = True

    if downsample:
        good = filtered_flags == 0
        # BUGFIX: these slicing expressions previously discarded their
        # results; assign them so downsampling actually takes effect.
        if return_flags:
            filtered_flags = filtered_flags[good][::wkernel]
        filtered_signal = filtered_signal[good][::wkernel]

    if return_flags:
        return filtered_signal, filtered_flags
    else:
        return filtered_signal