Source code for toast.ops.copy
# Copyright (c) 2015-2020 by the parties listed in the AUTHORS file.
# All rights reserved. Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.
import traitlets
from ..mpi import MPI
from ..timing import function_timer
from ..traits import Int, List, trait_docs
from ..utils import Logger
from .operator import Operator
[docs]@trait_docs
class Copy(Operator):
"""Class to copy data.
This operator takes lists of shared, detdata, and meta keys to copy to a new
location in each observation.
Each list contains tuples specifying the input and output key names.
"""
# Class traits
API = Int(0, help="Internal interface version for this operator")
meta = List([], help="List of tuples of Observation meta keys to copy")
detdata = List([], help="List of tuples of Observation detdata keys to copy")
shared = List([], help="List of tuples of Observation shared keys to copy")
intervals = List(
[],
help="List of tuples of Observation intervals keys to copy",
)
@traitlets.validate("meta")
def _check_meta(self, proposal):
val = proposal["value"]
for v in val:
if not isinstance(v, (tuple, list)):
raise traitlets.TraitError("trait should be a list of tuples")
if len(v) != 2:
raise traitlets.TraitError("key tuples should have 2 values")
if not isinstance(v[0], str) or not isinstance(v[1], str):
raise traitlets.TraitError("key tuples should have string values")
return val
@traitlets.validate("detdata")
def _check_detdata(self, proposal):
val = proposal["value"]
for v in val:
if not isinstance(v, (tuple, list)):
raise traitlets.TraitError("trait should be a list of tuples")
if len(v) != 2:
raise traitlets.TraitError("key tuples should have 2 values")
if not isinstance(v[0], str) or not isinstance(v[1], str):
raise traitlets.TraitError("key tuples should have string values")
return val
@traitlets.validate("shared")
def _check_shared(self, proposal):
val = proposal["value"]
for v in val:
if not isinstance(v, (tuple, list)):
raise traitlets.TraitError("trait should be a list of tuples")
if len(v) != 2:
raise traitlets.TraitError("key tuples should have 2 values")
if not isinstance(v[0], str) or not isinstance(v[1], str):
raise traitlets.TraitError("key tuples should have string values")
return val
def __init__(self, **kwargs):
super().__init__(**kwargs)
@function_timer
def _exec(self, data, detectors=None, **kwargs):
log = Logger.get()
for ob in data.obs:
for in_key, out_key in self.meta:
if out_key in ob:
# The key exists- issue a warning before overwriting.
msg = "Observation key {} already exists- overwriting".format(
out_key
)
log.warning(msg)
ob[out_key] = ob[in_key]
for in_key, out_key in self.shared:
# Although this is an internal function, the input arguments come
# from existing shared objects and so should already be valid.
ob.shared.assign_mpishared(
out_key, ob.shared[in_key], ob.shared.comm_type(in_key)
)
if len(self.detdata) > 0:
# Get the detectors we are using for this observation
dets = ob.select_local_detectors(detectors)
if len(dets) == 0:
# Nothing to do for this observation
continue
for in_key, out_key in self.detdata:
if out_key in ob.detdata:
# The key exists- verify that dimensions / dtype match
in_dtype = ob.detdata[in_key].dtype
out_dtype = ob.detdata[out_key].dtype
if out_dtype != in_dtype:
msg = f"Cannot copy to existing detdata key {out_key}"
msg += f" with different dtype ({out_dtype}) != {in_dtype}"
log.error(msg)
raise RuntimeError(msg)
in_shape = ob.detdata[in_key].detector_shape
out_shape = ob.detdata[out_key].detector_shape
if out_shape != in_shape:
msg = f"Cannot copy to existing detdata key {out_key}"
msg += f" with different detector shape ({out_shape})"
msg += f" != {in_shape}"
log.error(msg)
raise RuntimeError(msg)
if ob.detdata[out_key].detectors != dets:
# The output has a different set of detectors. Reallocate.
ob.detdata[out_key].change_detectors(dets)
# Copy units
ob.detdata[out_key].update_units(ob.detdata[in_key].units)
else:
sample_shape = None
shp = ob.detdata[in_key].detector_shape
if len(shp) > 1:
sample_shape = shp[1:]
ob.detdata.create(
out_key,
sample_shape=sample_shape,
dtype=ob.detdata[in_key].dtype,
detectors=dets,
units=ob.detdata[in_key].units,
)
# Copy detector data
for d in dets:
ob.detdata[out_key][d, :] = ob.detdata[in_key][d, :]
return
def _finalize(self, data, **kwargs):
return None
def _requires(self):
req = dict()
if self.meta is not None:
req["meta"] = [x[0] for x in self.meta]
if self.detdata is not None:
req["detdata"] = [x[0] for x in self.detdata]
if self.shared is not None:
req["shared"] = [x[0] for x in self.shared]
if self.intervals is not None:
req["intervals"] = [x[0] for x in self.intervals]
return req
def _provides(self):
prov = dict()
if self.meta is not None:
prov["meta"] = [x[1] for x in self.meta]
if self.detdata is not None:
prov["detdata"] = [x[1] for x in self.detdata]
if self.shared is not None:
prov["shared"] = [x[1] for x in self.shared]
if self.intervals is not None:
prov["intervals"] = [x[1] for x in self.intervals]
return prov