Source code for dysh.util.selection

import numbers
import warnings
from collections.abc import Sequence
from copy import deepcopy

import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import Angle
from astropy.table import Table
from astropy.time import Time
from astropy.units.quantity import Quantity
from pandas import DataFrame

# from ..fits import default_sdfits_columns
from . import gbt_timestamp_to_time, generate_tag, keycase

# Default keyword aliases installed on every Selection.
# Each entry maps {alias: SDFITS column name}.  Selection._alias upper-cases
# both sides when storing them, so lookups are case insensitive.
# Several aliases may point at the same column (e.g. "ra", "glon" and "gallon"
# all map to "crval2").
default_aliases = {
    "freq": "crval1",
    "ra": "crval2",
    "dec": "crval3",
    "glon": "crval2",
    "glat": "crval3",
    "gallon": "crval2",
    "gallat": "crval3",
    "elevation": "elevatio",
    "source": "object",
    "pol": "plnum",
    "subref": "subref_state",  # subreflector state
}


# The import is deferred to call time as a workaround for a circular import
# error that shows up in sphinx (and only sphinx).
def _default_sdfits_columns():
    """Return the result of ``..fits.default_sdfits_columns()``, importing it lazily."""
    from ..fits import default_sdfits_columns as _columns_factory

    columns = _columns_factory()
    return columns


class Selection(DataFrame):
    """This class contains the methods for creating rules to select data from an SDFITS object.
    Data (rows) can be selected using any column name in the input SDFITS object.
    Exact selection, range selection, upper/lower limit selection, and any-of selection
    are all supported.

    Users create *selection rules* by specifying keyword (SDFITS columns) and value(s)
    to be selected.  Briefly, the selection methods are:

         :meth:`select` - Select exact values

         :meth:`select_range` - Select ranges of values

         :meth:`select_within` - Select a value +/- epsilon

         :meth:`select_channel` - Select channels or ranges of channels

    The Selection object maintains a DataFrame for each selection rule created by the user. The
    :meth:`final` selection is the logical AND of these rules (an inner merge of the
    per-rule DataFrames).  Users can examine the current selections with :meth:`show`
    which will show the current rules and how many rows each rule selects from the
    unfiltered data.

    Aliases of keywords are supported. The user may add an alias for an existing SDFITS
    column with :meth:`alias`.  Some default :meth:`aliases` have been defined.

    Parameters
    ----------
    initobj : object
        Either an object with an ``_index`` attribute (e.g. an SDFITSLoad), whose
        ``_index`` is used as the data, or a Selection/DataFrame.
    aliases : dict, optional
        Dictionary of {alias: column} to install.  If None (the default),
        the module-level ``default_aliases`` are used.
    **kwargs : dict
        Accepted for compatibility; currently unused.
    """

    def __init__(self, initobj, aliases=None, **kwargs):
        if hasattr(initobj, "_index"):  # it's an SDFITSLoad object
            super().__init__(initobj._index, copy=True)
            DEFKEYS = list(initobj._index.keys())
        else:
            super().__init__(initobj, copy=True)  # it's a Selection or DataFrame
            # copy so that we never extend a list owned by the helper
            DEFKEYS = list(_default_sdfits_columns())
        # Adding attributes that are not columns will result
        # in a UserWarning, which we can safely ignore.  Use a context
        # manager so the caller's warning filters are restored afterwards,
        # even if construction fails part way through.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=UserWarning)
            self._add_utc_column()
            # if we want Selection to replace _index in sdfits
            # construction this will have to change. if hasattr("_index") etc
            self._idtag = ["ID", "TAG"]
            # ID and TAG are the first columns, followed by the data columns,
            # then channel, astropy-based timestamp, and number of rows selected
            DEFKEYS = np.array(self._idtag + DEFKEYS + ["CHAN", "UTC", "# SELECTED"])
            # set up object types for the np.array: everything is a str
            # except the first (ID) and the last (# SELECTED), which are ints
            dt = np.array([np.int32] + [str] * (len(DEFKEYS) - 2) + [np.int32])
            self._defkeys = DEFKEYS
            self._deftypes = dt
            self._make_table()
            self._valid_coordinates = ["RA", "DEC", "GALLON", "GALLAT", "GLON", "GLAT", "CRVAL2", "CRVAL3"]
            self._selection_rules = {}
            self._aliases = {}
            self.alias(default_aliases if aliases is None else aliases)
            self._channel_selection = None

    def _add_utc_column(self):
        """
        Add column to the selection dataframe with a
        representation of the SDFITS UTC timestamp, which is a string,
        as an ~astropy.time.Time.

        Returns
        -------
        None.

        """
        self["UTC"] = [gbt_timestamp_to_time(q) for q in self.TIMESTAMP]

    def _make_table(self):
        """Create the table for displaying the selection rules"""
        self._table = Table(data=None, names=self._defkeys, dtype=self._deftypes)
        for t in self._idtag:
            self._table.add_index(t)

    @property
    def aliases(self):
        """
        The aliases that may be used to refer to SDFITS columns.

        Returns
        -------
        dict
            The dictionary of aliases and SDFITS column names
        """
        return self._aliases

    def alias(self, aliases):
        """
        Alias a set of keywords to existing columns. Multiple aliases for
        a single column are allowed, e.g.,
        { 'glon':'crval2', 'lon':'crval2'}

        Parameters
        ----------
        aliases : {}
            The dictionary of keywords and column names
            where the new alias is the key and
            the column name is the value, i.e., {alias:column}

        Returns
        -------
        None.

        Raises
        ------
        KeyError
            If a column name is not recognized (raised by `_check_keys`).
        """
        self._check_keys(aliases.values())
        for k, v in aliases.items():
            self._alias(k, v)

    def _alias(self, key, column):
        """
        Alias a new keyword to an existing column, e.g..
        to alias the SDFITS column 'CRVAL2' as 'RA':

            `alias('RA','CRVAL2')`

        The map is case insensitive, so `alias('ra', 'crval2')` also works.

        Parameters
        ----------
        key : str
            The new keyword to use as an alias.
        column : str
            The existing SDFITS column name to alias

        Returns
        -------
        None.

        """
        self._aliases[key.upper()] = column.upper()

    def _set_pprint_exclude_names(self):
        """Use `~astropy.Table.pprint_exclude_names` to set the list
        columns that have no entries.
        """
        if len(self._table) > 0:
            colnames = self._table.colnames
            # a column is "empty" if every one of its cells is the empty string
            is_empty = [np.all([cell == "" for cell in self._table[k].data]) for k in colnames]
            emptycols = np.array(colnames)[is_empty]
            self._table.pprint_exclude_names.set(emptycols)

    def _sanitize_input(self, key, value):
        """
        Sanitize a key-value pair for use in a selection rule.
        Aliases are resolved, the key is checked against the columns,
        and coordinate values are converted.

        Parameters
        ----------
        key : str
            upper case key value

        value : any
            The value for the key

        Returns
        -------
        sanitized_value : any
            The sanitized value

        Raises
        ------
        KeyError
            If the (de-aliased) key is not a column of this Selection.
        """
        # @todo Allow minimum match str for key?
        key = self._aliases.get(key, key)
        if key not in self:
            raise KeyError(f"{key} is not a recognized column name.")
        v = self._sanitize_coordinates(key, value)
        # deal with Time here or later?
        self._check_for_disallowed_chars(key, value)
        return v

    def _sanitize_coordinates(self, key, value):
        """
        Sanitize a coordinate selection key-value pair. Coordinates will be
        converted to floats before the final value is created.

        Parameters
        ----------
        key : str
            upper case key value

        value : any
            The value for the key.  It can be a single float,
            a single Angle (Quantity), a tuple of Angles
            (a1,a2,a3) or an Angle tuple, e.g., (n1,n2)*u.degree.
            None is passed through unchanged so that open-ended
            range limits such as `(None, v1)` work for coordinates.

        Returns
        -------
        sanitized_value : any
            The sanitized value: decimal degrees for coordinate keys, the
            input value otherwise.
        """
        if key not in self._valid_coordinates and key not in self.aliases:
            return value
        # None marks a missing upper/lower limit; there is nothing to convert.
        if value is None:
            return value
        # note Quantity is derivative of np.ndarray, so
        # need to filter that out in the recursive call.
        # This is to handle (q1,q2) as a range.
        # (n1,n2)*u.degree is handled below
        if isinstance(value, (tuple, np.ndarray, list)) and not isinstance(value, Quantity):
            return [self._sanitize_coordinates(key, v) for v in value]
        if isinstance(value, numbers.Number):
            a = Angle(value * u.degree)
        else:  # it should be a str or Quantity
            a = Angle(value)
        return a.degree

    def _check_for_disallowed_chars(self, key, value):
        """Placeholder for character validation; currently does nothing."""
        # are there any?  coordinates will already
        # be transformed to decimal degrees
        pass

    def _generate_tag(self, values, hashlen=9):
        """
        Generate a unique tag based on row values.  A hash object
        is created from the input values using SHA256, and a hex representation is created.
        The first `hashlen` characters of the hex string are returned.

        Parameters
        ----------
        values : array-like
            The values to use in creating the hash object
        hashlen : int, optional
            The length of the returned hash string. The default is 9.

        Returns
        -------
        tag : str
            The hash string

        """
        return generate_tag(values, hashlen)

    @property
    def _next_id(self) -> int:
        """
        Get the next ID number in the table.

        Returns
        -------
        id : int
            The highest existing ID number plus one, or 0 if the table is empty
        """
        if len(self._table) == 0:
            return 0
        return max(self._table["ID"]) + 1

    def _check_keys(self, keys):
        """
        Check a dictionary for unrecognized keywords.  This method is called in any select method to check inputs.

        Parameters
        ----------
        keys : list or array-like
           Keyword arguments

        Returns
        -------
        None.

        Raises
        ------
        KeyError
            If one or more keywords are unrecognized

        """
        unrecognized = [
            ku for ku in (k.upper() for k in keys) if ku not in self and ku not in self._aliases
        ]
        if len(unrecognized) > 0:
            raise KeyError(f"The following keywords were not recognized: {unrecognized}")

    def _check_numbers(self, **kwargs):
        """Raise ValueError unless every keyword value (or member of a sequence value) is a number."""
        self._check_type(numbers.Number, "Expected numeric value for these keywords but did not get a number", **kwargs)

    def _check_range(self, **kwargs):
        """
        Check that every keyword value is a valid range specification.

        Each value must be a tuple, list or ndarray whose members are None
        (no limit), numbers, or, for the key UTC, Time objects.
        Quantity members are first passed through `_sanitize_coordinates`.

        Parameters
        ----------
        **kwargs : dict or key=value
            Keyword arguments, key=(v1,v2)

        Raises
        ------
        ValueError
            If a value is not a tuple/list/ndarray, or if any member has the wrong type.

        Returns
        -------
        None.
        """
        bad = []
        badtime = []
        for k, v in kwargs.items():
            ku = k.upper()
            if not isinstance(v, (tuple, list, np.ndarray)):
                raise ValueError(f"Invalid input for key {ku}={v}. Range inputs must be tuple or list.")
            for a in v:
                if a is None:
                    continue
                if isinstance(a, Quantity):
                    a = self._sanitize_coordinates(ku, a)
                try:
                    if ku == "UTC":
                        # accumulate rather than overwrite, so that a bad first
                        # limit is not forgotten when the second one is valid
                        badtime.extend(self._check_type(Time, "Expected Time", silent=True, **{ku: a}))
                    else:
                        self._check_numbers(**{ku: a})
                except ValueError:
                    bad.append(ku)
        if len(bad) > 0 or len(badtime) > 0:
            msg = "Expected"
            a = " "
            if len(badtime) > 0:
                msg += f" Time object for {badtime}"
                a = " and "
            if len(bad) > 0:
                msg += a
                msg += f"numeric value(s) for {bad} "
            msg += " but did not get that."
            raise ValueError(msg)

    def _check_type(self, reqtype, msg, silent=False, **kwargs):
        # @todo allow Quantities
        """
        Check that a list of keyword arguments is all a specified type.

        Parameters
        ----------

        reqtype : type
            The object type to check against, e.g. numbers.Number, str, etc

        msg : str
            The exception message to show if the inputs are not the specific reqtype

        silent : bool, optional
            If True, do not raise; return the offending keywords instead.
            The default is False.

        **kwargs : dict or key=value
           Keyword arguments

        Raises
        ------
        ValueError
            If `silent` is False and one or more of the values is not of type `reqtype`.

        Returns
        -------
        None or, if silent is True, a list of the (upper case) keywords whose
        values failed the check.  A keyword appears once per failing member
        when its value is a sequence.

        """
        bad = []
        nested_bad = []
        for k, v in kwargs.items():
            # if the input value is an array, then we want to
            # check the type for each member of the array, but
            # not raise an exception (silent=True); collect
            # the bad ones and pass them on.
            if isinstance(v, (Sequence, np.ndarray)) and not isinstance(v, str):
                for member in v:
                    nested_bad.extend(self._check_type(reqtype, msg, silent=True, **{k: member}))
            elif not isinstance(v, reqtype):
                bad.append(k.upper())
        # scalar failures are reported first, then failures found inside sequences
        bad.extend(nested_bad)
        if silent:
            return bad
        if bad:
            raise ValueError(f"{msg}: {np.squeeze(bad)}")

    def _check_for_duplicates(self, df):
        """
        Check that the user hasn't already added a rule matching this one.
        A warning is issued if a duplicate is found.

        Parameters
        ----------
        df : ~pandas.DataFrame
            The selection to check

        Returns
        -------
        bool
            True if a duplicate was found, False if not.

        """
        for _id, s in self._selection_rules.items():
            if s.equals(df):
                tag = self._table.loc[_id]["TAG"]
                warnings.warn(
                    f"A rule that results in an identical selection has already been added: ID: {_id}, TAG:{tag}."
                    " Ignoring."
                )
                return True
        return False

    def _addrow(self, row, dataframe, tag=None):
        """
        Common code to add a tagged row to the internal table after the selection has been created.
        Should be called in select* methods.  Nothing is added if an
        identical selection already exists.

        Parameters
        ----------
        row : dict
            key, value pairs of the selection
        dataframe : ~pandas.DataFrame
            The dataframe created by the selection.
        tag : str, optional
            An identifying tag by which the rule may be referred to later.
            If None, a tag is generated from the key=value pairs of `row`.
        Returns
        -------
        None.

        """
        if self._check_for_duplicates(dataframe):
            return
        if tag is not None:
            row["TAG"] = tag
        else:
            # guarantee a unique seed by
            # including relevant key=value, which will be unique
            gentag = [f"{k}={v}" for k, v in row.items() if v is not None and v != ""]
            row["TAG"] = self._generate_tag(gentag)
        row["ID"] = self._next_id
        row["# SELECTED"] = len(dataframe)
        self._selection_rules[row["ID"]] = dataframe
        self._table.add_row(row)

    def _reset_channel_selection(self):
        """Forget the stored channel selection if no remaining rule carries a CHAN entry."""
        if not any(str(c) != "" for c in self._table["CHAN"]):
            self._channel_selection = None

    def select(self, tag=None, **kwargs):
        """Add one or more exact selection rules, e.g., `key1 = value1, key2 = value2, ...`
        If `value` is array-like then a match to any of the array members will be selected.
        For instance `select(object=['3C273', 'NGC1234'])` will select data for either of those
        objects and `select(ifnum=[0,2])` will select IF number 0 or IF number 2.

        Parameters
        ----------
            tag : str
                An identifying tag by which the rule may be referred to later.
                If None, a tag will be generated.
            key : str
                The key  (SDFITS column name or other supported key)
            value : any
                The value to select

        Raises
        ------
        KeyError
            If a keyword is not a recognized column name or alias.
        """
        # @todo ?? MAYBE allow chan(nel) in here.
        # if called via _select_from_mixed_kwargs, then we want to merge all the
        # selections.  Pop this first so it is not mistaken for a column name.
        df = kwargs.pop("startframe", self)
        self._check_keys(kwargs.keys())
        row = {}
        for k, v in kwargs.items():
            ku = k.upper()
            ku = self._aliases.get(ku, ku)
            v = self._sanitize_input(ku, v)
            if isinstance(v, (Sequence, np.ndarray)) and not isinstance(v, str):
                if ku == "UTC":
                    for vv in v:
                        self._check_type(
                            Time,
                            "Expected Time object but got something else.",
                            **{ku: vv},
                        )
                # Any-of match, e.g. object = ["NGC123", "NGC234"].
                # isin() is used rather than a query string so that column
                # names that are not python identifiers and values containing
                # quotes are handled correctly.
                df = df[df[ku].isin(v)]
            else:
                df = pd.merge(df, df[df[ku] == v], how="inner")
            row[ku] = str(v)
        if df.empty:
            warnings.warn("Your selection rule resulted in no data being selected. Ignoring.")
            return
        self._addrow(row, df, tag)

    def select_range(self, tag=None, **kwargs):
        """
        Select a range of inclusive values for a given key(s).
        e.g., `key1 = (v1,v2), key2 = (v3,v4), ...`
        will select data  `v1 <= data1 <= v2, v3 <= data2 <= v4, ... `
        Upper and lower limits may be given by setting one of the tuple values
        to None. e.g., `key1 = (None,v1)` for an upper limit `data1 <= v1` and
        `key1 = (v1,None)` for a lower limit `data >=v1`.  Lower
        limits may also be specified by a one-element tuple `key1 = (v1,)`.

        Parameters
        ----------
        tag : str, optional
            An identifying tag by which the rule may be referred to later.
            If None, a tag will be generated.
        key : str
            The key (SDFITS column name or other supported key)
        value : array-like
            Tuple or list giving the lower and upper limits of the range.

        Returns
        -------
        None.

        """
        # @todo ?? MAYBE allow chan(nel) in here.
        self._check_keys(kwargs.keys())
        self._check_range(**kwargs)
        row = {}
        df = self
        for k, v in kwargs.items():
            ku = k.upper()
            ku = self._aliases.get(ku, ku)
            v = self._sanitize_input(ku, v)
            # deal with a tuple quantity
            if isinstance(v, Quantity):
                v = v.value
            # deal with quantity inside a tuple.
            # ultimately will need a map of desired units, so e.g. if
            # GHz used, then the value is expressed in Hz
            v = [q.value if isinstance(q, Quantity) else q for q in v]
            row[ku] = str(v)
            if len(v) == 2:
                if v[0] is not None and v[1] is not None:
                    df = pd.merge(df, df[(df[ku] <= v[1]) & (df[ku] >= v[0])], how="inner")
                elif v[0] is None:  # upper limit given
                    df = pd.merge(df, df[(df[ku] <= v[1])], how="inner")
                else:  # lower limit given (v[1] is None)
                    df = pd.merge(df, df[(df[ku] >= v[0])], how="inner")
            elif len(v) == 1:  # lower limit given
                df = pd.merge(df, df[(df[ku] >= v[0])], how="inner")
            else:
                raise Exception(f"Couldn't parse value tuple {v} for key {k} as a range.")
        if df.empty:
            warnings.warn("Your selection rule resulted in no data being selected. Ignoring.")
            return
        self._addrow(row, df, tag)

    def select_within(self, tag=None, **kwargs):
        """
        Select a value within a plus or minus for a given key(s).
        e.g. `key1 = [value1,epsilon1], key2 = [value2,epsilon2], ...`
        Will select data
        `value1-epsilon1 <= data1 <= value1+epsilon1,`
        `value2-epsilon2 <= data2 <= value2+epsilon2,...`

        Parameters
        ----------
        tag : str, optional
            An identifying tag by which the rule may be referred to later.
            If None, a tag will be generated.
        key : str
            The key (SDFITS column name or other supported key)
        value : array-like
            Tuple or list giving the value and epsilon

        Returns
        -------
        None.

        """
        # This is just a type of range selection.
        kw = {k: (v[0] - v[1], v[0] + v[1]) for k, v in kwargs.items()}
        self.select_range(tag, **kw)

    def select_channel(self, chan, tag=None):
        """
        Select channels and/or channel ranges. These are NOT used in :meth:`final`
        but rather will be used to create a mask for calibration or
        flagging. Single arrays/tuples will be treated as channel lists;
        nested arrays will be treated as ranges, for instance

        ``
        # selects channels 1 and 10
        select_channel([1,10])
        # selects channels 1 thru 10 inclusive
        select_channel([[1,10]])
        # select channel ranges 1 thru 10 and 47 thru 56 inclusive, and channel 75
        select_channel([[1,10], [47,56], 75])
        # tuples also work, though can be harder for a human to read
        select_channel(((1,10), [47,56], 75))
        ``

        Parameters
        ----------
        chan : number, or array-like
            The channels to select
        tag : str, optional
            An identifying tag by which the rule may be referred to later.
            If None, a tag will be generated.

        Raises
        ------
        Exception
            If a channel selection rule already exists.
        ValueError
            If `chan` contains non-numeric values.

        Returns
        -------
        None.

        """
        # We don't want to get into trying to merge
        # different, possibly exclusive, channel selections.
        # This also avoids the side effect of using self to
        # compute "# Selected" in _addrow
        if self._channel_selection is not None:
            raise Exception(
                "You can only have one channel selection rule. Remove the old rule before creating a new one."
            )
        self._check_numbers(chan=chan)
        self._channel_selection = chan
        self._addrow({"CHAN": str(chan)}, dataframe=self, tag=tag)

    # NB: using ** in doc here because `id` will make a reference to the
    # python built-in function.  Arguably we should pick a different
    # keyword but 'id' is easy for user.
    def remove(self, id=None, tag=None):
        """Remove (delete) a selection rule(s).
        You must specify either **id** or **tag** but not both. If there are
        multiple rules with the same tag, they will all be deleted.
        If the channel selection rule is removed, a new one may be created
        with :meth:`select_channel`.

        Parameters
        ----------
            id : int
                The ID number of the rule as displayed in `show()`
            tag : str
                An identifying tag by which the rule may be referred to later.

        Raises
        ------
        Exception
            If both or neither of `id` and `tag` are given.
        KeyError
            If no rule matches the given `id` or `tag`.
        """
        if id is not None and tag is not None:
            raise Exception("You can only specify one of id or tag")
        if id is None and tag is None:
            raise Exception("You must specify either id or tag")
        if id is not None:
            if id not in self._selection_rules:
                raise KeyError(f"No ID = {id} found in this Selection")
            # We will assume that selection_rules and table
            # have been kept in sync.  The implementation
            # should ensure this.
            del self._selection_rules[id]
            row = self._table.loc_indices["ID", id]
            # there is only one row per ID
            self._table.remove_row(row)
        else:
            # need to find IDs of selection rules where TAG == tag.
            # This will raise keyerror if tag not matched, so no need
            # to raise our own, unless we want to change the message.
            matching_indices = self._table.loc_indices["TAG", tag]
            matching = Table(self._table[matching_indices])
            for i in matching["ID"]:
                del self._selection_rules[i]
            self._table.remove_rows(matching_indices)
        # if the channel rule was among those removed, allow a new one
        self._reset_channel_selection()

    def clear(self):
        """Remove all selection rules, including any channel selection"""
        self._selection_rules = {}
        self._make_table()
        self._channel_selection = None

    def show(self):
        """
        Print the current selection rules. Only columns with a rule are shown.
        The first two columns are ID number and TAG string. Either of these may be used
        to :meth:`remove` a row.  The final column `# SELECTED` gives
        the number of rows that a given rule selects from the original.
        The :meth:`final` selection may be fewer rows because each selection rule
        is logically AND'ed to create the final selection.

        Returns
        -------
        None.

        """
        self._set_pprint_exclude_names()
        print(self._table)

    @property
    def final(self):
        """
        Create the final selection. This is done by a logical AND of each
        of the selection rules (specifically `pandas.merge(how='inner')`).

        Returns
        -------
        final : DataFrame
            The resultant selection from all the rules.
        """
        return self.merge(how="inner")

    def merge(self, how, on=None):
        """
        Merge selection rules using a specific type of join.

        Parameters
        ----------
        how : {‘left’, ‘right’, ‘outer’, ‘inner’, ‘cross’}, no default.
            The type of join to be performed. See :meth:`pandas.merge()`.

        on: label or list
            Column or index level names to join on. These must be found in both DataFrames.
            If on is None and not merging on indexes then this defaults to the intersection
            of the columns in both DataFrames.

        Returns
        -------
        final : DataFrame
            The resultant selection from all the rules.  An empty DataFrame
            is returned if there are no rules.
        """
        if not self._selection_rules:
            return DataFrame()
        final = None
        for df in self._selection_rules.values():
            if final is None:
                # need a deepcopy here in case there
                # is only one selection rule, because
                # we don't want to return a reference to the rule
                # which the receiver might modify.
                final = deepcopy(df)
            else:
                final = pd.merge(final, df, how=how, on=on)
        return final

    def _select_from_mixed_kwargs(self, **kwargs):
        """
        Called by calibration routines which may be mixing channel selections
        and exact selections, but **not** 'within' or 'range' selections.

        Parameters
        ----------
        **kwargs : dict
            Keyword arguments. key=value as in the public selection methods.
            The special keys `tag` and `debug` are consumed here, and
            `channel` (any case) is routed to :meth:`select_channel`.

        Returns
        -------
        None.

        """
        # get the tag if given or generate one if not
        tag = kwargs.pop("tag", self._generate_tag(kwargs))
        debug = kwargs.pop("debug", False)
        if len(kwargs) == 0:
            return  # user gave no additional kwargs
        if tag is None:  # in case user did tag=None (facepalm)
            tag = self._generate_tag(kwargs)
        if debug:
            print(f"working TAG IS {tag}")
        # in order to pop channel we need to check case insensitively
        ukwargs = keycase(kwargs)
        chan = ukwargs.pop("CHANNEL", None)
        if chan is not None:
            self.select_channel(chan, tag)
        if len(ukwargs) != 0:
            if debug:
                print(f"selection {ukwargs}")
            self.select(**ukwargs, tag=tag)

    def __deepcopy__(self, memo):
        """Return a deep copy built by deep-copying every entry of the instance ``__dict__``."""
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        # Setting non-column attributes can emit a UserWarning; silence it
        # only for the duration of the copy and restore the caller's filters.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=UserWarning)
            for k, v in self.__dict__.items():
                setattr(result, k, deepcopy(v, memo))
        return result