Pipelines¶
TOAST workflows are usually called “pipelines” and consist of a toast.Data
object that is passed through one or more “operators”:
-
class
toast.
Operator
[source]¶ Base class for an operator that acts on collections of observations.
An operator takes as input a toast.dist.Data object and returns a new instance of the same size. For each observation in the distributed data, an operator may pass some data types forward unchanged, or it may replace or modify data.
Parameters: None –
There are very few restrictions on an “operator” class. It can have arbitrary constructor arguments and must define an exec() method which takes a toast.Data instance.
Each operator might take many arguments. There are helper functions in toast.pipeline_tools that can be used to create an operator in a pipeline. Currently these helper functions add arguments to argparse for control at the command line. In the future, we intend to support loading operator configuration from other config file formats.
Example: Simple Satellite Simulation¶
TOAST includes several “generic” pipelines that simulate some fake data and then run some operators on that data. One of these is installed as toast_satellite_sim.py. There is some “set up” in the top of the script, but if we remove the timing code then the main() looks like this:
def main():
env = Environment.get()
log = Logger.get()
mpiworld, procs, rank, comm = pipeline_tools.get_comm()
args, comm, groupsize = parse_arguments(comm, procs)
# Parse options
if comm.world_rank == 0:
os.makedirs(args.outdir, exist_ok=True)
focalplane, gain, detweights = load_focalplane(args, comm)
data = create_observations(args, comm, focalplane, groupsize)
pipeline_tools.expand_pointing(args, comm, data)
signalname = None
skyname = pipeline_tools.simulate_sky_signal(
args, comm, data, [focalplane], "signal"
)
if skyname is not None:
signalname = skyname
skyname = pipeline_tools.apply_conviqt(args, comm, data, "signal")
if skyname is not None:
signalname = skyname
diponame = pipeline_tools.simulate_dipole(args, comm, data, "signal")
if diponame is not None:
signalname = diponame
# Mapmaking.
if not args.use_madam:
if comm.world_rank == 0:
log.info("Not using Madam, will only make a binned map")
npp, zmap = pipeline_tools.init_binner(args, comm, data, detweights)
# Loop over Monte Carlos
firstmc = args.MC_start
nmc = args.MC_count
for mc in range(firstmc, firstmc + nmc):
outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))
pipeline_tools.simulate_noise(
args, comm, data, mc, "tot_signal", overwrite=True
)
# add sky signal
pipeline_tools.add_signal(args, comm, data, "tot_signal", signalname)
if gain is not None:
op_apply_gain = OpApplyGain(gain, name="tot_signal")
op_apply_gain.exec(data)
if mc == firstmc:
# For the first realization, optionally export the
# timestream data. If we had observation intervals defined,
# we could pass "use_interval=True" to the export operators,
# which would ensure breaks in the exported data at
# acceptable places.
pipeline_tools.output_tidas(args, comm, data, "tot_signal")
pipeline_tools.output_spt3g(args, comm, data, "tot_signal")
pipeline_tools.apply_binner(
args, comm, data, npp, zmap, detweights, outpath, "tot_signal"
)
else:
# Initialize madam parameters
madampars = pipeline_tools.setup_madam(args)
# Loop over Monte Carlos
firstmc = args.MC_start
nmc = args.MC_count
for mc in range(firstmc, firstmc + nmc):
# create output directory for this realization
outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))
pipeline_tools.simulate_noise(
args, comm, data, mc, "tot_signal", overwrite=True
)
# add sky signal
pipeline_tools.add_signal(args, comm, data, "tot_signal", signalname)
if gain is not None:
op_apply_gain = OpApplyGain(gain, name="tot_signal")
op_apply_gain.exec(data)
pipeline_tools.apply_madam(
args, comm, data, madampars, outpath, detweights, "tot_signal"
)
if comm.comm_world is not None:
comm.comm_world.barrier()