Source code for mugatu.designmode

# @Author: Ilija Medan
# @Date: May 18, 2021
# @Filename: obsmode.py
# @License: BSD 3-Clause
# @Copyright: Ilija Medan

import numpy as np
import warnings
import fitsio
import collections
from scipy.spatial import cKDTree
from astropy.time import Time

from mugatu.exceptions import MugatuError, MugatuWarning
from coordio.utils import (radec2wokxy, wokxy2radec,
                           offset_definition, Moffat2dInterp)

fmagloss = Moffat2dInterp()

try:
    from sdssdb.peewee.sdss5db import database
    database.set_profile('operations')
    _database = True
except:
    _database = False

from sdssdb.peewee.sdss5db.targetdb import Carton, Category, Magnitude, CartonToTarget, Target
from sdssdb.peewee.sdss5db.targetdb import DesignMode as DesignModeDB
from sdssdb.peewee.sdss5db import catalogdb
from sdssdb.peewee.sdss5db import targetdb
from sdssdb.peewee.sdss5db import opsdb


[docs] def ang_sep(ra1, dec1, ra2, dec2): """ Returns angular separation between objects. Parameters ---------- ra1: float or np.array Right ascension(s) of first object(s). dec1: float or np.array Declination(s) of first object(s). ra2: float or np.array Right ascension(s) of second object(s). dec2: float or np.array Declination(s) of second object(s). Returns ------- sep: float or np.array Angular seperation between objects in degrees. """ ra1 = np.radians(ra1) dec1 = np.radians(dec1) ra2 = np.radians(ra2) dec2 = np.radians(dec2) sep = (180 / np.pi) * np.arccos(np.sin(dec1) * np.sin(dec2) + np.cos(dec1) * np.cos(dec2) * np.cos(ra1 - ra2)) return sep
[docs] def allDesignModes(filename=None, ext=1): """Function to return a dictionary with all design modes Parameters ---------- filename : str FITS file name to read array from (default None) ext : int or str FITS file extension to read array from (default 1) Comments -------- Returns an OrderedDict If filename is not provided, it reads from that file name and the given extension. If not, it reads from the database. If neither the filename is given nor the db is available, it returns None. """ if(filename is not None): dmas = fitsio.read(filename, ext=ext) dmd = collections.OrderedDict() for dma in dmas: dmd[dma['label']] = DesignMode() dmd[dma['label']].fromarray(dma) return(dmd) if(_database | (filename is not None)): desmodes = DesignModeDB.select() dmd = collections.OrderedDict() for desmode in desmodes: dmd[desmode.label] = DesignMode(label=desmode.label) return(dmd) print("No file name input or db access.") return(None)
[docs] class DesignMode(object): """Class to store parameters for a design mode Parameters ---------- label : str label for design mode Attributes ---------- n_stds_min: dict Dictonary with the minimum number of standards for a design for each instrument ('APOGEE' and 'BOSS'). min_stds_fovmetric: dict Dictonary wih the FOV metric for the standards in a design for each instrument ('APOGEE' and 'BOSS'). The FOV metric is described by three parameters, the nth neighbor to get distances to, the percentle distance to calculate and the distace to compare against for validation (in mm). stds_mags: dict Dictonary for the min/max magnitude for the standards in a design for each instrument ('APOGEE' and 'BOSS'). Indexes correspond to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K]. bright_limit_targets: dict Dictonary for the min/max magnitude for the science targets in adesign for each instrument ('APOGEE' and 'BOSS'). Indexes correspond to magntidues: [g, r, i, z, bp, gaia_g, rp, J, H, K]. sky_neighbors_targets: dict Dictonary for the parameters used to check distance between skies and all possible sources in field for each instrument ('APOGEE' and 'BOSS'). Distances to targets (r, in arcseconds) must be r > R_0 * (lim - mags) ** beta, where mags is G band for BOSS and H band for APOGEE. Indexes correspond to: [R_0, beta, lim] trace_diff_targets: dict Dictonary for the maximum magnitude difference allowed between fibers next to each ther on the chip for each instrument ('APOGEE' and 'BOSS'). Here the magntidue difference is checked in the G band for BOSS and H band for APOGEE. """ def __init__(self, label=None): if(label is not None): self.fromdb(label=label) return
[docs] def fromdb(self, label=None): """Read in parameters for design mode from db Parameters ---------- label : str name of design mode """ if(_database is False): print("No database, cannot read.") return self.desmode_label = label desmode = DesignModeDB.select().where(DesignModeDB.label == self.desmode_label)[0] self.n_skies_min = {} self.n_skies_min['BOSS'] = desmode.boss_skies_min self.n_skies_min['APOGEE'] = desmode.apogee_skies_min self.min_skies_fovmetric = {} self.min_skies_fovmetric['BOSS'] = desmode.boss_skies_fov self.min_skies_fovmetric['APOGEE'] = desmode.apogee_skies_fov self.n_stds_min = {} self.n_stds_min['BOSS'] = desmode.boss_stds_min self.n_stds_min['APOGEE'] = desmode.apogee_stds_min self.min_stds_fovmetric = {} self.min_stds_fovmetric['BOSS'] = desmode.boss_stds_fov self.min_stds_fovmetric['APOGEE'] = desmode.apogee_stds_fov self.stds_mags = {} self.stds_mags['BOSS'] = np.zeros((len(desmode.boss_stds_mags_min), 2), dtype=np.float64) self.stds_mags['BOSS'][:, 0] = desmode.boss_stds_mags_min self.stds_mags['BOSS'][:, 1] = desmode.boss_stds_mags_max self.stds_mags['APOGEE'] = np.zeros((len(desmode.apogee_stds_mags_min), 2), dtype=np.float64) self.stds_mags['APOGEE'][:, 0] = desmode.apogee_stds_mags_min self.stds_mags['APOGEE'][:, 1] = desmode.apogee_stds_mags_max self.bright_limit_targets = {} self.bright_limit_targets['BOSS'] = np.zeros((len(desmode.boss_bright_limit_targets_min), 2), dtype=np.float64) self.bright_limit_targets['BOSS'][:, 0] = desmode.boss_bright_limit_targets_min self.bright_limit_targets['BOSS'][:, 1] = desmode.boss_bright_limit_targets_max self.bright_limit_targets['APOGEE'] = np.zeros((len(desmode.apogee_bright_limit_targets_min), 2), dtype=np.float64) self.bright_limit_targets['APOGEE'][:, 0] = desmode.apogee_bright_limit_targets_min self.bright_limit_targets['APOGEE'][:, 1] = desmode.apogee_bright_limit_targets_max self.sky_neighbors_targets = {} self.sky_neighbors_targets['BOSS'] = desmode.boss_sky_neighbors_targets self.sky_neighbors_targets['APOGEE'] = desmode.apogee_sky_neighbors_targets self.trace_diff_targets = {} self.trace_diff_targets['APOGEE'] = desmode.apogee_trace_diff_targets return
[docs] def toarray(self): """Returns an ndarray form of the object """ attrnames = ['n_skies_min', 'min_skies_fovmetric', 'n_stds_min', 'min_stds_fovmetric', 'stds_mags', 'bright_limit_targets', 'sky_neighbors_targets', 'trace_diff_targets'] dtype = [('label', 'U30')] for attrname in attrnames: attr = getattr(self, attrname) for instrument in attr: name = instrument.lower() + '_' + attrname if(type(attr[instrument]) == np.ndarray): dtype.append((name, np.float64, attr[instrument].shape)) elif(type(attr[instrument]) == list): dtype.append((name, np.float64, len(attr[instrument]))) else: dtype.append((name, np.float64)) arr = np.zeros(1, dtype=dtype) arr['label'] = self.desmode_label for attrname in attrnames: attr = getattr(self, attrname) for instrument in attr: name = instrument.lower() + '_' + attrname arr[name] = attr[instrument] return(arr[0])
[docs] def fromarray(self, arr=None): """Reads an ndarray form of the object created by toarray() """ attrnames = ['n_skies_min', 'min_skies_fovmetric', 'n_stds_min', 'min_stds_fovmetric', 'stds_mags', 'bright_limit_targets', 'sky_neighbors_targets', 'trace_diff_targets'] self.desmode_label = arr['label'] for attrname in attrnames: setattr(self, attrname, dict()) for instrument in ['BOSS', 'APOGEE']: name = instrument.lower() + '_' + attrname if(name in arr.dtype.names): getattr(self, attrname)[instrument] = arr[name] return
[docs] def todict(self): """Returns a dictionary form of the object """ attrs = ['n_skies_min', 'min_skies_fovmetric', 'n_stds_min', 'min_stds_fovmetric', 'stds_mags', 'bright_limit_targets', 'sky_neighbors_targets', 'trace_diff_targets'] dmd = dict() dmd['label'] = self.desmode_label for attr in attrs: dmda = getattr(self, attr) dmd[attr] = dict() for key in dmda: if(type(dmda[key]) == np.ndarray): dmd[attr][key] = dmda[key].tolist() else: dmd[attr][key] = dmda[key] return(dmd)
[docs] def fromdict(self, designmode_dict=None): """Builds from a dictionary form of the object Parameters ---------- designmode_dict : dict dictionary form of DesignMode object created by todict() """ attrs = ['n_skies_min', 'min_skies_fovmetric', 'n_stds_min', 'min_stds_fovmetric', 'stds_mags', 'bright_limit_targets', 'sky_neighbors_targets', 'trace_diff_targets'] self.desmod_label = designmode_dict['label'] for param in attrs: setattr(self, param, dict()) for instrument in designmode_dict[param]: if(type(designmode_dict[param][instrument]) == list): getattr(self, param)[instrument] = np.array(designmode_dict[param][instrument]) else: getattr(self, param)[instrument] = designmode_dict[param][instrument] return
[docs] def frommanual(self, label=None, desmode_manual=None): """Read in parameters for design mode from dictionary input Parameters ---------- label : str name of design mode desmode_manual : dict Dictonary of DesignMode parameters to be used as manual inputs to validate the design, rather than from targetdb. """ self.desmode_label = label self.n_skies_min = {} self.n_skies_min['BOSS'] = desmode_manual['boss_skies_min'] self.n_skies_min['APOGEE'] = desmode_manual['apogee_skies_min'] self.min_skies_fovmetric = {} self.min_skies_fovmetric['BOSS'] = desmode_manual['boss_skies_fov'] self.min_skies_fovmetric['APOGEE'] = desmode_manual['apogee_skies_fov'] self.n_stds_min = {} self.n_stds_min['BOSS'] = desmode_manual['boss_stds_min'] self.n_stds_min['APOGEE'] = desmode_manual['apogee_stds_min'] self.min_stds_fovmetric = {} self.min_stds_fovmetric['BOSS'] = desmode_manual['boss_stds_fov'] self.min_stds_fovmetric['APOGEE'] = desmode_manual['apogee_stds_fov'] self.stds_mags = {} self.stds_mags['BOSS'] = desmode_manual['boss_stds_mags'] self.stds_mags['APOGEE'] = desmode_manual['apogee_stds_mags'] self.bright_limit_targets = {} self.bright_limit_targets['BOSS'] = desmode_manual['boss_bright_limit_targets'] self.bright_limit_targets['APOGEE'] = desmode_manual['apogee_bright_limit_targets'] self.sky_neighbors_targets = {} self.sky_neighbors_targets['BOSS'] = desmode_manual['boss_sky_neighbors_targets'] self.sky_neighbors_targets['APOGEE'] = desmode_manual['apogee_sky_neighbors_targets'] self.trace_diff_targets = {} self.trace_diff_targets['APOGEE'] = desmode_manual['apogee_trace_diff_targets'] return
[docs] def check_assign_mag_limit(mag_metric_min, mag_metric_max, assign_mag): """ Checks the if magnitude of one assignment agrees with design mode for some instrument and carton class Parameters ---------- mag_metric_min: float The minimum magitude for the specific DesignMode. mag_metric_min: float The maximum magitude for the specific DesignMode. assign_mag: float The magntiude of the assignment being checkaged against the DesignMode. Returns ------- targ_check: boolean True of the assignment magntiude passed the DesignMode for the magnitude, False if not. complete_check: str 'COMPLETE' if assign_mag is value, 'INCOMPLETE' if Null. """ # set default values complete_check = 'COMPLETE' targ_check = False # do checks # define Null cases for targetdb.magnitude table cases = [-999, -9999, 999, 0.0, np.nan, 99.9, None] if assign_mag in cases or np.isnan(assign_mag): complete_check = 'INCOMPLETE' # set True, no mag is not a fail targ_check = True # check when greater than and less than elif mag_metric_min != -999. and mag_metric_max != -999.: if (mag_metric_min < assign_mag < mag_metric_max): targ_check = True # check when just greater than elif mag_metric_min != -999.: if assign_mag > mag_metric_min: targ_check = True # check when less than else: if assign_mag < mag_metric_max: targ_check = True return targ_check, complete_check
[docs] def bright_neigh_exclusion_r(mag_bs, mag_limit_r, lunation): """ returns the exclusion radius for a fiber around a bright star in arcseconds for a given designmode. This is for the piecewise appromixation used based on: https://wiki.sdss.org/pages/viewpage.action?pageId=100173069 Parameters ---------- mag_bs: float or np.array The magniutde in the G band for the bright star(s) mag_limit_r: float Magnitude limit for the designmode in the r-SDSS band lunation: str: If the designmode is bright time ('bright') or dark time ('dark') Returns ------- r_exclude: float or np.array exclusion radius in arcseconds around bright star(s) """ flag = ('mugatu.designmode.bright_neigh_exclusion_r will be depriciated ' 'in future releases. For the same functionalitiy, please use ' 'coordio.utils.offset_definition') warnings.warn(flag, MugatuWarning) # linear portion in the wings r_wings = (mag_limit_r - mag_bs - 8.2) / 0.05 # linear portion in transition area r_trans = (mag_limit_r - mag_bs - 4.5) / 0.25 # core area if lunation == 'bright': r_core = 1.75 * (mag_limit_r - mag_bs) ** 0.6 else: r_core = 1.5 * (mag_limit_r - mag_bs) ** 0.8 # exlusion radius is the max of each section if isinstance(mag_bs, float): if mag_bs <= mag_limit_r: r_exclude = max(r_wings, r_trans, r_core) else: r_exclude = 0. else: r_exclude = np.nanmax(np.column_stack((r_wings, r_trans, r_core)), axis=1) r_exclude[mag_bs > mag_limit_r] = 0. return r_exclude
[docs] def adjusted_brigh_neigh_mag(mag_bs, r, lunation): """ returns the approximate adjusted magntidue of a bright source at the position of a nearby fiber. This is for the piecewise appromixation used based on: https://wiki.sdss.org/pages/viewpage.action?pageId=100173069 Parameters ---------- mag_bs: float The magniutde in the G band for the bright star r: float or np.array distance between fiber and bright star (in arcseconds) lunation: str: If the designmode is bright time ('bright') or dark time ('dark') Returns ------- adjusted_mag_bs: float The adjusted magntiude of the bright star in SDSS r-band """ mag_wings = 8.2 + 0.05 * r + mag_bs mag_trans = 4.5 + 0.25 * r + mag_bs if lunation == 'bright': mag_core = (r / 1.75) ** (1 / 0.6) + mag_bs else: mag_core = (r / 1.5) ** (1 / 0.8) + mag_bs if isinstance(r, float): adjusted_mag_bs = np.nanmin([mag_wings, mag_trans, mag_core]) else: adjusted_mag_bs = np.nanmin(np.column_stack((mag_wings, mag_trans, mag_core)), axis=1) return adjusted_mag_bs
[docs] def build_brigh_neigh_query(check_type, instrument, mag_lim, racen, deccen, observatory=None, version_catdb='1.0.0'): """ Builds the database query needed to run bright neighbor check Parameters ---------- check_type: str Either 'designmode' for full designmode check with all stars down to mag_lim in catalogdb, or 'safety' for safety check using bright stars in targetdb. instrument: str Fiber instrument being checked. Either 'BOSS' or 'APOGEE'. mag_lim: float Magnitude limit in the r_SDSS band for BOSS or H band for APOGEE. racen: float Field center in right ascension. deccen: float Feild center in declination observatory: str Observatory where observation is taking place, either 'LCO' or 'APO'. version_catdb: str catalogdb.Version.plan to use for the query Outputs ------- db_query_results: tuple Tuple of (ra, dec, mag, catalogid, pmra, pmdec) for the appropriate database query """ # return empty tuple if no mag limit if mag_lim == -999.: return () # change search radius based on observatory if observatory is None: r_search = 1.5 elif observatory == 'APO': r_search = 1.5 else: r_search = 1.0 if check_type == 'designmode': if instrument == 'BOSS': cat = catalogdb.Gaia_DR3_g_lt_16 ra_col = catalogdb.Catalog.ra dec_col = catalogdb.Catalog.dec ra_col_str = 'ra' dec_col_str = 'dec' mag_col = catalogdb.Gaia_DR3_g_lt_16.phot_g_mean_mag # run the query db_query_gaia = (catalogdb.Catalog.select( ra_col, dec_col, mag_col, catalogdb.Catalog.catalogid, catalogdb.Catalog.pmra, catalogdb.Catalog.pmdec) .join(catalogdb.Version) .switch(catalogdb.Catalog) .join(catalogdb.CatalogToGaia_DR3) .join(catalogdb.Gaia_DR3_g_lt_16, on=(catalogdb.Gaia_DR3_g_lt_16.source_id == catalogdb.CatalogToGaia_DR3.target)) .where((cat.cone_search(racen, deccen, r_search, ra_col=ra_col_str, dec_col=dec_col_str)) & (mag_col < mag_lim) & (catalogdb.Version.plan == version_catdb))) rasg, decsg, magsg, catalogidsg, pmrasg, pmdecsg = map(list, zip(*list(db_query_gaia.tuples()))) rasg = np.array(rasg, dtype=np.float64) decsg = np.array(decsg, dtype=np.float64) magsg = np.array(magsg, dtype=np.float64) catalogidsg = np.array(catalogidsg, dtype=int) pmrasg = np.array(pmrasg, dtype=np.float64) pmdecsg = np.array(pmdecsg, dtype=np.float64) cat = catalogdb.Tycho2 ra_col = catalogdb.Catalog.ra dec_col = catalogdb.Catalog.dec ra_col_str = 'radeg' dec_col_str = 'dedeg' mag_colbt = catalogdb.Tycho2.btmag mag_colvt = catalogdb.Tycho2.vtmag # run the query db_query_tych = (catalogdb.Catalog.select( ra_col, dec_col, mag_colbt, mag_colvt, catalogdb.Catalog.catalogid, catalogdb.Catalog.pmra, catalogdb.Catalog.pmdec) .join(catalogdb.Version) .switch(catalogdb.Catalog) .join(catalogdb.CatalogToTycho2) .join(catalogdb.Tycho2) .where((cat.cone_search(racen, deccen, r_search, ra_col=ra_col_str, dec_col=dec_col_str)) & (mag_colvt < mag_lim) & (catalogdb.Version.plan == version_catdb))) rast, decst, magsbt, magsvt, catalogidst, pmrast, pmdecst = map(list, zip(*list(db_query_tych.tuples()))) rast = np.array(rast, dtype=np.float64) decst = np.array(decst, dtype=np.float64) magsbt = np.array(magsbt, dtype=np.float64) magsvt = np.array(magsvt, dtype=np.float64) catalogidst = np.array(catalogidst, dtype=int) pmrast = np.array(pmrast, dtype=np.float64) pmdecst = np.array(pmdecst, dtype=np.float64) magsg_tych = np.zeros(len(magsbt)) magsg_tych[magsbt != None] = (magsvt[magsbt != None] - 0.02051 - 0.2706 * (magsbt[magsbt != None] - magsvt[magsbt != None]) + 0.03394 * (magsbt[magsbt != None] - magsvt[magsbt != None]) ** 2 - 0.05937 * (magsbt[magsbt != None] - magsvt[magsbt != None]) ** 3) magsg_tych[magsbt == None] = magsvt[magsbt == None] - 1 # set up the logic to only include tycho # stars not found in Gaia tych_in_gaia = np.isin(catalogidst, catalogidsg) # evals for each now tych_eval = (~tych_in_gaia) ras = np.append(rasg, rast[tych_eval]) decs = np.append(decsg, decst[tych_eval]) mags = np.append(magsg, magsg_tych[tych_eval]) catalogids = np.append(catalogidsg, catalogidst[tych_eval]) pmras = np.append(pmrasg, pmrast[tych_eval]) pmdecs = np.append(pmdecsg, pmdecst[tych_eval]) db_query_results = (ras, decs, mags, catalogids, pmras, pmdecs) else: cat = catalogdb.TwoMassPSC ra_col = catalogdb.Catalog.ra dec_col = catalogdb.Catalog.dec ra_col_str = 'ra' dec_col_str = 'decl' mag_col = catalogdb.TwoMassPSC.h_m # run the query db_query = (catalogdb.Catalog.select( ra_col, dec_col, mag_col, catalogdb.Catalog.catalogid, catalogdb.Catalog.pmra, catalogdb.Catalog.pmdec) .join(catalogdb.Version) .switch(catalogdb.Catalog) .join(catalogdb.CatalogToTwoMassPSC) .join(cat) .where((cat.cone_search(racen, deccen, r_search, ra_col=ra_col_str, dec_col=dec_col_str)) & (mag_col < mag_lim) & (catalogdb.Version.plan == version_catdb))) ras, decs, mags, catalogids, pmras, pmdecs = map(list, zip(*list(db_query.tuples()))) ras = np.array(ras, dtype=np.float64) decs = np.array(decs, dtype=np.float64) mags = np.array(mags, dtype=np.float64) catalogids = np.array(catalogids, dtype=int) pmras = np.array(pmras, dtype=np.float64) pmdecs = np.array(pmdecs, dtype=np.float64) db_query_results = (ras, decs, mags, catalogids, pmras, pmdecs) else: if instrument == 'BOSS': carts = ['ops_tycho2_brightneighbors', 'ops_gaia_brightneighbors'] mag_col = targetdb.Magnitude.gaia_g # tycho query db_queryt = (targetdb.CartonToTarget.select(targetdb.Target.ra, targetdb.Target.dec, mag_col, targetdb.Target.catalogid, targetdb.Target.pmra, targetdb.Target.pmdec) .join(targetdb.Target) .switch(targetdb.CartonToTarget) .join(targetdb.Magnitude) .switch(targetdb.CartonToTarget) .join(targetdb.Carton) .join(targetdb.Version) .where((targetdb.Target.cone_search(racen, deccen, r_search)) & (targetdb.Carton.carton == carts[0]) & (targetdb.Version.plan >= version_catdb) & (~(targetdb.Version.plan % '%-test')))) rast, decst, magst, catalogidst, pmrast, pmdecst = map(list, zip(*list(db_queryt.tuples()))) rast = np.array(rast, dtype=np.float64) decst = np.array(decst, dtype=np.float64) magst = np.array(magst, dtype=np.float64) catalogidst = np.array(catalogidst, dtype=int) pmrast = np.array(pmrast, dtype=np.float64) pmdecst = np.array(pmdecst, dtype=np.float64) # gaia query db_queryg = (targetdb.CartonToTarget.select(targetdb.Target.ra, targetdb.Target.dec, mag_col, targetdb.Target.catalogid, targetdb.Target.pmra, targetdb.Target.pmdec) .join(targetdb.Target) .switch(targetdb.CartonToTarget) .join(targetdb.Magnitude) .switch(targetdb.CartonToTarget) .join(targetdb.Carton) .join(targetdb.Version) .where((targetdb.Target.cone_search(racen, deccen, r_search)) & (targetdb.Carton.carton == carts[1]) & (targetdb.Version.plan >= version_catdb) & (~(targetdb.Version.plan % '%-test')))) rasg, decsg, magsg, catalogidsg, pmrasg, pmdecsg = map(list, zip(*list(db_queryg.tuples()))) rasg = np.array(rasg, dtype=np.float64) decsg = np.array(decsg, dtype=np.float64) magsg = np.array(magsg, dtype=np.float64) catalogidsg = np.array(catalogidsg, dtype=int) pmrasg = np.array(pmrasg, dtype=np.float64) pmdecsg = np.array(pmdecsg, dtype=np.float64) # remove tycho objects in gaia tych_in_gaia = np.isin(catalogidst, catalogidsg) tych_eval = (~tych_in_gaia) ras = np.append(rasg, rast[tych_eval]) decs = np.append(decsg, decst[tych_eval]) mags = np.append(magsg, magst[tych_eval]) catalogids = np.append(catalogidsg, catalogidst[tych_eval]) pmras = np.append(pmrasg, pmrast[tych_eval]) pmdecs = np.append(pmdecsg, pmdecst[tych_eval]) else: carts = ['ops_2mass_psc_brightneighbors'] mag_col = targetdb.Magnitude.h # run the query db_query = (targetdb.CartonToTarget.select(targetdb.Target.ra, targetdb.Target.dec, mag_col, targetdb.Target.catalogid, targetdb.Target.pmra, targetdb.Target.pmdec) .join(targetdb.Target) .switch(targetdb.CartonToTarget) .join(targetdb.Magnitude) .switch(targetdb.CartonToTarget) .join(targetdb.Carton) .join(targetdb.Version) .where((targetdb.Target.cone_search(racen, deccen, r_search)) & (targetdb.Carton.carton == carts[0]) & (targetdb.Version.plan >= version_catdb) & (~(targetdb.Version.plan % '%-test')))) ras, decs, mags, catalogids, pmras, pmdecs = map(list, zip(*list(db_query.tuples()))) ras = np.array(ras, dtype=np.float64) decs = np.array(decs, dtype=np.float64) mags = np.array(mags, dtype=np.float64) catalogids = np.array(catalogids, dtype=int) pmras = np.array(pmras, dtype=np.float64) pmdecs = np.array(pmdecs, dtype=np.float64) db_query_results = (ras, decs, mags, catalogids, pmras, pmdecs) return db_query_results
[docs] class DesignModeCheck(DesignMode): """ Parameters ---------- FPSDesign: object mugatu.fpsdesign.FPSDesign object with a built design. desmode_label: str The DesignMode label from targetdb. Options for label are: bright_time, dark_plane, dark_monit, dark_rm and dark_faint. desmode_manual: dict Dictonary of DesignMode parameters to be used as manual inputs to validate the design, rather than from targetdb. db_query_results_boss: dict Database query results for BOSS bright neighbor check. Each index of dict is a tuple of (ras, decs, mags, catalogids) with one index for designmode and the other safety. db_query_results_apogee: dict Database query results for APOGEE bright neighbor check. Each index of dict is a tuple of (ras, decs, mags, catalogids) with one index for designmode and the other safety. Attributes ---------- design: dict FPSDesign dictonary. racen: float RA center of the field (degrees) deccen: float DEC center of the field (degrees) position_angle: float Position angle of the field E of N in degrees. observatory: str Observatory where observation is taking place, either 'LCO' or 'APO'. obsTime: float Julian date of the observation. rg: kaiju.robotGrid Kaiju robotGrid object for the design. n_skies_min: dict Dictonary with the minimum number of skies for a design for each instrument ('APOGEE' and 'BOSS'). min_skies_fovmetric: dict Dictonary wih the FOV metric for the skies in a design for each instrument('APOGEE' and 'BOSS'). The FOV metric is described by three parameters, the nth neighbor to get distances to, the percentle distance to calculate and the distace to compare against for validation (in mm). n_stds_min: dict Dictonary with the minimum number of standards for a design for each instrument ('APOGEE' and 'BOSS'). min_stds_fovmetric: dict Dictonary wih the FOV metric for the standards in a design for each instrument ('APOGEE' and 'BOSS'). The FOV metric is described by three parameters, the nth neighbor to get distances to, the percentle distance to calculate and the distace to compare against for validation (in mm). stds_mags: dict Dictonary for the min/max magnitude for the standards in a design for each instrument ('APOGEE' and 'BOSS'). Indexes correspond to magntidues: [g, r, i, bp, gaia_g, rp, h]. bright_limit_targets: dict Dictonary for the min/max magnitude for the science targets in adesign for each instrument ('APOGEE' and 'BOSS'). Indexes correspond to magntidues: [g, r, i, bp, gaia_g, rp, h]. sky_neighbors_targets: dict Dictonary for the parameters used to check distance between skies and all possible sources in field for each instrument ('APOGEE' and 'BOSS'). Distances to targets (r, in arcseconds) must be r > R_0 * (lim - mags) ** beta, where mags is G band for BOSS and H band for APOGEE. Indexes correspond to: [R_0, beta, lim] trace_diff_targets: dict Dictonary for the maximum magnitude difference allowed between fibers next to each ther on the chip for each instrument ('APOGEE' and 'BOSS'). Here the magntidue difference is checked in the G band for BOSS and H band for APOGEE. carton_classes: dict Dictonary of arrays for the carton pks in each category (i.e. science, standards and skies). """ def __init__(self, FPSDesign, desmode_label, desmode_manual=None, db_query_results_boss=None, db_query_results_apogee=None): # grab needed info from FPSDesign object self.design = FPSDesign.design self.racen = FPSDesign.racen self.deccen = FPSDesign.deccen self.position_angle = FPSDesign.position_angle self.observatory = FPSDesign.observatory self.obsTime = FPSDesign.obsTime self.rg = FPSDesign.rg self.desmode_label = desmode_label self.db_query_results_boss = db_query_results_boss self.db_query_results_apogee = db_query_results_apogee # grab the design mode params if desmode_manual is None: self.fromdb(label=self.desmode_label) else: self.fromdict(designmode_dict=desmode_manual) # classify cartons as skies, standards or science self.carton_classes = {} self.carton_classes['science'] = [] self.carton_classes['sky'] = [] self.carton_classes['std'] = [] for cat in np.unique(self.design['category'][self.design['catalogID'] != -1]): if 'sky' in cat: self.carton_classes['sky'] += list(self.design['category'][(self.design['catalogID'] != -1) & (self.design['category'] == cat)]) elif 'standard' in cat: self.carton_classes['std'] += list(self.design['category'][(self.design['catalogID'] != -1) & (self.design['category'] == cat)]) # I think else makes sense here as there are science # and open fiber labels? else: self.carton_classes['science'] += list(self.design['category'][(self.design['catalogID'] != -1) & (self.design['category'] == cat)]) # collect magntiudes of design # here I am doing g,r,i,BP,G,RP,H self.mags = self.design['magnitudes']
[docs] def skies_min(self, instrument, return_metric=False): """ Checks if design has the required number of skies for some instrument. Returns True if number skies is greater than the minimum and False if not. Parameters ---------- instrument: str Instrument to check number of sky fibers. Must be 'BOSS' or 'APOGEE'. return_metric: boolean If True, will return the number of skies in the design. Returns ------- : boolean True if number skies is greater than the minimum and False if not. n_skies: int Number of skies in design, only returned if return_metric=True. """ n_skies = len(self.design['catalogID'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['sky'])) & (self.design['obsWavelength'] == instrument)]) if return_metric: if n_skies >= self.n_skies_min[instrument]: return True, n_skies else: return False, n_skies else: if n_skies >= self.n_skies_min[instrument]: return True else: return False
[docs] def stds_min(self, instrument, return_metric=False): """ Checks if design has the required number of standards for some instrument. Returns True if number standards is greater than the minimum and False if not. Parameters ---------- instrument: str Instrument to check number of standard fibers. Must be 'BOSS' or 'APOGEE'. return_metric: boolean If True, will return the number of standards in the design. Returns ------- : boolean True if number standards is greater than the minimum and False if not. n_stds: int Number of standards in design, only returned if return_metric=True. """ n_stds = len(self.design['catalogID'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == instrument)]) if return_metric: if n_stds >= self.n_stds_min[instrument]: return True, n_stds else: return False, n_stds else: if n_stds >= self.n_stds_min[instrument]: return True else: return False
[docs] def skies_fov(self, instrument, return_metric=False): """ Checks if design meets the FOV metric for the skies for some instrument. Returns True FOV metric met, False if not. Also, if no science targets in design returns True, while no skies returns False. Parameters ---------- instrument: str Instrument to FOV metric. Must be 'BOSS' or 'APOGEE'. return_metric: boolean If True, will return the FOV metric for the design. Returns ------- : boolean True FOV metric met, False if not. Also, if no science targets in design returns True, while no skies returns False. perc_dist: float Percentile distance between science assignments and skies (FOV metric) for the design. Only returned if return_metric=True. """ # for now, return True (passing) if no FOV metric supplied if np.all(self.min_skies_fovmetric[instrument] == -999.): if return_metric: return True, -1. else: return True # get x,y of the skies x_sky = self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['sky'])) & (self.design['obsWavelength'] == instrument)] y_sky = self.design['y'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['sky'])) & (self.design['obsWavelength'] == instrument)] x_sci = self.design['x'][(self.design['catalogID'] != -1) & (~np.isin(self.design['category'], self.carton_classes['sky'])) & (self.design['obsWavelength'] == instrument)] y_sci = self.design['y'][(self.design['catalogID'] != -1) & (~np.isin(self.design['category'], self.carton_classes['sky'])) & (self.design['obsWavelength'] == instrument)] # if no science in band, doesnt matter? if len(x_sci) == 0: if return_metric: return True, -1. else: return True # if no skies required, dont do check if self.n_skies_min[instrument] == 0: if return_metric: return True, -1. else: return True # if no skies, dont do check if len(x_sky) == 0: if return_metric: return False, -1. else: return False # create KDE tree tree = cKDTree(np.column_stack((x_sky, y_sky))) # get distances for nearest neighbors dd, ii = tree.query(np.column_stack((x_sci, y_sci)), k=self.min_skies_fovmetric[instrument][0]) # second column is the nth neighbor distance if k>1 if self.min_skies_fovmetric[instrument][0] == 1: dists = dd else: dists = dd[:, 1] # this assumes percentile is on 0 to 100 scale perc_dist = np.percentile(dists, self.min_skies_fovmetric[instrument][1]) if return_metric: if perc_dist < self.min_skies_fovmetric[instrument][2]: return True, perc_dist else: return False, perc_dist else: if perc_dist < self.min_skies_fovmetric[instrument][2]: return True else: return False
[docs] def stds_fov(self, instrument, return_metric=False): """ Checks if design meets the FOV metric for the standards for some instrument. Returns True FOV metric met, False if not. Also, if no science targets in design returns True, while no standards returns False. Parameters ---------- instrument: str Instrument to FOV metric. Must be 'BOSS' or 'APOGEE'. return_metric: boolean If True, will return the FOV metric for the design. Returns ------- : boolean True FOV metric met, False if not. Also, if no science targets in design returns True, while no standards returns False. perc_dist: float Percentile distance between science assignments and standards (FOV metric) for the design. Only returned if return_metric=True. """ # for now, return True (passing) if no FOV metric supplied if np.all(self.min_stds_fovmetric[instrument] == -999.): if return_metric: return True, -1. else: return True # get x,y of the standards x_std = self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == instrument)] y_std = self.design['y'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == instrument)] x_sci = self.design['x'][(self.design['catalogID'] != -1) & (~np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == instrument)] y_sci = self.design['y'][(self.design['catalogID'] != -1) & (~np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == instrument)] # if no science in band, doesnt matter? if len(x_sci) == 0: if return_metric: return True, -1. else: return True # if no stds required, dont do check if self.n_stds_min[instrument] == 0: if return_metric: return True, -1. else: return True # if no stds, dont do check if len(x_std) == 0: if return_metric: return False, -1. else: return False # create KDE tree tree = cKDTree(np.column_stack((x_std, y_std))) # get distances for nearest neighbors dd, ii = tree.query(np.column_stack((x_sci, y_sci)), k=self.min_stds_fovmetric[instrument][0]) # second column is the nth neighbor distance if k>1 if self.min_stds_fovmetric[instrument][0] == 1: dists = dd else: dists = dd[:, 1] # this assumes percentile is on 0 to 100 scale perc_dist = np.percentile(dists, self.min_stds_fovmetric[instrument][1]) if return_metric: if perc_dist < self.min_stds_fovmetric[instrument][2]: return True, perc_dist else: return False, perc_dist else: if perc_dist < self.min_stds_fovmetric[instrument][2]: return True else: return False
[docs] def mag_limits(self, mag_metric, instrument, carton_class): """ Checks the if magnitude of assignments agree with design mode for some instrument and carton class Parameters ---------- mag_metric: np.array Array of shape (N,M), where N=10 corresponds to magntiudes [g, r, i, z, bp, gaia_g, rp, J, H, K], and M=2 where 0th column is minimum magnitude and 1st column is maximum magnitude. If no check in certain band, use None as value. instrument: str Instrument to check magnitudes. Must be 'BOSS' or 'APOGEE'. carton_class: str Carton class to check magnitude limits of. Must be 'std' or 'science'. Returns ------- mag_checks: np.array Array of booleans equal to length of self.design. If True, assignment within magnitude limits, False if not. If assignment is not with instrument or in carton_class, will be False. complete_check: np.array Array of str equal to length of self.design. If 'INCOMPLETE', then assignment either did not have all required photometry for check, or assignment is not with instrument or in carton_class. """ mag_checks = np.zeros(len(self.design['catalogID']), dtype=bool) # if complete, all bands for target present in check # if incomplete, then complete_check = np.array(['COMPLETE' for _ in range(len(self.design['catalogID']))], dtype='<U12') # check which limits are defined for mode check_inds = [] for i in range(mag_metric.shape[0]): if mag_metric[i][0] != -999. or mag_metric[i][1] != -999.: check_inds.append(i) # run checks for i in range(len(mag_checks)): if (self.design['catalogID'][i] != -1 and self.design['category'][i] in self.carton_classes[carton_class] and self.design['obsWavelength'][i] == instrument): # don't do check and make true if offset target if self.design['offset'][i] and self.design['offset_flag'][i] == 0: mag_checks[i] = True else: # check in each band that has check defined targ_check = np.zeros(len(check_inds), dtype=bool) for j, ind in enumerate(check_inds): # check the magntiude for this assignment targ_check[j], complete_check[i] = check_assign_mag_limit( mag_metric[ind][0], mag_metric[ind][1], self.mags[i][ind]) # if all True, then passes if np.all(targ_check): mag_checks[i] = True else: complete_check[i] = 'INCOMPLETE' return mag_checks, complete_check
[docs] def bright_neighbors(self, instrument, check_type='designmode', db_query=None): """ Check if any fibers are placed near a bright star Parameters ---------- instrument: str Instrument to check sky distances. Must be 'BOSS' or 'APOGEE'. check_type: str Either 'designmode' for checking bright stars in catalogdb down to designmode magnitude limit, or 'safety' for checking bright stars in targetdb in bright star cartons. db_query: peewee query Optionally pass the pre-queried bright stars from catalogdb or targetdb. Must include ra, dec, magnitude and id. Other option is to provide tuple of (ra,dec,magntiude,id). Returns ------- neigh_checks_des: np.array Array of booleans equal to length of 500, where order is robotID 1 to 500 robotID. True assignment valid (i.e. not near bright star), False if not valid. hasFiber: np.array Array of booleans equal to length of 500, where order is robotID 1 to 500 robotID. True if robotID has intrument on fiber, False if not. If False, do not consider result from neigh_checks_des for this robotID mag_adj_robo: np.array If neigh_checks_des == False, reports the magntiude of nearby bright source at the fiber position. If neigh_checks_des == True, then no value reported (where NULL = -9999.). isassigned: np.array Array of booleans equal to length of 500, where order is robotID 1 to 500 robotID. True if robotID is assigned in grid and False if it is not. """ # get xPos and yPos from robotGrid xrobo = np.zeros(500) yrobo = np.zeros(500) hasFiber = np.zeros(500, dtype=bool) + True isassigned = np.zeros(500, dtype=bool) + True mag_adj_robo = np.zeros(500) - 9999. for i, robotID in enumerate(self.rg.robotDict): if instrument == 'BOSS': xrobo[i] = self.rg.robotDict[robotID].bossWokXYZ[0] yrobo[i] = self.rg.robotDict[robotID].bossWokXYZ[1] else: hasFiber[i] = self.rg.robotDict[robotID].hasApogee xrobo[i] = self.rg.robotDict[robotID].apWokXYZ[0] yrobo[i] = self.rg.robotDict[robotID].apWokXYZ[1] isassigned[i] = self.rg.robotDict[robotID].isAssigned() ra_robo, dec_robo, fieldWarn = wokxy2radec(xWok=xrobo, yWok=yrobo, waveName=instrument.title(), raCen=self.racen, decCen=self.deccen, obsAngle=self.position_angle, obsSite=self.observatory, obsTime=self.obsTime) neigh_checks = np.zeros(500, dtype=bool) + True mag_limits = self.bright_limit_targets[instrument][:, 0] # set magntiude limit for instrument and lunation if instrument == 'Apogee': # 2MASS H mag_lim = mag_limits[8] elif 'bright' in self.desmode_label: # Gaia G mag_lim = mag_limits[5] else: # SDSS r mag_lim = mag_limits[1] # run query for field if not supplied if (self.db_query_results_boss is not None and instrument == 'BOSS'): db_query = self.db_query_results_boss[check_type] elif (self.db_query_results_apogee is not None and instrument == 'APOGEE'): db_query = self.db_query_results_apogee[check_type] elif db_query is None: db_query = build_brigh_neigh_query(check_type, instrument, mag_lim, self.racen, self.deccen, self.observatory) # only do check if any stars returned if len(db_query) > 0: if isinstance(db_query, tuple): ras, decs, mags, catalogids, pmras, pmdecs = db_query else: ras, decs, mags, catalogids, pmras, pmdecs = map(list, zip(*list(db_query.tuples()))) # set nan pms to 0 pmras[np.isnan(pmras)] = 0. pmdecs[np.isnan(pmdecs)] = 0. # convert to x,y radVel = (np.zeros(len(ras), dtype=np.float64) + 1.e-4) parallax = (np.zeros(len(ras), dtype=np.float64) + 1.e-4) res = radec2wokxy( ra=ras, dec=decs, coordEpoch=Time(np.array([2015.5] * len(ras)), format='decimalyear').jd, waveName=instrument.title(), raCen=self.racen, decCen=self.deccen, obsAngle=self.position_angle, obsSite=self.observatory, obsTime=self.obsTime, pmra=pmras, pmdec=pmdecs, parallax=parallax, radVel=radVel) # now convert back to ra,dec ras, decs, fieldWarn = wokxy2radec(xWok=res[0], yWok=res[1], waveName=instrument.title(), raCen=self.racen, decCen=self.deccen, obsAngle=self.position_angle, obsSite=self.observatory, obsTime=self.obsTime) if 'bright' in self.desmode_label: r_exclude, _ = offset_definition(mags, mag_limits, lunation='bright', waveName=instrument.title(), fmagloss=fmagloss, obsSite=self.observatory) else: r_exclude, _ = offset_definition(mags, mag_limits, lunation='dark', waveName=instrument.title(), fmagloss=fmagloss, obsSite=self.observatory) # check if fibers too close to bright neighbors for i in range(len(r_exclude)): # only do check if exclusion radius larger # than 0" (otherwise star not bright enough) if r_exclude[i] > 0.: dist = ang_sep(ras[i], decs[i], ra_robo, dec_robo) * 3600. if 'bright' in self.desmode_label: mag_adj = adjusted_brigh_neigh_mag(mags[i], dist, lunation='bright') else: mag_adj = adjusted_brigh_neigh_mag(mags[i], dist, lunation='dark') neigh_checks[dist < r_exclude[i]] = False mag_adj_robo[dist < r_exclude[i]] = mag_adj[dist < r_exclude[i]] # dont accoutn for places where no fiber mag_adj_robo[~hasFiber] = -9999. return neigh_checks, hasFiber, mag_adj_robo, isassigned
[docs] def design_mode_check_all(self, verbose=True): """ Perform all DesignMode checks for the given design. Parameters ---------- verbose: boolean True if want print statement summarizing results of all checks. Attributes ---------- n_skies_min_check: dict Results of minumum sky check for each instrument. min_skies_fovmetric_check: dict Results of sky FOV metric check for each instrument. n_stds_min_check: dict Results of minumum standard check for each instrument. min_stds_fovmetric_check: dict Results of standard FOV metric check for each instrument. stds_mags_check: dict Results of standard magnitude limit check for each instrument. bright_limit_targets_check: dict Results of science magnitude limit check for each instrument. """ self.n_skies_min_check = {} result = self.skies_min(instrument='BOSS', return_metric=True) self.n_skies_min_check['BOSS'] = result[0] self.n_skies_min_check['BOSS_metric'] = result[1] result = self.skies_min(instrument='APOGEE', return_metric=True) self.n_skies_min_check['APOGEE'] = result[0] self.n_skies_min_check['APOGEE_metric'] = result[1] self.min_skies_fovmetric_check = {} result = self.skies_fov(instrument='BOSS', return_metric=True) self.min_skies_fovmetric_check['BOSS'] = result[0] self.min_skies_fovmetric_check['BOSS_metric'] = result[1] result = self.skies_fov(instrument='APOGEE', return_metric=True) self.min_skies_fovmetric_check['APOGEE'] = result[0] self.min_skies_fovmetric_check['APOGEE_metric'] = result[1] self.n_stds_min_check = {} result = self.stds_min(instrument='BOSS', return_metric=True) self.n_stds_min_check['BOSS'] = result[0] self.n_stds_min_check['BOSS_metric'] = result[1] result = self.stds_min(instrument='APOGEE', return_metric=True) self.n_stds_min_check['APOGEE'] = result[0] self.n_stds_min_check['APOGEE_metric'] = result[1] self.min_stds_fovmetric_check = {} result = self.stds_fov(instrument='BOSS', return_metric=True) self.min_stds_fovmetric_check['BOSS'] = result[0] self.min_stds_fovmetric_check['BOSS_metric'] = result[1] result = self.stds_fov(instrument='APOGEE', return_metric=True) self.min_stds_fovmetric_check['APOGEE'] = result[0] self.min_stds_fovmetric_check['APOGEE_metric'] = result[1] self.stds_mags_check = {} self.stds_mags_check['BOSS'] = self.mag_limits(self.stds_mags['BOSS'], 'BOSS', 'std') check_tot = len(self.stds_mags_check['BOSS'][0][self.stds_mags_check['BOSS'][0]]) design_tot = len(self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == 'BOSS')]) self.stds_mags_check['BOSS_metric'] = [check_tot, design_tot] self.stds_mags_check['APOGEE'] = self.mag_limits(self.stds_mags['APOGEE'], 'APOGEE', 'std') check_tot = len(self.stds_mags_check['APOGEE'][0][self.stds_mags_check['APOGEE'][0]]) design_tot = len(self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['std'])) & (self.design['obsWavelength'] == 'APOGEE')]) self.stds_mags_check['APOGEE_metric'] = [check_tot, design_tot] self.bright_limit_targets_check = {} self.bright_limit_targets_check['BOSS'] = self.mag_limits(self.bright_limit_targets['BOSS'], 'BOSS', 'science') check_tot = len(self.bright_limit_targets_check['BOSS'][0][self.bright_limit_targets_check['BOSS'][0]]) design_tot = len(self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['science'])) & (self.design['obsWavelength'] == 'BOSS')]) self.bright_limit_targets_check['BOSS_metric'] = [check_tot, design_tot] self.bright_limit_targets_check['APOGEE'] = self.mag_limits(self.bright_limit_targets['APOGEE'], 'APOGEE', 'science') check_tot = len(self.bright_limit_targets_check['APOGEE'][0][self.bright_limit_targets_check['APOGEE'][0]]) design_tot = len(self.design['x'][(self.design['catalogID'] != -1) & (np.isin(self.design['category'], self.carton_classes['science'])) & (self.design['obsWavelength'] == 'APOGEE')]) self.bright_limit_targets_check['APOGEE_metric'] = [check_tot, design_tot] self.bright_neighbor_check = {} self.bright_neighbor_check['BOSS'] = self.bright_neighbors(instrument='BOSS', check_type='designmode') check_tot = len(self.bright_neighbor_check['BOSS'][0][self.bright_neighbor_check['BOSS'][0] & self.bright_neighbor_check['BOSS'][1] & self.bright_neighbor_check['BOSS'][3]]) design_tot = len(self.bright_neighbor_check['BOSS'][0][self.bright_neighbor_check['BOSS'][1] & self.bright_neighbor_check['BOSS'][3]]) mag_adj_near_bs = self.bright_neighbor_check['BOSS'][2] self.bright_neighbor_check['BOSS_metric'] = [check_tot, design_tot, mag_adj_near_bs] self.bright_neighbor_check['APOGEE'] = self.bright_neighbors(instrument='APOGEE', check_type='designmode') check_tot = len(self.bright_neighbor_check['APOGEE'][0][self.bright_neighbor_check['APOGEE'][0] & self.bright_neighbor_check['APOGEE'][1] & self.bright_neighbor_check['APOGEE'][3]]) design_tot = len(self.bright_neighbor_check['APOGEE'][0][self.bright_neighbor_check['APOGEE'][1] & self.bright_neighbor_check['APOGEE'][3]]) mag_adj_near_bs = self.bright_neighbor_check['APOGEE'][2] self.bright_neighbor_check['APOGEE_metric'] = [check_tot, design_tot, mag_adj_near_bs] if verbose: verbose_output = '' verbose_output += 'DesignMode Param | Pass Check?\n' verbose_output += '----------------------------------|-----------------\n' verbose_output += 'N Skies Min (BOSS): | %s\n' % self.n_skies_min_check['BOSS'] verbose_output += 'N Skies Min (APOGEE): | %s\n' % self.n_skies_min_check['APOGEE'] verbose_output += 'N Standards Min (BOSS): | %s\n' % self.n_stds_min_check['BOSS'] verbose_output += 'N Standards Min (APOGEE): | %s\n' % self.n_stds_min_check['APOGEE'] verbose_output += 'FOV Metric Skies (BOSS): | %s\n' % self.min_skies_fovmetric_check['BOSS'] verbose_output += 'FOV Metric Skies (APOGEE): | %s\n' % self.min_skies_fovmetric_check['APOGEE'] verbose_output += 'FOV Metric Standards (BOSS): | %s\n' % self.min_stds_fovmetric_check['BOSS'] verbose_output += 'FOV Metric Standards (APOGEE): | %s\n' % self.min_stds_fovmetric_check['APOGEE'] check_tot = self.stds_mags_check['BOSS_metric'][0] design_tot = self.stds_mags_check['BOSS_metric'][1] verbose_output += 'Magnitude Limit Stds (BOSS): | %d out of %d\n' % (check_tot, design_tot) check_tot = self.stds_mags_check['APOGEE_metric'][0] design_tot = self.stds_mags_check['APOGEE_metric'][1] verbose_output += 'Magnitude Limit Stds (APOGEE): | %d out of %d\n' % (check_tot, design_tot) check_tot = self.bright_limit_targets_check['BOSS_metric'][0] design_tot = self.bright_limit_targets_check['BOSS_metric'][1] verbose_output += 'Magnitude Limit Targets (BOSS): | %d out of %d\n' % (check_tot, design_tot) check_tot = self.bright_limit_targets_check['APOGEE_metric'][0] design_tot = self.bright_limit_targets_check['APOGEE_metric'][1] verbose_output += 'Magnitude Limit Targets (APOGEE): | %d out of %d\n' % (check_tot, design_tot) check_tot = self.bright_neighbor_check['BOSS_metric'][0] design_tot = self.bright_neighbor_check['BOSS_metric'][1] verbose_output += 'Bright Neighbor Check (BOSS): | %d out of %d\n' % (check_tot, design_tot) check_tot = self.bright_neighbor_check['APOGEE_metric'][0] design_tot = self.bright_neighbor_check['APOGEE_metric'][1] verbose_output += 'Bright Neighbor Check (APOGEE): | %d out of %d\n' % (check_tot, design_tot) print(verbose_output)
[docs] def check_design_to_status(design_id): """ check if the design_id is done """ dts = opsdb.DesignToStatus.get_or_none(design_id=design_id) if dts is None: return False else: if dts.status.label == 'done': return True else: return False
[docs] def designid_status_valid(design_id, field_id, field_exposure): """ check if design_id assocatied in rsFieldAssignments file is based on 'done' designs in database. Parameters ---------- design_id: int The design_id to be validated. If -1, then checks all possible field_exposures in the field. field_id: int field_id for the design. field_exposure: int field_exposure for the design. Returns ------- status: bool If the design_id is set correctly for the design_status. """ if design_id != -1: status = check_design_to_status(design_id) else: # grab all design_ids for current field_exposure in all verisons # of the field designs = targetdb.DesignToField.select()\ .join(targetdb.Field)\ .where(targetdb.Field.field_id == field_id, targetdb.DesignToField.field_exposure == field_exposure) # check all of the designs to see if they are marked done design_status = np.zeros(len(designs), dtype=bool) for i, d in enumerate(designs): design_status[i] = check_design_to_status(d.design_id) status = ~np.any(design_status) # should all be False if design_id = -1 return status
[docs] def find_designid_status(field_id, field_exposure, assign_hash=None): """ Find the designid status based on a given field and field_exposure Parameters ---------- field_id: int field_id for the design. field_exposure: int field_exposure for the design. assign_hash: str The assignment_hash for the design Returns ------- designid_status: int The designid_status for the given slot. If -1, not observed yet status_des: str status of design in opsdb """ # grab all design_ids for current field_exposure in all verisons # of the field designs = targetdb.DesignToField.select()\ .join(targetdb.Field)\ .where(targetdb.Field.field_id == field_id, targetdb.DesignToField.field_exposure == field_exposure) # get the current status status = np.zeros(len(designs), dtype=bool) designids = np.zeros(len(designs), dtype=int) versions = np.zeros(len(designs), dtype=int) for i, d in enumerate(designs): dts = opsdb.DesignToStatus.get_or_none(design_id=d.design_id) if dts is None: status[i] = False else: if dts.status.label == 'done': status[i] = True else: status[i] = False designids[i] = d.design_id versions[i] = d.field.version.pk if np.any(status): designid_status = designids[status][np.argmax(versions[status])] status_des = 'done' else: designid_status = -1 status_des = 'not started' # if assignment_hash provided, check if already exists if assign_hash is not None and designid_status == -1: designs = targetdb.DesignToField.select()\ .join(targetdb.Design)\ .where(targetdb.Design.assignment_hash == assign_hash, targetdb.DesignToField.field_exposure == field_exposure) # get design_id for most recent version if len(designs) > 0: designids = np.zeros(len(designs), dtype=int) versions = np.zeros(len(designs), dtype=int) for i, d in enumerate(designs): designids[i] = d.design_id versions[i] = d.field.version.pk designid_status = designids[np.argmax(versions)] return designid_status, status_des