# Copyright (c) 2015-2020 by the parties listed in the AUTHORS file.
# All rights reserved. Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.
from datetime import datetime, timedelta, timezone
import numpy as np
import traitlets
from astropy import units as u
from scipy.constants import degree
from .. import qarray as qa
from ..dist import distribute_discrete
from ..healpix import ang2vec
from ..instrument import Session, Telescope
from ..noise_sim import AnalyticNoise
from ..observation import Observation
from ..observation import default_values as defaults
from ..schedule import SatelliteSchedule
from ..timing import Timer, function_timer
from ..traits import Bool, Float, Instance, Int, Quantity, Unicode, Unit, trait_docs
from ..utils import Environment, Logger, name_UID, rate_from_times
from .operator import Operator
from .sim_hwp import simulate_hwp_response
@function_timer
def satellite_scanning(
ob,
ob_key,
sample_offset=0,
q_prec=None,
spin_period=1.0 * u.minute,
spin_angle=85.0 * u.degree,
prec_period=0.0 * u.minute,
prec_angle=0.0 * u.degree,
):
"""Generate boresight quaternions for a generic satellite.
Given scan strategy parameters and the relevant angles and rates, generate an array
of quaternions representing the rotation of the ecliptic coordinate axes to the
boresight.
The boresight / focalplane frame has the Z-axis pointed along the line of sight
and has the Y-axis oriented to be parallel to the scan direction.
In terms of relative rotations, this function:
- Rotates the ecliptic Z-axis to precession axis
- Rotates about the precession axis
- Rotates by the opening angle to the spin axis
- Rotates about the spin axis
- Rotates by the opening angle to the boresight line-of-sight
- Rotates by PI/2 about the line of sight to match the focalplane conventions
used internally in TOAST.
Args:
ob (Observation): The observation to populate.
ob_key (str): The observation shared key to create.
sample_offset (int): The global offset in samples from the start of the
mission.
q_prec (ndarray): If None (the default), then the precession axis will be fixed
along the ecliptic X-axis. If a 1D array of size 4 is given, This will be
the fixed quaternion used to rotate the ecliptic Z-axis to the precession
axis. If a 2D array of shape (n_samp, 4) is given, this is the
time-varying rotation of the ecliptic Z-axis to the precession axis.
spin_period (Quantity): The period of the rotation about the spin axis.
spin_angle (Quantity): The opening angle of the boresight from the spin axis.
prec_period (Quantity): The period of the rotation about the precession axis.
prec_angle (Quantity): The opening angle of the spin axis from the precession
axis.
"""
env = Environment.get()
tod_buffer_length = env.tod_buffer_length()
first_samp = ob.local_index_offset
n_samp = ob.n_local_samples
ob.shared.create_column(ob_key, shape=(n_samp, 4), dtype=np.float64)
# Temporary buffer
boresight = None
# Only the first process in each grid column simulates the shared boresight data
if ob.comm_col_rank == 0:
boresight = np.zeros((n_samp, 4), dtype=np.float64)
# Compute effective sample rate
(sample_rate, dt, _, _, _) = rate_from_times(ob.shared["times"])
spin_rate = 1.0 / spin_period.to_value(u.second)
spin_angle_rad = spin_angle.to_value(u.radian)
prec_rate = 1.0 / prec_period.to_value(u.second)
prec_angle_rad = prec_angle.to_value(u.radian)
xaxis = np.array([1, 0, 0], dtype=np.float64)
yaxis = np.array([0, 1, 0], dtype=np.float64)
zaxis = np.array([0, 0, 1], dtype=np.float64)
if q_prec is not None:
if (q_prec.shape[0] != n_samp) or (q_prec.shape[1] != 4):
raise RuntimeError("q_prec array has wrong dimensions")
buf_off = 0
buf_n = tod_buffer_length
while buf_off < n_samp:
if buf_off + buf_n > n_samp:
buf_n = n_samp - buf_off
bslice = slice(buf_off, buf_off + buf_n)
# Rotation of the Ecliptic coordinate axis to the precession axis
satrot = np.empty((buf_n, 4), np.float64)
if q_prec is None:
# in this case, we just have a fixed precession axis, pointing
# along the ecliptic X axis.
satrot[:, :] = np.tile(qa.rotation(yaxis, np.pi / 2), buf_n).reshape(
-1, 4
)
elif q_prec.flatten().shape[0] == 4:
# we have a fixed precession axis.
satrot[:, :] = np.tile(q_prec.flatten(), buf_n).reshape(-1, 4)
else:
# we have full vector of quaternions
satrot[:, :] = q_prec[bslice, :]
# Time-varying rotation about precession axis. Increment per sample is
# (2pi radians) X (precrate) / (samplerate)
precang = np.arange(buf_n, dtype=np.float64)
precang += float(buf_off + first_samp + sample_offset)
precang *= prec_rate / sample_rate
precang = 2.0 * np.pi * (precang - np.floor(precang))
precrot = qa.rotation(zaxis, precang)
# Rotation which performs the precession opening angle
precopen = qa.rotation(xaxis, prec_angle_rad)
# Time-varying rotation about spin axis. Increment per sample is
# (2pi radians) X (spinrate) / (samplerate)
spinang = np.arange(buf_n, dtype=np.float64)
spinang += float(buf_off + first_samp + sample_offset)
spinang *= spin_rate / sample_rate
spinang = 2.0 * np.pi * (spinang - np.floor(spinang))
spinrot = qa.rotation(zaxis, spinang)
# Rotation which performs the spin axis opening angle
spinopen = qa.rotation(xaxis, spin_angle_rad)
# Rotation of focalplane by PI/2
fprot = qa.rotation(zaxis, 0.5 * np.pi)
# Compose the final rotation. These are relative rotations, so note
# the order.
boresight[bslice, :] = qa.mult(
satrot,
qa.mult(
precrot,
qa.mult(precopen, qa.mult(spinrot, qa.mult(spinopen, fprot))),
),
)
buf_off += buf_n
ob.shared[ob_key].set(boresight, offset=(0, 0), fromrank=0)
return
[docs]@trait_docs
class SimSatellite(Operator):
"""Simulate a generic satellite motion.
This simulates satellite pointing in regular intervals ("science scans") that
may have some gaps in between for cooler cycles or other events. The precession
axis (anti-sun direction) is continuously slewed.
"""
# Class traits
API = Int(0, help="Internal interface version for this operator")
telescope = Instance(
klass=Telescope, allow_none=True, help="This must be an instance of a Telescope"
)
schedule = Instance(
klass=SatelliteSchedule, allow_none=True, help="Instance of a SatelliteSchedule"
)
spin_angle = Quantity(
30.0 * u.degree, help="The opening angle of the boresight from the spin axis"
)
prec_angle = Quantity(
65.0 * u.degree,
help="The opening angle of the spin axis from the precession axis",
)
hwp_rpm = Float(None, allow_none=True, help="The rate (in RPM) of the HWP rotation")
hwp_step = Quantity(
None, allow_none=True, help="For stepped HWP, the angle of each step"
)
hwp_step_time = Quantity(
None, allow_none=True, help="For stepped HWP, the time between steps"
)
distribute_time = Bool(
False,
help="Distribute observation data along the time axis rather than detector axis",
)
detset_key = Unicode(
None,
allow_none=True,
help="If specified, use this column of the focalplane detector_data to group detectors",
)
times = Unicode(defaults.times, help="Observation shared key for timestamps")
shared_flags = Unicode(
defaults.shared_flags,
allow_none=True,
help="Observation shared key for common flags",
)
hwp_angle = Unicode(
None, allow_none=True, help="Observation shared key for HWP angle"
)
boresight = Unicode(
defaults.boresight_radec, help="Observation shared key for boresight"
)
position = Unicode(defaults.position, help="Observation shared key for position")
velocity = Unicode(defaults.velocity, help="Observation shared key for velocity")
det_data = Unicode(
defaults.det_data,
allow_none=True,
help="Observation detdata key to initialize",
)
det_data_units = Unit(
defaults.det_data_units, help="Output units if creating detector data"
)
det_flags = Unicode(
defaults.det_flags,
allow_none=True,
help="Observation detdata key for flags to initialize",
)
@traitlets.validate("telescope")
def _check_telescope(self, proposal):
tele = proposal["value"]
if tele is not None:
try:
dets = tele.focalplane.detectors
except Exception:
raise traitlets.TraitError(
"telescope must be a Telescope instance with a focalplane"
)
return tele
@traitlets.validate("schedule")
def _check_schedule(self, proposal):
sch = proposal["value"]
if sch is not None:
if not isinstance(sch, SatelliteSchedule):
raise traitlets.TraitError(
"schedule must be an instance of a SatelliteSchedule"
)
return sch
@traitlets.validate("hwp_angle")
def _check_hwp_angle(self, proposal):
hwp_angle = proposal["value"]
if hwp_angle is None:
if self.hwp_rpm is not None or self.hwp_step is not None:
raise traitlets.TraitError(
"Cannot simulate HWP without a shared data key"
)
else:
if self.hwp_rpm is None and self.hwp_step is None:
raise traitlets.TraitError("Cannot simulate HWP without parameters")
return hwp_angle
@traitlets.validate("hwp_rpm")
def _check_hwp_rpm(self, proposal):
hwp_rpm = proposal["value"]
if hwp_rpm is not None:
if self.hwp_angle is None:
raise traitlets.TraitError(
"Cannot simulate rotating HWP without a shared data key"
)
if self.hwp_step is not None:
raise traitlets.TraitError("HWP cannot rotate *and* step.")
else:
if self.hwp_angle is not None and self.hwp_step is None:
raise traitlets.TraitError("Cannot simulate HWP without parameters")
return hwp_rpm
@traitlets.validate("hwp_step")
def _check_hwp_step(self, proposal):
hwp_step = proposal["value"]
if hwp_step is not None:
if self.hwp_angle is None:
raise traitlets.TraitError(
"Cannot simulate stepped HWP without a shared data key"
)
if self.hwp_rpm is not None:
raise traitlets.TraitError("HWP cannot rotate *and* step.")
else:
if self.hwp_angle is not None and self.hwp_rpm is None:
raise traitlets.TraitError("Cannot simulate HWP without parameters")
return hwp_step
def __init__(self, **kwargs):
super().__init__(**kwargs)
@function_timer
def _exec(self, data, detectors=None, **kwargs):
zaxis = np.array([0, 0, 1], dtype=np.float64)
log = Logger.get()
if self.telescope is None:
raise RuntimeError(
"The telescope attribute must be set before calling exec()"
)
if self.schedule is None:
raise RuntimeError(
"The schedule attribute must be set before calling exec()"
)
focalplane = self.telescope.focalplane
rate = focalplane.sample_rate.to_value(u.Hz)
site = self.telescope.site
comm = data.comm
# List of detectors in this pipeline
pipedets = None
if detectors is None:
pipedets = focalplane.detectors
else:
pipedets = list()
for det in focalplane.detectors:
if det in detectors:
pipedets.append(det)
# Group by detector sets and prune to include only the detectors we
# are using.
detsets = None
if self.detset_key is not None:
detsets = dict()
dsets = focalplane.detector_groups(self.detset_key)
for k, v in dsets.items():
detsets[k] = list()
for d in v:
if d in pipedets:
detsets[k].append(d)
# Data distribution in the detector direction
det_ranks = comm.group_size
if self.distribute_time:
det_ranks = 1
# Verify that we have enough data for all of our processes. If we are
# distributing by time, we have no sample sets, so can accomodate any
# number of processes. If we are distributing by detector, we must have
# at least one detector set for each process.
if not self.distribute_time:
# distributing by detector...
n_detset = None
if detsets is None:
# Every detector is independently distributed
n_detset = len(pipedets)
else:
n_detset = len(detsets)
if det_ranks > n_detset:
if comm.group_rank == 0:
msg = f"Group {comm.group} has {comm.group_size} processes but {n_detset} detector sets."
log.error(msg)
raise RuntimeError(msg)
# The global start is the beginning of the first scan
mission_start = self.schedule.scans[0].start
# Satellite motion is continuous across multiple observations, so we simulate
# continuous sampling and find the actual start / stop times for the samples
# that fall in each scan time range.
if len(self.schedule.scans) == 0:
raise RuntimeError("Schedule has no scans!")
scan_starts = list()
scan_stops = list()
scan_offsets = list()
scan_samples = list()
incr = 1.0 / rate
off = 0
for scan in self.schedule.scans:
ffirst = rate * (scan.start - mission_start).total_seconds()
first = int(ffirst)
if ffirst - first > 1.0e-3 * incr:
first += 1
start = first * incr + mission_start.timestamp()
ns = 1 + int(rate * (scan.stop.timestamp() - start))
stop = (ns - 1) * incr + mission_start.timestamp()
scan_starts.append(start)
scan_stops.append(stop)
scan_samples.append(ns)
scan_offsets.append(off)
off += ns
# Distribute the observations uniformly among groups. We take each scan and
# weight it by the duration.
groupdist = distribute_discrete(scan_samples, comm.ngroups)
# Every process group creates its observations
group_firstobs = groupdist[comm.group][0]
group_numobs = groupdist[comm.group][1]
for obindx in range(group_firstobs, group_firstobs + group_numobs):
scan = self.schedule.scans[obindx]
ses_start = scan_starts[obindx]
ses_end = ses_start + float(scan_samples[obindx] - 1) / rate
session = Session(
f"{scan.name}_{int(ses_start):10d}",
start=datetime.fromtimestamp(ses_start).astimezone(timezone.utc),
end=datetime.fromtimestamp(ses_end).astimezone(timezone.utc),
)
ob = Observation(
comm,
self.telescope,
scan_samples[obindx],
name=f"{scan.name}_{int(scan.start.timestamp())}",
uid=name_UID(scan.name),
session=session,
detector_sets=detsets,
process_rows=det_ranks,
)
# Create shared objects for timestamps, common flags, position,
# and velocity.
ob.shared.create_column(
self.times,
shape=(ob.n_local_samples,),
dtype=np.float64,
)
ob.shared.create_column(
self.shared_flags,
shape=(ob.n_local_samples,),
dtype=np.uint8,
)
ob.shared.create_column(
self.position,
shape=(ob.n_local_samples, 3),
dtype=np.float64,
)
ob.shared.create_column(
self.velocity,
shape=(ob.n_local_samples, 3),
dtype=np.float64,
)
# Rank zero of each grid column creates the data
stamps = None
position = None
velocity = None
q_prec = None
if ob.comm_col_rank == 0:
start_time = scan_starts[obindx] + float(ob.local_index_offset) / rate
stop_time = start_time + float(ob.n_local_samples - 1) / rate
stamps = np.linspace(
start_time,
stop_time,
num=ob.n_local_samples,
endpoint=True,
dtype=np.float64,
)
# Get the motion of the site for these times.
position, velocity = site.position_velocity(stamps)
# Get the quaternions for the precession axis. For now, assume that
# it simply points away from the solar system barycenter
pos_norm = np.sqrt((position * position).sum(axis=1)).reshape(-1, 1)
pos_norm = 1.0 / pos_norm
prec_axis = pos_norm * position
q_prec = qa.from_vectors(
np.tile(zaxis, ob.n_local_samples).reshape(-1, 3), prec_axis
)
ob.shared[self.times].set(stamps, offset=(0,), fromrank=0)
ob.shared[self.position].set(position, offset=(0, 0), fromrank=0)
ob.shared[self.velocity].set(velocity, offset=(0, 0), fromrank=0)
# Create boresight pointing
satellite_scanning(
ob,
self.boresight,
sample_offset=scan_offsets[obindx],
q_prec=q_prec,
spin_period=scan.spin_period,
spin_angle=self.spin_angle,
prec_period=scan.prec_period,
prec_angle=self.prec_angle,
)
# Set HWP angle
simulate_hwp_response(
ob,
ob_time_key=self.times,
ob_angle_key=self.hwp_angle,
ob_mueller_key=None,
hwp_start=scan_starts[obindx] * u.second,
hwp_rpm=self.hwp_rpm,
hwp_step=self.hwp_step,
hwp_step_time=self.hwp_step_time,
)
# Optionally initialize detector data
dets = ob.select_local_detectors(detectors)
if self.det_data is not None:
exists_data = ob.detdata.ensure(
self.det_data,
dtype=np.float64,
detectors=dets,
create_units=self.det_data_units,
)
if self.det_flags is not None:
exists_flags = ob.detdata.ensure(
self.det_flags, dtype=np.uint8, detectors=dets
)
data.obs.append(ob)
return
def _finalize(self, data, **kwargs):
return
def _requires(self):
return dict()
def _provides(self):
prov = {
"shared": [
self.times,
self.shared_flags,
self.boresight,
self.hwp_angle,
self.position,
self.velocity,
]
}
if self.det_data is not None:
prov["detdata"].append(self.det_data)
if self.det_flags is not None:
prov["detdata"].append(self.det_flags)
return prov