Source code for rta_selection.dl2_to_dl3

import argparse
import json
import logging
from pathlib import Path

from astropy.io import fits

from rta_selection.dl2_reader import read_data_dl2_to_QTable
from rta_selection.dl3_writer import create_event_list
from rta_selection.filters import filter_events
from rta_selection.utils.rta_argparse import EnumAction
from rta_selection.utils.rta_logging import LOGGING_LEVELS, init_logging


[docs] def dl2_dl3_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="CTAO's Real-Time Analysis DL2 to DL3", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "--config", "-c", action="store", type=Path, dest="config_path", help="Path to the configuration file", required=True, ) parser.add_argument( "--input_dl2", "-i", nargs="+", type=Path, dest="dl2_files_paths", help="Path(s) to the input DL2 file(s) to process to DL3", required=True, ) parser.add_argument( "--log_file", "-l", type=Path, dest="log_file_path", help="Path to a file where this process will log its execution", ) parser.add_argument( "--log_level", type=LOGGING_LEVELS, action=EnumAction, dest="logging_level", help="Logging level: all messages with a level severity above or equal the specified level will be logged.", default=LOGGING_LEVELS.WARNING, ) parser.add_argument("--irf", "-f", type=Path, dest="irf_file_path", help="Path to the IRF file", required=True) parser.add_argument( "--output_dl3", "-o", nargs="+", type=Path, dest="dl3_files_paths", help="Path(s) to the output DL3 file(s)", required=True, ) return parser
[docs] def main(): """DL2 to DL3 file program of CTAO's SAG real time analysis""" parser = dl2_dl3_arg_parser() args = parser.parse_args() init_logging(log_header_str="RTA_DL2_DL3", log_filename=args.log_file_path, log_level=args.logging_level) logging.info("Reading configuration file {}".format(args.config_path)) with open(args.config_path, "r") as config_f: cuts = json.load(config_f) if len(args.dl2_files_paths) != len(args.dl3_files_paths): raise argparse.ArgumentError( "The number {} of input DL2 files must match the number {} of output DL3 files.".format( len(args.dl2_files_paths), len(args.dl3_files_paths) ) ) logging.info("Loading IRF {}".format(args.irf_file_path)) irf = fits.open(args.irf_file_path) aeff2d = irf["EFFECTIVE AREA"] edisp2d = irf["ENERGY DISPERSION"] for dl2_file_path, dl3_file_path in zip(args.dl2_files_paths, args.dl3_files_paths): # using the IRF is: # apply cuts # create event list (as a fits HDU) # add IRF to event list # write the event list as the DL3 file logging.info("Loading input file {}".format(dl2_file_path)) data = read_data_dl2_to_QTable(dl2_file_path) logging.info("Loaded {} events".format(len(data))) logging.info("Apply filters {}".format(cuts["events_filters"])) data = filter_events(data, cuts["events_filters"]) logging.info("Remaining events after filtering: {}".format(len(data))) logging.info("Applying gammaness cut") # Separate cuts for angular separations, for now. TODO: include in filter_events data = data[data["gh_score"] > cuts["fixed_cuts"]["gh_score"][0]] nb_remaining_events = len(data) logging.info("Remaining events after gammaness cut {}".format(nb_remaining_events)) if nb_remaining_events > 0: logging.info("Creating primary HDU") events, gti, pointing = create_event_list(data=data, run_number=data["obs_id"][0], source_name="None") logging.info("Event HDU: {}".format(events)) hdulist = fits.HDUList([fits.PrimaryHDU(), events, gti, pointing, aeff2d, edisp2d]) logging.info("Writing DL3 file {}".format(dl3_file_path)) hdulist.writeto(dl3_file_path, overwrite=True) else: logging.warning("Skipping file writing since {} events passed the cuts".format(nb_remaining_events))
if __name__ == "__main__": main()