Pipelines
A TOAST pipeline is a top-level Python script that instantiates a toast.Data object and one or more toast.Operator classes, which are then applied to the data.
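The shape of such a script can be sketched without TOAST itself. In the toy below, `ToyData`, `ScaleOperator`, `OffsetOperator` and `run_pipeline` are hypothetical stand-ins and not TOAST classes. The only thing borrowed from TOAST is the idea that each operator is configured in its constructor and then applied through an `exec(data)` call, as the example script on this page does with `OpApplyGain`.

```python
# Hypothetical stand-ins for illustration only; these are NOT TOAST classes.
class ToyData:
    """Holds named timestreams, loosely playing the role of a data object."""

    def __init__(self):
        self.signals = {}


class ScaleOperator:
    """Multiply one named timestream by a constant factor."""

    def __init__(self, factor, name="signal"):
        self.factor = factor
        self.name = name

    def exec(self, data):
        data.signals[self.name] = [self.factor * x for x in data.signals[self.name]]


class OffsetOperator:
    """Add a constant offset to one named timestream."""

    def __init__(self, offset, name="signal"):
        self.offset = offset
        self.name = name

    def exec(self, data):
        data.signals[self.name] = [x + self.offset for x in data.signals[self.name]]


def run_pipeline():
    # 1. Create the data object and fill it with something to work on.
    data = ToyData()
    data.signals["signal"] = [1.0, 2.0, 3.0]

    # 2. Configure the operators, then apply them to the data in order.
    operators = [ScaleOperator(2.0), OffsetOperator(1.0)]
    for op in operators:
        op.exec(data)
    return data


if __name__ == "__main__":
    print(run_pipeline().signals["signal"])
```

Because every operator exposes the same `exec(data)` entry point, the order of the list is the only thing that defines the pipeline in this sketch.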
Each operator may take many arguments in its constructor. Helper functions in toast.pipeline_tools can be used to create some of the built-in operators in a pipeline script. Currently these helper functions add arguments to an argparse parser so that the operators can be controlled from the command line. In the future, we intend to support loading operator configuration from config files as well.
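As a rough illustration of that helper pattern, the sketch below pairs one function that registers command-line options with one that turns the parsed options into a configured step. All names here (`add_toy_gain_args`, `build_toy_gain`, the `--toy-gain` flags) are invented for this example and are not part of toast.pipeline_tools. Only the standard-library `argparse` calls are real.

```python
import argparse


def add_toy_gain_args(parser):
    """Register the options that control a hypothetical gain step."""
    parser.add_argument(
        "--toy-gain",
        dest="apply_toy_gain",
        action="store_true",
        default=False,
        help="Enable the toy gain step",
    )
    parser.add_argument(
        "--toy-gain-value",
        type=float,
        default=1.0,
        help="Multiplicative gain applied by the toy step",
    )


def build_toy_gain(args):
    """Return the step configuration, or None when the step is disabled."""
    if not args.apply_toy_gain:
        return None
    return {"value": args.toy_gain_value}


def parse_arguments(argv=None):
    parser = argparse.ArgumentParser(description="Toy pipeline")
    # Each helper adds its own group of options to the shared parser.
    add_toy_gain_args(parser)
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_arguments(["--toy-gain", "--toy-gain-value", "2.5"])
    print(build_toy_gain(args))
```

Returning `None` for a disabled step lets the calling script skip it with a simple `is not None` check.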
The details of how the global data object is created depend on the particular project, and will likely use classes specific to that experiment. Here we look at several examples using built-in classes.
Example: Simple Satellite Simulation
TOAST includes several “generic” pipelines that simulate fake data and then run operators on that data. One of these is installed as toast_satellite_sim.py. There is some setup at the top of the script, but with the timing code removed, main() looks like this:
def main():
    env = Environment.get()
    log = Logger.get()

    mpiworld, procs, rank, comm = pipeline_tools.get_comm()
    args, comm, groupsize = parse_arguments(comm, procs)

    # Parse options

    if comm.world_rank == 0:
        os.makedirs(args.outdir, exist_ok=True)

    focalplane, gain, detweights = load_focalplane(args, comm)

    data = create_observations(args, comm, focalplane, groupsize)

    pipeline_tools.expand_pointing(args, comm, data)

    signalname = None
    skyname = pipeline_tools.simulate_sky_signal(
        args, comm, data, [focalplane], "signal"
    )
    if skyname is not None:
        signalname = skyname

    skyname = pipeline_tools.apply_conviqt(args, comm, data, "signal")
    if skyname is not None:
        signalname = skyname

    diponame = pipeline_tools.simulate_dipole(args, comm, data, "signal")
    if diponame is not None:
        signalname = diponame

    # Mapmaking.

    if not args.use_madam:
        if comm.world_rank == 0:
            log.info("Not using Madam, will only make a binned map")

        npp, zmap = pipeline_tools.init_binner(args, comm, data, detweights)

        # Loop over Monte Carlos

        firstmc = args.MC_start
        nmc = args.MC_count

        for mc in range(firstmc, firstmc + nmc):
            outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))

            pipeline_tools.simulate_noise(
                args, comm, data, mc, "tot_signal", overwrite=True
            )

            # add sky signal
            pipeline_tools.add_signal(args, comm, data, "tot_signal", signalname)

            if gain is not None:
                op_apply_gain = OpApplyGain(gain, name="tot_signal")
                op_apply_gain.exec(data)

            if mc == firstmc:
                # For the first realization, optionally export the
                # timestream data. If we had observation intervals defined,
                # we could pass "use_interval=True" to the export operators,
                # which would ensure breaks in the exported data at
                # acceptable places.
                pipeline_tools.output_tidas(args, comm, data, "tot_signal")
                pipeline_tools.output_spt3g(args, comm, data, "tot_signal")

            pipeline_tools.apply_binner(
                args, comm, data, npp, zmap, detweights, outpath, "tot_signal"
            )

    else:
        # Initialize madam parameters
        madampars = pipeline_tools.setup_madam(args)

        # Loop over Monte Carlos

        firstmc = args.MC_start
        nmc = args.MC_count

        for mc in range(firstmc, firstmc + nmc):
            # create output directory for this realization
            outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))

            pipeline_tools.simulate_noise(
                args, comm, data, mc, "tot_signal", overwrite=True
            )

            # add sky signal
            pipeline_tools.add_signal(args, comm, data, "tot_signal", signalname)

            if gain is not None:
                op_apply_gain = OpApplyGain(gain, name="tot_signal")
                op_apply_gain.exec(data)

            pipeline_tools.apply_madam(
                args, comm, data, madampars, outpath, detweights, "tot_signal"
            )

    if comm.comm_world is not None:
        comm.comm_world.barrier()
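Two control-flow ideas in the listing are easy to miss: each optional simulation step returns either a signal name or None, with the last enabled step deciding `signalname`, and each Monte Carlo realization regenerates the noise, adds the same sky signal, and writes to its own `mc_NNN` directory. The sketch below isolates just that logic in plain Python. `last_enabled` and `monte_carlo_outputs` are hypothetical helpers written for this illustration, and seeding the generator with the realization index is an assumption made here, not a statement about how TOAST generates noise.

```python
import os
import random


def last_enabled(*names):
    """Return the last name that is not None, or None if every step was skipped."""
    result = None
    for name in names:
        if name is not None:
            result = name
    return result


def monte_carlo_outputs(outdir, first_mc, n_mc, sky):
    """Build one (output path, total signal) pair per realization.

    Hypothetical helper: `sky` is a plain list standing in for the sky signal.
    """
    results = []
    for mc in range(first_mc, first_mc + n_mc):
        # Same directory naming as the listing: zero-padded realization index.
        outpath = os.path.join(outdir, "mc_{:03d}".format(mc))

        # Assumption: seed with the realization index so each run is repeatable.
        rng = random.Random(mc)

        # Fresh noise replaces whatever the previous realization left behind.
        total = [rng.gauss(0.0, 1.0) for _ in sky]

        # The sky signal is simulated once and added to every realization.
        total = [n + s for n, s in zip(total, sky)]

        results.append((outpath, total))
    return results


if __name__ == "__main__":
    print(last_enabled("signal", None, None))
    for path, _ in monte_carlo_outputs("out", 3, 2, [0.0, 1.0]):
        print(path)
```

Keeping the sky signal outside the loop mirrors the listing, where only the noise is resimulated for each realization.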