Source code for mugatu.fpsdesign

# @Author: Ilija Medan
# @Date: December 29, 2020
# @Filename: fpsdesign.py
# @License: BSD 3-Clause
# @Copyright: Ilija Medan

import numpy as np
import warnings
import os
from astropy.io import fits
from astropy.time import Time
from peewee import JOIN

import kaiju
import kaiju.robotGrid
from mugatu.exceptions import (MugatuError, MugatuWarning,
                               MugatuDesignError, MugatuDesignWarning,
                               MugatuDesignModeWarning)
from coordio.utils import (radec2wokxy, object_offset, Moffat2dInterp,
                           _offset_radec)

fmagloss = Moffat2dInterp()

from coordio.defaults import POSITIONER_HEIGHT
from mugatu.designs_to_targetdb import (make_design_assignments_targetdb,
                                        make_design_field_targetdb)
from mugatu.designmode import DesignModeCheck, allDesignModes

import robostrategy.obstime as obstime
import coordio.time

try:
    from sdssdb.peewee.sdss5db import database
    database.set_profile('operations')
    _database = True
except:
    _database = False

from sdssdb.peewee.sdss5db.targetdb import (Design, Field, Observatory,
                                            Assignment, Instrument, Target,
                                            Hole, CartonToTarget, Carton,
                                            Magnitude, Category, DesignToField,
                                            Version)

modes = allDesignModes()


[docs] class FPSDesign(object): """ Class to load, validate and export a design. Parameters ---------- design_pk: int The pk of the design as it appears in targetdb. obsTime: float Julian date of the observation. racen: np.float64 Right Ascension of center of the field in degrees. deccen: np.float64 Declination of center of the field in degrees. position_angle: np.float64 Position angle of the field E of N in degrees. observatory: str Observatory where observation is taking place, either 'LCO' or 'APO'. desmode_label: int The pk in targetdb for the observing mode for the design. idtype: str ID type used for catalogids in design. Must be 'catalogID' or 'carton_to_target'. catalogids: np.array List of ids for a manual design in db. Default from idtype is carton_to_target. ra: np.array List of right ascensions that correspond to catalogids. If list not provided, but catalogids for a manual design are provided, right ascensions will be pulled from targetdb. dec: np.array List of declinations that correspond to catalogids. If list not provided, but catalogids for a manual design are provided, declinations will be pulled from targetdb. pmra: np.array Array of proper motions in the RA axis for the targets, in milliarcsec/yr. Must be a true angle, i.e, it must include the cos(dec) term. If no proper motion, set index to zero. pmdec: np.array Array of proper motions in the DEC axis for the targets, in milliarcsec/yr. If no proper motion, set index to zero. delta_ra: np.array List of offsets in right ascensions that correspond to catalogids. If list not provided, but catalogids for a manual design are provided, right ascensions will be pulled from targetdb. delta_dec: np.array List of offsets in declinations that correspond to catalogids. If list not provided, but catalogids for a manual design are provided, declinations will be pulled from targetdb. epoch: np.array Array of epochs for the coordinates in decimal years. holeID: np.array Fiber assignement for each target in the manual design. obsWavelength: np.array Wavelength of observation for each fiber that correspond to catalogids. Must be either 'BOSS' or 'APOGEE'. priority: np.array Priorties for targets that correspond to catalogids. carton_pk: np.array The carton_pks (as in targetdb) for the targets that correspond to catalogids. category: np.array The category for each target. Can be 'science', 'sky_INSTRUMENT' or 'standard_INSTRUMENT' magnitudes: np.array Magnitudes of the targets. Should be of size (N, 10), where columns correspond to g, r, i, z, bp, gaia_g, rp, J, H, K band magnitudes. design_file: str FITS file with a manual design. The file must be the same format as the rsFieldAssignments file. manual_design: boolean Boolean if the design being validated is manual (manual_design=True) or in targetdb (manual_design=False) exp: int Exposure number for design in file. If exp=0, assumes only 1 exposure in the design file. if exp>0, then will choose exposure exp = 1 to N. RS_VERSION: str The version of targetdb to get designs from. If None will look for RS_VERSION enviornment variable, and if None exists will default to first entry in DesignToField for that design_id. offset_min_skybrightness: float Minimum skybrightness targets will be offset year: str The observation epoch to use for the robostrategy.obstime.ObsTime object in format YYYY-MM-DD Attributes ---------- design: dict contains all of the design inputs for Kaiju needed to validate a design. rg: Kaiju RobotGrid object Kaiju RobotGrid with the assignments from design. targets_unassigned: list catalogid of targets that could not be assigned due to assigned robot not being able to reach the assigned target. holeID_mapping: np.array Mapping between robotID and holeID. Index corresponds to robotID in robotID_mapping attrobute. robotID_mapping: np.array Mapping between robotID and holeID. Index corresponds to holeID in holeID_mapping attrobute. targets_collided: list catalogid of targets that could not be assigned due to assigned robot to assigned target resulting in a collision. valid_design: dictonary Same format as design dictonary, except this is validated design by Kaiju with collision/delock assignments removed. design_built: booleen True if design dictonary has been populated, False if not. hourAngle: float Hour angle of field center from coordio.utils.radec2wokxy. positionAngle_coordio: float position angle of field center from coordio.utils.radec2wokxy. design_errors: dict dictonary with errors for design. Created and filled after validation. """ def __init__(self, design_pk, obsTime=None, racen=None, deccen=None, position_angle=None, observatory=None, desmode_label=None, idtype='carton_to_target', catalogids=None, ra=None, dec=None, pmra=None, pmdec=None, delta_ra=None, delta_dec=None, epoch=None, holeID=None, obsWavelength=None, priority=None, carton_pk=None, category=None, magnitudes=None, design_file=None, manual_design=False, exp=0, RS_VERSION=None, offset_min_skybrightness=0., year='2025-08-15'): if idtype != 'catalogID' and idtype != 'carton_to_target': message = 'idtype must be catalogID or carton_to_target' raise MugatuError(message=message) self.design_pk = design_pk self.design = {} self.RS_VERSION = RS_VERSION # either set field params or pull from db is design # is in targetdb if manual_design: if design_file is None: self.racen = racen self.deccen = deccen self.position_angle = position_angle self.observatory = observatory self.desmode_label = desmode_label else: head = fits.open(design_file)[0].header desmode_labels = head['DESMODE'].split(' ') self.racen = head['RACEN'] self.deccen = head['DECCEN'] self.position_angle = head['PA'] self.observatory = head['obs'].strip().upper() self.desmode_label = desmode_labels[exp - 1] else: if self.RS_VERSION is None: self.RS_VERSION = os.getenv('RS_VERSION') if self.RS_VERSION is None: design_field_db = ( Design.select(Design.design_id, Design.design_mode, Field.racen, Field.deccen, Field.position_angle, Observatory.label) .join(DesignToField, on=(Design.design_id == DesignToField.design)) .join(Field, on=(DesignToField.field == Field.pk)) .join(Observatory, on=(Field.observatory == Observatory.pk)) .where(Design.design_id == self.design_pk)) else: design_field_db = ( Design.select(Design.design_id, Design.design_mode, Field.racen, Field.deccen, Field.position_angle, Observatory.label) .join(DesignToField, on=(Design.design_id == DesignToField.design)) .join(Field, on=(DesignToField.field == Field.pk)) .join(Observatory, on=(Field.observatory == Observatory.pk)) .switch(Field) .join(Version, on=(Field.version == Version.pk)) .where((Design.design_id == self.design_pk) & (Version.plan == self.RS_VERSION))) if len(design_field_db) == 0: flag = 'Design not in set RS_VERSION.' warnings.warn(flag, MugatuWarning) design_field_db = ( Design.select(Design.design_id, Design.design_mode, Field.racen, Field.deccen, Field.position_angle, Observatory.label) .join(DesignToField, on=(Design.design_id == DesignToField.design)) .join(Field, on=(DesignToField.field == Field.pk)) .join(Observatory, on=(Field.observatory == Observatory.pk)) .where(Design.design_id == self.design_pk)) self.racen = design_field_db.objects()[0].racen self.deccen = design_field_db.objects()[0].deccen self.position_angle = design_field_db.objects()[0].position_angle self.observatory = design_field_db.objects()[0].label self.desmode_label = design_field_db.objects()[0].design_mode.label if self.observatory == 'LCO' and self.position_angle < 180: flag = ('Design position angle outside of reachable position' 'at LCO. Please set 180 < PA < 360.') warnings.warn(flag, MugatuDesignWarning) # should these be catalogids or carton_to_target? self.idtype = idtype self.catalogids = catalogids self.ra = ra self.dec = dec self.pmra = pmra self.pmdec = pmdec self.delta_ra = delta_ra self.delta_dec = delta_dec self.epoch = epoch self.holeID = holeID self.obsWavelength = obsWavelength self.priority = priority self.carton_pk = carton_pk self.category = category self.magnitudes = magnitudes self.design_file = design_file self.manual_design = manual_design self.exp = exp if obsTime is None: ot = obstime.ObsTime(observatory=self.observatory.lower(), date=year) self.obsTime = coordio.time.Time(ot.nominal(lst=self.racen)).jd else: self.obsTime = obsTime # set dummy value for collision for now # this may want to be a input, not sure the standard here # initialize robotGrid # check wth Mike about stepsize? if self.observatory == 'APO': self.rg = kaiju.robotGrid.RobotGridAPO(stepSize=0.1) else: self.rg = kaiju.robotGrid.RobotGridLCO(stepSize=0.1) self.holeID_mapping = np.zeros(500, dtype='<U10') self.robotID_mapping = np.zeros(500, dtype=int) for i, robotID in enumerate(self.rg.robotDict): self.holeID_mapping[i] = self.rg.robotDict[robotID].holeID self.robotID_mapping[i] = robotID # this is in Conor's test, I'm not quite sure what it does # but without paths wont generate for k in self.rg.robotDict.keys(): self.rg.homeRobot(k) # for rID in self.rg.robotDict: # robot = self.rg.getRobot(rID) # robot.setXYUniform() self.targets_unassigned = [] self.targets_collided = [] self.valid_design = {} self.design_built = False self.offset_min_skybrightness = offset_min_skybrightness
[docs] def calculate_offsets(self): """ Calculate the offsets for which an algorithmic offset value was requested """ if 'bright' in self.desmode_label: boss_mag_lim = modes[self.desmode_label].bright_limit_targets['BOSS'][:, 0] lunation = 'bright' skybrightness = 1.0 else: boss_mag_lim = modes[self.desmode_label].bright_limit_targets['BOSS'][:, 0] lunation = 'dark' skybrightness = 0.35 apogee_mag_lim = modes[self.desmode_label].bright_limit_targets['APOGEE'][:, 0] ev_boss = (self.design['obsWavelength'] == 'BOSS') res = object_offset(self.design['magnitudes'][ev_boss, :], boss_mag_lim, lunation, 'Boss', self.observatory, fmagloss=fmagloss, can_offset=self.design['offset'][ev_boss], skybrightness=skybrightness, offset_min_skybrightness=self.offset_min_skybrightness) self.design['delta_ra'][ev_boss] = res[0] self.design['delta_dec'][ev_boss] = res[1] self.design['offset_flag'][ev_boss] = res[2] ev_apogee = (self.design['obsWavelength'] == 'APOGEE') res = object_offset(self.design['magnitudes'][ev_apogee, :], apogee_mag_lim, lunation, 'Apogee', self.observatory, fmagloss=fmagloss, can_offset=self.design['offset'][ev_apogee], skybrightness=skybrightness, offset_min_skybrightness=self.offset_min_skybrightness) self.design['delta_ra'][ev_apogee] = res[0] self.design['delta_dec'][ev_apogee] = res[1] self.design['offset_flag'][ev_apogee] = res[2]
[docs] def radec_to_xy(self, ev): """ transorm radec to wok xy Parameters --------- ev: eval eval term to ignore unassigned fibers Returns ------- fieldWarn: boolean Warning if any of the transformations should be eyed with suspicion. """ with warnings.catch_warnings(record=True) as w: radVel = (np.zeros(len(self.design['ra_off'][ev]), dtype=np.float64) + 1.e-4) parallax = (np.zeros(len(self.design['ra_off'][ev]), dtype=np.float64) + 1.e-4) res = radec2wokxy( ra=self.design['ra_off'][ev], dec=self.design['dec_off'][ev], coordEpoch=Time(self.design['epoch'][ev], format='decimalyear').jd, waveName=np.array(list(map(lambda x: x.title(), self.design['obsWavelength'][ev]))), raCen=self.racen, decCen=self.deccen, obsAngle=self.position_angle, obsSite=self.observatory, obsTime=self.obsTime, pmra=self.design['pmra'][ev], pmdec=self.design['pmdec'][ev], parallax=parallax, radVel=radVel) self.design['x'][ev] = res[0] self.design['y'][ev] = res[1] fieldWarn = res[2] self.hourAngle = res[3] self.positionAngle_coordio = res[4] if len(w) > 0: for wm in w: if 'iauPmsafe return' in wm.message.args[0]: flag = ('Coordio xy coordinates converted ' 'should be eyed with suspicion.') warnings.warn(flag, MugatuDesignWarning) return fieldWarn
[docs] def holeID_to_robotID(self, holeID, return_ind=True): """ return the robotID for a given holeID based on current kaiju mapping """ ind = np.where(self.holeID_mapping == holeID)[0][0] robotID = self.robotID_mapping[ind] if return_ind: return robotID, ind else: return robotID
[docs] def build_design_db(self): """ Populate the design dictonary for design in targetdb. Notes ----- This method will build a design from the db if manual_design=False """ if self.manual_design: flag = 'Building database design with manual_design=True flag' warnings.warn(flag, MugatuWarning) # initialize dict for the design # not using None or nan for no assignments # using -1 (for int) and -9999.99 (for float) for None assignment self.design['design_pk'] = self.design_pk self.design['desmode_label'] = self.desmode_label self.design['catalogID'] = np.zeros(500, dtype=np.int64) - 1 self.design['robotID'] = np.zeros(500, dtype=np.int64) - 1 self.design['holeID'] = np.zeros(500, dtype='<U10') self.design['obsWavelength'] = np.zeros(500, dtype='<U6') self.design['priority'] = np.zeros(500, dtype=int) - 1 self.design['carton_pk'] = np.zeros(500, dtype=int) - 1 self.design['category'] = np.zeros(500, dtype='<U20') self.design['ra'] = np.zeros(500, dtype=float) - 9999.99 self.design['dec'] = np.zeros(500, dtype=float) - 9999.99 self.design['delta_ra'] = np.zeros(500, dtype=float) - 9999.99 self.design['delta_dec'] = np.zeros(500, dtype=float) - 9999.99 self.design['offset'] = np.zeros(500, dtype=bool) self.design['offset_flag'] = np.zeros(500, dtype=int) self.design['epoch'] = np.zeros(500, dtype=float) - 9999.99 self.design['ra_off'] = np.zeros(500, dtype=float) - 9999.99 self.design['dec_off'] = np.zeros(500, dtype=float) - 9999.99 self.design['pmra'] = np.zeros(500, dtype=float) - 9999.99 self.design['pmdec'] = np.zeros(500, dtype=float) - 9999.99 self.design['x'] = np.zeros(500, dtype=float) - 9999.99 self.design['y'] = np.zeros(500, dtype=float) - 9999.99 self.design['magnitudes'] = np.zeros((500, 10), dtype=float) - 9999.99 # need to add wokHole to query when in db (not there now) # I need to test this when v05 is up, im unsure about Joins design_targ_db = ( Assignment.select(Target.catalogid, Hole.holeid, Instrument.label, CartonToTarget.priority, Target.ra, Target.dec, CartonToTarget.carton, Carton.pk.alias('carton_pk'), CartonToTarget.pk, Magnitude.g, Magnitude.r, Magnitude.i, Magnitude.z, Magnitude.bp, Magnitude.gaia_g, Magnitude.rp, Magnitude.j, Magnitude.h, Magnitude.k, CartonToTarget.delta_ra, CartonToTarget.delta_dec, CartonToTarget.can_offset, Target.pmra, Target.pmdec, Target.epoch, Category.label.alias('cat_lab')) .join(Hole) .switch(Assignment) .join(Instrument) .switch(Assignment) .join(CartonToTarget) .join(Target) .switch(CartonToTarget) .join(Magnitude, JOIN.LEFT_OUTER) .switch(CartonToTarget) .join(Carton, JOIN.LEFT_OUTER) .join(Category, JOIN.LEFT_OUTER) .where(Assignment.design_id == self.design_pk)) for d in design_targ_db.objects(): # assign to index that corresponds to fiber assignment # index should match length of arrays robotID, pos_ind = self.holeID_to_robotID(d.holeid) if self.idtype == 'carton_to_target': self.design['catalogID'][pos_ind] = d.pk else: self.design['catalogID'][pos_ind] = d.catalogid self.design['robotID'][pos_ind] = robotID self.design['holeID'][pos_ind] = d.holeid # design['wokHoleID'][i] = design_targ_db[i] self.design['obsWavelength'][pos_ind] = d.label # catch targets with no assigned priority try: self.design['priority'][pos_ind] = d.priority except AttributeError: self.design['priority'][pos_ind] = -1 self.design['carton_pk'][pos_ind] = d.carton_pk self.design['category'][pos_ind] = d.cat_lab self.design['ra'][pos_ind] = d.ra self.design['dec'][pos_ind] = d.dec self.design['delta_ra'][pos_ind] = d.delta_ra self.design['delta_dec'][pos_ind] = d.delta_dec self.design['offset'][pos_ind] = d.can_offset self.design['pmra'][pos_ind] = d.pmra self.design['pmdec'][pos_ind] = d.pmdec self.design['epoch'][pos_ind] = d.epoch self.design['magnitudes'][pos_ind][0] = d.g self.design['magnitudes'][pos_ind][1] = d.r self.design['magnitudes'][pos_ind][2] = d.i self.design['magnitudes'][pos_ind][3] = d.z self.design['magnitudes'][pos_ind][4] = d.bp self.design['magnitudes'][pos_ind][5] = d.gaia_g self.design['magnitudes'][pos_ind][6] = d.rp self.design['magnitudes'][pos_ind][7] = d.j self.design['magnitudes'][pos_ind][8] = d.h self.design['magnitudes'][pos_ind][9] = d.k # set nan pm tp zero self.design['pmra'][np.isnan(self.design['pmra'])] = 0. self.design['pmdec'][np.isnan(self.design['pmdec'])] = 0. # calculate offsets for targets that request algorithm offsets self.calculate_offsets() # here convert ra/dec to x/y based on field/time of observation # I think I need to add inertial in here at some point, # dont see this in targetdb though ev = eval("(self.design['ra'] != -9999.99)") res = _offset_radec(ra=self.design['ra'][ev], dec=self.design['dec'][ev], delta_ra=self.design['delta_ra'][ev], delta_dec=self.design['delta_dec'][ev]) self.design['ra_off'][ev], self.design['dec_off'][ev] = res fieldWarn = self.radec_to_xy(ev) if np.any(fieldWarn): flag = ('Coordio xy coordinates converted ' 'should be eyed with suspicion.') warnings.warn(flag, MugatuDesignWarning) self.design_built = True return
[docs] def build_design_manual(self): """ Populate the design dictonary for manual design. Notes ----- This function creates a manual design whether it is from user inputted catalogids, or if it is a FITS file if manual_design=True. """ if not self.manual_design: flag = 'Building manual design with manual_design=False flag' warnings.warn(flag, MugatuWarning) # initialize dict for the design # for manual design, dont make arrays since length # will be defined by user self.design['design_pk'] = self.design_pk self.design['desmode_label'] = self.desmode_label if self.design_file is None: # manual design with targets in targetdb self.design['catalogID'] = self.catalogids self.design['obsWavelength'] = self.obsWavelength self.design['priority'] = self.priority self.design['carton_pk'] = self.carton_pk self.design['category'] = self.category if self.ra is None: for i in range(len(self.design['catalogID'])): targ_db = Target.get(catalogid=self.design['catalogID'][i]) self.design['ra'][i] = targ_db.ra self.design['dec'][i] = targ_db.dec self.design['pmra'][i] = targ_db.pmra self.design['pmdec'][i] = targ_db.pmdec self.design['delta_ra'][i] = targ_db.delta_ra self.design['delta_dec'][i] = targ_db.delta_dec else: self.design['ra'] = self.ra self.design['dec'] = self.dec self.design['pmra'] = self.pmra self.design['pmdec'] = self.pmdec self.design['delta_ra'] = self.delta_ra self.design['delta_dec'] = self.delta_dec self.design['epoch'] = self.epoch self.design['robotID'] = self.robotID self.design['holeID'] = np.zeros(len(self.design['robotID']), dtype='<U10') for i in range(len(self.design['robotID'])): self.design['holeID'][i] = self.holeID_mapping[self.design['robotID'][i] - 1] self.design['magnitudes'] = self.magnitudes else: # manual design from flat file # get header with field info head = fits.open(self.design_file)[0].header desmode_labels = head['DESMODE'].split(' ') # association between catalogid and instrument design_inst = fits.open(self.design_file)[1].data # catalogid assignment for each fiber design = fits.open(self.design_file)[2].data # grab obs info self.racen = head['RACEN'] self.deccen = head['DECCEN'] self.position_angle = head['PA'] self.observatory = head['obs'].strip().upper() self.desmode_label = desmode_labels[self.exp - 1] # grab assignment info if self.exp == 0: roboIDs = design['robotID'] holeIDs = design['holeID'] else: roboIDs = design['robotID'][:, self.exp - 1] holeIDs = design['holeID'][:, self.exp - 1] if self.idtype == 'catalogID': self.design['catalogID'] = (design_inst['catalogid'] [roboIDs != -1]) else: self.design['catalogID'] = (design_inst['carton_to_target_pk'] [roboIDs != -1]) self.design['ra'] = design_inst['ra'][roboIDs != -1] self.design['dec'] = design_inst['dec'][roboIDs != -1] self.design['delta_ra'] = design_inst['delta_ra'][roboIDs != -1] self.design['delta_dec'] = design_inst['delta_dec'][roboIDs != -1] self.design['offset'] = design_inst['can_offset'][roboIDs != -1] self.design['pmra'] = design_inst['pmra'][roboIDs != -1] self.design['pmdec'] = design_inst['pmdec'][roboIDs != -1] self.design['epoch'] = design_inst['epoch'][roboIDs != -1] self.design['holeID'] = holeIDs[roboIDs != -1] self.design['robotID'] = np.zeros(len(self.design['holeID']), dtype=int) - 1 for i in range(len(self.design['holeID'])): self.design['robotID'][i] = self.holeID_to_robotID( self.design['holeID'][i], return_ind=False) self.design['obsWavelength'] = (design_inst['fiberType'] [roboIDs != -1]) self.design['priority'] = design_inst['priority'][roboIDs != -1] self.design['carton_pk'] = design_inst['carton_pk'][roboIDs != -1] self.design['category'] = design_inst['category'][roboIDs != -1] self.design['magnitudes'] = design_inst['magnitude'][roboIDs != -1] # check magnitudes array size if self.design['magnitudes'].shape[1] != 10: message = 'Magntiude array must have size of Nx10' raise MugatuDesignError(message=message) # set nan pm tp zero self.design['pmra'][np.isnan(self.design['pmra'])] = 0. self.design['pmdec'][np.isnan(self.design['pmdec'])] = 0. # make empty x,y arrays self.design['x'] = (np.zeros(len(self.design['catalogID']), dtype=float) - 9999.99) self.design['y'] = (np.zeros(len(self.design['catalogID']), dtype=float) - 9999.99) self.design['ra_off'] = (np.zeros(len(self.design['catalogID']), dtype=float) - 9999.99) self.design['dec_off'] = (np.zeros(len(self.design['catalogID']), dtype=float) - 9999.99) self.design['offset_flag'] = np.zeros(len(self.design['catalogID']), dtype=int) # calculate offsets for targets that request algorithm offsets self.calculate_offsets() # here convert ra/dec to x/y based on field/time of observation ev = eval("(self.design['ra'] != -9999.99)") res = _offset_radec(ra=self.design['ra'][ev], dec=self.design['dec'][ev], delta_ra=self.design['delta_ra'][ev], delta_dec=self.design['delta_dec'][ev]) self.design['ra_off'][ev], self.design['dec_off'][ev] = res fieldWarn = self.radec_to_xy(ev) if np.any(fieldWarn): flag = ('Coordio xy coordinates converted ' 'should be eyed with suspicion.') warnings.warn(flag, MugatuDesignWarning) self.design_built = True return
[docs] def design_to_RobotGrid(self): """ Add assignments to Kaiju RobotGrid. Notes ----- Adds targets to the kaiju.robotGrid.RobotGridFilledHex and assigns them to fibers based on the design dictonary. """ is_unassigned = False for i in range(len(self.design['x'])): if self.design['robotID'][i] != -1: if self.design['obsWavelength'][i] == 'BOSS': self.rg.addTarget(targetID=self.design['catalogID'][i], xyzWok=[self.design['x'][i], self.design['y'][i], POSITIONER_HEIGHT], priority=self.design['priority'][i], fiberType=kaiju.cKaiju.BossFiber) else: self.rg.addTarget(targetID=self.design['catalogID'][i], xyzWok=[self.design['x'][i], self.design['y'][i], POSITIONER_HEIGHT], priority=self.design['priority'][i], fiberType=kaiju.cKaiju.ApogeeFiber) for i in range(len(self.design['x'])): if self.design['robotID'][i] > 0: try: self.rg.assignRobot2Target(self.design['robotID'][i], self.design['catalogID'][i]) except RuntimeError: # this catches the fact that robot cant be # assigned to given fiber self.targets_unassigned.append(self.design['catalogID'][i]) is_unassigned = True if is_unassigned: flag = 'Some targets could not be assigned to fiber' warnings.warn(flag, MugatuDesignWarning) return
[docs] def decollide_grid(self): """ Check to see if any collisions in grid, and if so, decollide grid and record assignments that were removed """ # decollide the unassigned robots for robotID in self.rg.robotDict: if(self.rg.robotDict[robotID].isAssigned() is False): self.rg.decollideRobot(robotID) # need to still set alpha/beta # robot = self.rg.getRobot(robotID) # robot.setDestinationAlphaBeta(0, 180) if self.rg.getNCollisions() > 0: self.design_errors['no_collisions'] = False self.rg.decollideGrid() # check if de-collision was successful if not self.rg.getNCollisions() == 0: raise MugatuDesignError(message='Kaiju decollideGrid failed') flag = 'Some targets removed from design due to collisions' warnings.warn(flag, MugatuDesignWarning) # grab all of the targets removed due to collisions for i in self.rg.robotDict: fiber_idx = np.where(self.design['robotID'] == i)[0] if (self.rg.robotDict[i].assignedTargetID == -1 and len(fiber_idx) > 0): targ_remove = self.design['catalogID'][fiber_idx[0]] if targ_remove not in self.targets_unassigned: self.targets_collided.append(targ_remove) else: self.design_errors['no_collisions'] = True return
[docs] def designmode_validate(self, db_query_results_boss=None, db_query_results_apogee=None, desmode_manual=None): """ Check all designmode parameters for a design """ mode = DesignModeCheck(FPSDesign=self, desmode_label=self.desmode_label, db_query_results_boss=db_query_results_boss, db_query_results_apogee=db_query_results_apogee, desmode_manual=desmode_manual) mode.design_mode_check_all(verbose=False) self.design_errors['min_skies_boss'] = mode.n_skies_min_check['BOSS'] self.design_errors['min_skies_boss_metric'] = (mode .n_skies_min_check ['BOSS_metric']) if self.design_errors['min_skies_boss'] == False: flag = 'Design does not meet minimum BOSS skies for DesignMode' warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['min_skies_apogee'] = (mode .n_skies_min_check['APOGEE']) self.design_errors['min_skies_apogee_metric'] = (mode .n_skies_min_check ['APOGEE_metric']) if self.design_errors['min_skies_apogee'] == False: flag = 'Design does not meet minimum APOGEE skies for DesignMode' warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['fov_skies_boss'] = (mode .min_skies_fovmetric_check ['BOSS']) self.design_errors['fov_skies_boss_metric'] = (mode .min_skies_fovmetric_check ['BOSS_metric']) if self.design_errors['fov_skies_boss'] == False: flag = ('Design does not meet FOV criteria ' 'for BOSS skies for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['fov_skies_apogee'] = (mode .min_skies_fovmetric_check ['APOGEE']) self.design_errors['fov_skies_apogee_metric'] = (mode .min_skies_fovmetric_check ['APOGEE_metric']) if self.design_errors['fov_skies_apogee'] == False: flag = ('Design does not meet FOV criteria ' 'for APOGEE skies for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['min_stds_boss'] = mode.n_stds_min_check['BOSS'] self.design_errors['min_stds_boss_metric'] = (mode .n_stds_min_check ['BOSS_metric']) if self.design_errors['min_stds_boss'] == False: flag = 'Design does not meet minimum BOSS standards for DesignMode' warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['min_stds_apogee'] = mode.n_stds_min_check['APOGEE'] self.design_errors['min_stds_apogee_metric'] = (mode .n_stds_min_check ['APOGEE_metric']) if self.design_errors['min_stds_apogee'] == False: flag = ('Design does not meet minimum ' 'APOGEE standards for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['fov_stds_boss'] = (mode .min_stds_fovmetric_check ['BOSS']) self.design_errors['fov_stds_boss_metric'] = (mode .min_stds_fovmetric_check ['BOSS_metric']) if self.design_errors['fov_stds_boss'] == False: flag = ('Design does not meet FOV criteria ' 'for BOSS standards for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['fov_stds_apogee'] = (mode .min_stds_fovmetric_check ['APOGEE']) self.design_errors['fov_stds_apogee_metric'] = (mode .min_stds_fovmetric_check ['APOGEE_metric']) if self.design_errors['fov_stds_apogee'] == False: flag = ('Design does not meet FOV criteria ' 'for APOGEE standards for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['stds_mag_boss'] = np.all( mode.stds_mags_check['BOSS'][0][(self.design['catalogID'] != -1) & (self.design['category'] == 'standard_boss')]) self.design_errors['stds_mag_boss_metric'] = mode.stds_mags_check['BOSS_metric'] if self.design_errors['stds_mag_boss'] == False: flag = ('Design has BOSS standard ' 'assignments too bright for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['stds_mag_apogee'] = np.all( mode.stds_mags_check['APOGEE'][0][(self.design['catalogID'] != -1) & (self.design['category'] == 'standard_apogee')]) self.design_errors['stds_mag_apogee_metric'] = mode.stds_mags_check['APOGEE_metric'] if self.design_errors['stds_mag_apogee'] == False: flag = ('Design has APOGEE standard ' 'assignments too bright for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['sci_mag_boss'] = np.all( mode.bright_limit_targets_check['BOSS'][0][(self.design['catalogID'] != -1) & (self.design['category'] == 'science') & (self.design['obsWavelength'] == 'BOSS')]) self.design_errors['sci_mag_boss_metric'] = mode.bright_limit_targets_check['BOSS_metric'] if self.design_errors['sci_mag_boss'] == False: flag = ('Design has BOSS science assignments ' ' too bright for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['sci_mag_apogee'] = np.all( mode.bright_limit_targets_check['APOGEE'][0][(self.design['catalogID'] != -1) & (self.design['category'] == 'science') & (self.design['obsWavelength'] == 'APOGEE')]) self.design_errors['sci_mag_apogee_metric'] = (mode .bright_limit_targets_check ['APOGEE_metric']) if self.design_errors['sci_mag_apogee'] == False: flag = ('Design has APOGEE science assignments ' 'too bright for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['bright_neigh_boss'] = np.all( mode.bright_neighbor_check['BOSS'][0][mode.bright_neighbor_check['BOSS'][1] & mode.bright_neighbor_check['BOSS'][3]]) self.design_errors['bright_neigh_boss_metric'] = mode.bright_neighbor_check['BOSS_metric'] if self.design_errors['bright_neigh_boss'] == False: flag = ('Design has BOSS fibers too near ' 'bright source for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) self.design_errors['bright_neigh_apogee'] = np.all( mode.bright_neighbor_check['APOGEE'][0][mode.bright_neighbor_check['APOGEE'][1] & mode.bright_neighbor_check['APOGEE'][3]]) self.design_errors['bright_neigh_apogee_metric'] = (mode .bright_neighbor_check ['APOGEE_metric']) if self.design_errors['bright_neigh_apogee'] == False: flag = ('Design has APOGEE fibers too ' 'near bright source for DesignMode') warnings.warn(flag, MugatuDesignModeWarning) return
[docs] def bright_neigh_safety(self, db_query_results_boss=None, db_query_results_apogee=None, desmode_manual=None): """ Perform safety version of bright neighbor check on the design """ mode = DesignModeCheck(FPSDesign=self, desmode_label=self.desmode_label, db_query_results_boss=db_query_results_boss, db_query_results_apogee=db_query_results_apogee, desmode_manual=desmode_manual) bright_check_boss, hasFiber_boss, _, isassigned_boss = mode.bright_neighbors( instrument='BOSS',check_type='safety') bright_check_apogee, hasFiber_apogee, _, isassigned_apogee = mode.bright_neighbors( instrument='APOGEE', check_type='safety') if (len(bright_check_boss[~bright_check_boss & hasFiber_boss & isassigned_boss]) > 0 or len(bright_check_apogee[~bright_check_apogee & hasFiber_apogee & isassigned_apogee]) > 0): message = 'Bright Neighbor Safety Checked Failed,' message += (' %d BOSS and %d APOGEE fibers near bright sources' % (len(bright_check_boss[~bright_check_boss & hasFiber_boss & isassigned_boss]), len(bright_check_apogee[~bright_check_apogee & hasFiber_apogee & isassigned_apogee]))) raise MugatuDesignError(message=message) return
[docs] def RobotGrid_to_valid_design(self): """ Construct valid design from Kaiju Robotgrid """ # initialize dict for the validated design # not using None or nan for no assignments # using -1 (for int) and -9999.99 (for float) for None assignment self.valid_design['design_pk'] = self.design_pk self.valid_design['desmode_label'] = self.desmode_label self.valid_design['catalogID'] = np.zeros(500, dtype=np.int64) - 1 self.valid_design['robotID'] = np.zeros(500, dtype=np.int64) - 1 self.valid_design['holeID'] = np.zeros(500, dtype='<U10') self.valid_design['obsWavelength'] = np.zeros(500, dtype='<U6') self.valid_design['priority'] = np.zeros(500, dtype=int) - 1 self.valid_design['carton_pk'] = np.zeros(500, dtype=int) - 1 self.valid_design['category'] = np.zeros(500, dtype='<U20') self.valid_design['ra'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['dec'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['pmra'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['pmdec'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['delta_ra'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['delta_dec'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['offset'] = np.zeros(500, dtype=bool) self.valid_design['offset_flag'] = np.zeros(500, dtype=int) self.valid_design['ra_off'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['dec_off'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['epoch'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['x'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['y'] = np.zeros(500, dtype=float) - 9999.99 self.valid_design['magnitudes'] = np.zeros((500, 10), dtype=float) - 9999.99 for i, rid in enumerate(self.rg.robotDict): self.valid_design['catalogID'][i] = (self.rg.robotDict[rid] .assignedTargetID) if self.valid_design['catalogID'][i] != -1: self.valid_design['robotID'][i] = rid ind_hole = np.where(self.robotID_mapping == rid)[0][0] self.valid_design['holeID'][i] = self.holeID_mapping[ind_hole] # is below necessary? i dont know if decollide ever reassigns # or just removes cond = eval("self.design['catalogID'] == self.valid_design['catalogID'][i]") self.valid_design['obsWavelength'][i] = self.design['obsWavelength'][cond][0] self.valid_design['priority'][i] = self.design['priority'][cond][0] self.valid_design['carton_pk'][i] = self.design['carton_pk'][cond][0] self.valid_design['category'][i] = self.design['category'][cond][0] self.valid_design['ra'][i] = self.design['ra'][cond][0] self.valid_design['dec'][i] = self.design['dec'][cond][0] self.valid_design['pmra'][i] = self.design['pmra'][cond][0] self.valid_design['pmdec'][i] = self.design['pmdec'][cond][0] self.valid_design['delta_ra'][i] = self.design['delta_ra'][cond][0] self.valid_design['delta_dec'][i] = self.design['delta_dec'][cond][0] self.valid_design['offset'][i] = self.design['offset'][cond][0] self.valid_design['offset_flag'][i] = self.design['offset_flag'][cond][0] self.valid_design['ra_off'][i] = self.design['ra_off'][cond][0] self.valid_design['dec_off'][i] = self.design['dec_off'][cond][0] self.valid_design['epoch'][i] = self.design['epoch'][cond][0] self.valid_design['magnitudes'][i, :] = self.design['magnitudes'][cond][0, :] self.valid_design['x'][i] = self.rg.robotDict[rid].xPos self.valid_design['y'][i] = self.rg.robotDict[rid].yPos return
[docs] def validate_design(self, designmode=True, safety=True, db_query_results_boss=None, db_query_results_apogee=None, desmode_manual=None): """ Validate design for deadlocks and collisions using Kaiju. Parameters ---------- designmode: bool Check designmodes for the design. safety: bool Check for bright neighbors using carton of brightest Gaia/2MASS stars in targetdb. db_query_results_boss: dict Database query results for BOSS bright neighbor check. Each index of dict is a tuple of (ras, decs, mags, catalogids) with one index for designmode and the other safety. db_query_results_apogee: dict Database query results for APOGEE bright neighbor check. Each index of dict is a tuple of (ras, decs, mags, catalogids) with one index for designmode and the other safety. """ # make dict to store design errors that may come up self.design_errors = {} # build the design with all needed parameters if it has not been # built already (save time by doing this check) if not self.design_built: if self.manual_design: self.build_design_manual() else: self.build_design_db() # construct the Kaiju robotGrid self.design_to_RobotGrid() if len(self.targets_unassigned) > 0: self.design_errors['all_targets_assigned'] = False else: self.design_errors['all_targets_assigned'] = True # validate the design # de-collide the grid if collisions exist # and check for targets removed self.decollide_grid() # generate paths # self.rg.pathGen() # if self.rg.didFail: # raise MugatuError(message='Kaiju pathGen failed') if designmode: self.designmode_validate(db_query_results_boss=db_query_results_boss, db_query_results_apogee=db_query_results_apogee, desmode_manual=desmode_manual) # do the safety check for design if safety: self.bright_neigh_safety(db_query_results_boss=db_query_results_boss, db_query_results_apogee=db_query_results_apogee, desmode_manual=desmode_manual) # I imagine that the above step would manipulate the robogrid based on # collisions and deadlocks, so the below would take these Kaiju results # and put them back as a design object which will be the output of the # function self.RobotGrid_to_valid_design() # another note to add to this above design dictonary, is how will this # include paths? This seems important for manual designs and to send # to Jaeger return
[docs] def design_to_targetdb(self, cadence, fieldid, exposure): """ Write a validated esign to targetdb Parameters ---------- cadence: str Cadence label for the field. fieldid: int fieldid for the field. exposure: int The exposure of this set of designs, 0th indexed. Notes ----- Version above should allow for manual designs to be added under version = 'manual' to seperate them from robostrategy ingested designs """ if not self.manual_design: raise MugatuError(message='Can only write manual designs to targetdb') if len(self.valid_design) == 0: raise MugatuError(message='Need to validate design first') # create the manual field for the design make_design_field_targetdb(cadence=cadence, fieldid=fieldid, plan='manual', racen=self.racen, deccen=self.deccen, position_angle=self.position_angle, observatory=self.observatory, slots_exposures=None) # create dictonary for unique carton pks for idtype is catalogID if self.idtype == 'catalogID': cart_pks = {} targetdb_ver = {} for pk in np.unique(self.valid_design['carton_pk'][self.valid_design['catalogID'] != -1]): cart_pks[pk] = pk targetdb_ver[pk] = Carton.get(pk).version_pk else: cart_pks = None targetdb_ver = None # add the design to targetdb make_design_assignments_targetdb( plan='manual', fieldid=fieldid, exposure=exposure, field_exposure=field_exposure, desmode_label=self.desmode_label, catalogID=self.valid_design['catalogID'][self.valid_design['catalogID'] != -1], robotID=self.valid_design['robotID'][self.valid_design['catalogID'] != -1], holeID=self.valid_design['holeID'][self.valid_design['catalogID'] != -1], obsWavelength=self.valid_design['obsWavelength'][self.valid_design['catalogID'] != -1], carton=self.valid_design['carton_pk'][self.valid_design['catalogID'] != -1], observatory=self.observatory, targetdb_ver=targetdb_ver, instr_pks=None, cart_pks=cart_pks, fiber_pks=None, idtype=self.idtype) return
[docs] def design_to_opsdb(self, design): """ Write a validated design to opsdb Notes ----- I dont know what needs to be included in opsdb yet, need to look at the schema """ return