import datetime
import numbers
import warnings
from collections.abc import Sequence
from copy import deepcopy
import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import Angle
from astropy.table import Table
from astropy.time import Time
from astropy.units.quantity import Quantity
from pandas import DataFrame
from ..log import logger
# from ..fits import default_sdfits_columns
from . import ALL_CHANNELS, abbreviate_to, generate_tag, keycase
# Default {alias: column} shortcuts registered by SelectionBase.__init__.
# Both sides are upper-cased when they are registered (see SelectionBase._alias),
# so lookups are effectively case-insensitive.
# Note that several aliases deliberately share one column (e.g. ra/glon/gallon -> crval2).
default_aliases = {
    "freq": "crval1",
    "ra": "crval2",
    "dec": "crval3",
    "glon": "crval2",
    "glat": "crval3",
    "gallon": "crval2",
    "gallat": "crval3",
    "elevation": "elevatio",
    "source": "object",
    "pol": "plnum",
    "subref": "subref_state",  # subreflector state
}
# The import is deferred to call time as a workaround for a circular import
# error that shows up in sphinx (and only sphinx).
def _default_sdfits_columns():
    """Return the result of `..fits.default_sdfits_columns()`, importing it lazily."""
    from ..fits import default_sdfits_columns as _columns

    return _columns()
# Width, in characters, of the string columns of the rule table; values written
# to the table are shortened to this width with `abbreviate_to`.
DEFAULT_COLUMN_WIDTH = 32  # char
# numpy dtype string (unicode, DEFAULT_COLUMN_WIDTH characters) used for those columns.
DEFAULT_COLUMN_TYPE = f"<U{DEFAULT_COLUMN_WIDTH}"
[docs]
class SelectionBase(DataFrame):
"""This class is the base class for selection and flagging. Selection and flagging are both kinds of
data selection, so `SelectionBase` can encapsulate most necessary functionality. Derived classes
implement specific named methods e.g. select_channel, flag_channel that will simply call
the base class methods.
"""
def __init__(self, initobj, aliases=default_aliases, **kwargs):
    """
    Build the underlying DataFrame and an empty rule table.

    Parameters
    ----------
    initobj : object
        Either an object exposing an `_index` attribute (e.g. an SDFITSLoad object), in
        which case `_index` supplies the data and the column names, or anything accepted
        by the `~pandas.DataFrame` constructor (a Selection or DataFrame).
    aliases : dict
        {alias: column} pairs registered through :meth:`alias`. The dictionary is only read.
    **kwargs : dict
        Accepted for signature compatibility; not used.

    Raises
    ------
    KeyError
        If an alias refers to a column that is not present (raised by :meth:`alias`).
    """
    if hasattr(initobj, "_index"):  # it's an SDFITSLoad object
        super().__init__(initobj._index)
        defkeys = list(initobj._index.keys())
    else:
        super().__init__(initobj)  # it's a Selection or DataFrame
        # copy, so that extending the list below can never alter the helper's own list
        defkeys = list(_default_sdfits_columns())
    # Adding attributes that are not columns results in a UserWarning, which we can
    # safely ignore.  catch_warnings() restores the caller's warning filters on exit,
    # even if construction fails, instead of wiping them all with resetwarnings().
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=UserWarning)
        self._add_datetime_column()
        self["CHAN"] = None
        # if we want Selection to replace _index in sdfits
        # construction this will have to change. if hasattr("_index") etc
        self._idtag = ["ID", "TAG"]
        # Add channel, timestamp, and number rows selected.
        defkeys.extend(["FITSINDEX", "CHAN", "UTC", "# SELECTED"])
        # ID and TAG are the first columns; drop duplicates, keeping first occurrences.
        defkeys = list(dict.fromkeys(self._idtag + defkeys))
        self._defkeys = defkeys
        # Column types for the rule table: strings everywhere except the first
        # column (ID) and the last column (number selected), which are ints.
        dt = np.full(len(defkeys), np.dtype(DEFAULT_COLUMN_TYPE))
        dt[0] = np.int32
        dt[-1] = np.int32
        self._deftypes = dt
        self._make_table()
        self._valid_coordinates = [
            "RA",
            "DEC",
            "GALLON",
            "GALLAT",
            "GLON",
            "GLAT",
            "CRVAL2",
            "CRVAL3",
        ]
        self._selection_rules = {}
        self._aliases = {}
        self.alias(aliases)
        self._channel_selection = None
        self._flag_channel_selection = {}  # used in Flag only
def _add_datetime_column(self):
    """
    Add a "UTC" column holding the "DATE-OBS" column parsed by
    `pandas.to_datetime`.

    Returns
    -------
    None.
    """
    date_obs = self["DATE-OBS"]
    # Do not add utc=True to this call, as later comparisons will not work.
    self["UTC"] = pd.to_datetime(date_obs)
def _make_table(self):
    """Create the empty table used for displaying the rules and index it on each of the `_idtag` columns."""
    self._table = table = Table(data=None, names=self._defkeys, dtype=self._deftypes)
    for index_column in self._idtag:
        table.add_index(index_column)
@property
def aliases(self):
    """
    The registered aliases for SDFITS columns.

    Returns
    -------
    dict
        The internal {ALIAS: COLUMN} dictionary itself (not a copy).
    """
    return self._aliases
[docs]
def alias(self, aliases):
    """
    Register several aliases at once. More than one alias may point at the
    same column, e.g. ``{'glon': 'crval2', 'lon': 'crval2'}``.

    Parameters
    ----------
    aliases : dict
        ``{alias: column}`` pairs: the key is the new alias and the value is the
        existing column name.

    Returns
    -------
    None.

    Raises
    ------
    KeyError
        If a column name is not recognized (raised by `_check_keys`); in that
        case no alias is registered.
    """
    # validate every target column before registering anything
    self._check_keys(aliases.values())
    for new_key, column in aliases.items():
        self._alias(new_key, column)
def _alias(self, key, column):
    """
    Register one alias for an existing column, e.g. ``_alias('RA', 'CRVAL2')``.
    Both names are stored upper-cased, so ``_alias('ra', 'crval2')`` is equivalent.
    An existing alias of the same name is overwritten.

    Parameters
    ----------
    key : str
        The new keyword to use as an alias.
    column : str
        The existing SDFITS column name to alias.

    Returns
    -------
    None.
    """
    target = column.upper()
    alias_name = key.upper()
    self._aliases[alias_name] = target
def _set_pprint_exclude_names(self):
    """Tell the rule table, via `~astropy.Table.pprint_exclude_names`, to hide every
    column whose entries are all the empty string. Does nothing when the table has no rows.
    """
    table = self._table
    nrows = len(table)
    if nrows == 0:
        return
    names = table.colnames
    # one flag per column: True when every cell compares equal to ""
    is_empty = [np.all([table[name].data[i] == "" for i in range(nrows)]) for name in names]
    table.pprint_exclude_names.set(np.array(names)[is_empty])
[docs]
def columns_selected(self):
    """The names of the columns that were used in at least one rule.

    Returns
    -------
    colnames : set
        A set of str column names; the bookkeeping columns "# SELECTED", "ID" and "TAG"
        are never included. An empty set is returned if no rule has been made yet.
    """
    table = self._table
    if len(table) == 0:
        return set()
    self._set_pprint_exclude_names()  # ensure __attributes__ gets set.
    hidden = table.meta["__attributes__"]["pprint_exclude_names"]
    bookkeeping = {"# SELECTED", "ID", "TAG"}
    return set(table.colnames) - set(hidden) - bookkeeping
def _sanitize_input(self, key, value):
    """
    Sanitize a key-value pair: resolve an alias to its column, make sure the
    column exists, then apply the boolean and coordinate sanitizers in that order.

    Parameters
    ----------
    key : str
        Upper case key value.
    value : any
        The value for the key.

    Returns
    -------
    sanitized_value : any
        The sanitized value.

    Raises
    ------
    KeyError
        If the (alias-resolved) key is not a column of this object.
    """
    # @todo Allow minimum match str for key?
    column = self._aliases.get(key, key)
    if column not in self:
        raise KeyError(f"{column} is not a recognized column name.")
    return self._sanitize_coordinates(column, self._sanitize_boolean(column, value))
def _sanitize_boolean(self, key, value):
    """
    Convert a Python bool to the character "T" or "F" when the key is
    "SIG" or "CAL"; leave everything else untouched.

    Parameters
    ----------
    key : str
        Upper case key value.
    value : bool or any
        The value for the key.

    Returns
    -------
    sanitized_value : str or any
        "T" or "F" if `key` is "SIG"/"CAL" and `value` is True or False, respectively.
        Otherwise the input `value`.
    """
    if key in ["SIG", "CAL"] and isinstance(value, bool):
        return "T" if value else "F"
    return value
def _sanitize_coordinates(self, key, value):
    """
    Sanitize a coordinate key-value pair by converting the value to degrees.

    Parameters
    ----------
    key : str
        Upper case key value.
    value : float or `~astropy.coordinates.Angle` or str or any
        The value for the key. It can be a single number (taken to be degrees),
        a single Angle (Quantity), a str, a tuple/list/array of those,
        or an Angle tuple, e.g., (n1,n2)*u.degree

    Returns
    -------
    sanitized_value : float, list or `value` type
        The value in degrees (a list when a plain tuple/list/array was given).
        If `key` is neither a recognized coordinate nor an alias, `value` is returned unchanged.
    """
    # NOTE(review): any registered alias passes this test, not only aliases of
    # coordinate columns -- confirm that this is intended.
    if not (key in self._valid_coordinates or key in self.aliases):
        return value
    # Quantity derives from np.ndarray, so it has to be excluded here;
    # (q1, q2) is handled element by element, (n1, n2)*u.degree is handled below.
    plain_container = isinstance(value, (tuple, np.ndarray, list)) and not isinstance(value, Quantity)
    if plain_container:
        return [self._sanitize_coordinates(key, item) for item in value]
    # bare numbers are degrees; otherwise it should be a str or Quantity
    angle = Angle(value * u.degree) if isinstance(value, numbers.Number) else Angle(value)
    return angle.degree
def _generate_tag(self, values, hashlen=9):
    """
    Generate a tag from row values by delegating to the package-level
    `generate_tag` helper (described upstream as returning the first `hashlen`
    characters of a SHA256 hex digest of the values).

    Parameters
    ----------
    values : array-like
        The values to use in creating the hash object
    hashlen : int, optional
        The length of the returned hash string. The default is 9.

    Returns
    -------
    tag : str
        The hash string
    """
    tag = generate_tag(values, hashlen)
    return tag
@property
def _next_id(self) -> int:
    """
    The ID to give to the next rule.

    Returns
    -------
    id : int
        0 when the rule table is empty, otherwise the highest existing ID plus one.
    """
    if len(self._table) == 0:
        return 0
    highest = max(self._table["ID"])
    return highest + 1
def _check_keys(self, keys):
    """
    Verify that every keyword is, case-insensitively, either a column of this
    object or a registered alias. Called by the selection methods to check inputs.

    Parameters
    ----------
    keys : list or array-like
        Keyword names.

    Returns
    -------
    None.

    Raises
    ------
    KeyError
        If one or more keywords are unrecognized; the message lists them upper-cased.
    """
    upper_keys = [k.upper() for k in keys]
    unrecognized = [k for k in upper_keys if not (k in self or k in self._aliases)]
    if unrecognized:
        raise KeyError(f"The following keywords were not recognized: {unrecognized}")
def _check_numbers(self, **kwargs):
    """Check, via `_check_type`, that every keyword value is a `numbers.Number`.

    Raises
    ------
    ValueError
        If a value (or a member of an array-like value) is not numeric.
    """
    message = "Expected numeric value for these keywords but did not get a number"
    self._check_type(numbers.Number, message, **kwargs)
def _check_range(self, **kwargs):
    """
    Validate the inputs of a range selection.

    Every value must be a tuple, list or ndarray. Its members may be None (open
    limit); Quantities are first passed through `_sanitize_coordinates`. Members of
    the "UTC" key must be `np.datetime64`; all other members must be numeric.

    Parameters
    ----------
    **kwargs : dict
        key=(lower, upper) pairs.

    Raises
    ------
    ValueError
        If a value is not a tuple/list/ndarray, or if any member has the wrong type.
        Problems from all keys are collected into a single message.
    """
    bad = []
    badtime = []
    for k, v in kwargs.items():
        ku = k.upper()
        if not isinstance(v, (tuple, list, np.ndarray)):
            raise ValueError(f"Invalid input for key {ku}={v}. Range inputs must be tuple or list.")
        for a in v:
            if a is None:
                continue
            if isinstance(a, Quantity):
                a = self._sanitize_coordinates(ku, a)
            try:
                if ku == "UTC":
                    # Accumulate: assigning here would let a later, valid member
                    # erase the record of an earlier invalid one.
                    badtime.extend(
                        self._check_type(np.datetime64, "Expected np.datetime64", silent=True, **{ku: a})
                    )
                else:
                    self._check_numbers(**{ku: a})
            except ValueError:
                bad.append(ku)
    # report each offending key once, in order of first appearance
    bad = list(dict.fromkeys(bad))
    badtime = list(dict.fromkeys(badtime))
    if len(bad) > 0 or len(badtime) > 0:
        msg = "Expected"
        joiner = " "
        if len(badtime) > 0:
            msg += f" np.datetime64 object for {badtime}"
            joiner = " and "
        if len(bad) > 0:
            msg += joiner
            msg += f"numeric value(s) for {bad} "
        msg += " but did not get that."
        raise ValueError(msg)
def _check_type(self, reqtype, msg, silent=False, **kwargs):
    # @todo allow Quantities
    """
    Check that a list of keyword arguments is all a specified type.
    Array-like values (anything but str) are checked member by member.

    Parameters
    ----------
    reqtype : type
        The object type to check against, e.g. numbers.Number, str, etc
    msg : str
        The exception message to show if the inputs are not the specific reqtype
    silent : bool
        If True, do not raise; return the offending keywords instead.
    **kwargs : dict or key=value
        Keyword arguments

    Raises
    ------
    ValueError
        If `silent` is False and one or more of the values is not a `reqtype`.

    Returns
    -------
    None or, if `silent` is True, a list of the upper-cased keywords that failed the check:
    first the keywords with a bad scalar value, then one entry per bad array member.
    """
    direct_bad = []
    nested_bad = []
    for key, value in kwargs.items():
        if isinstance(value, (Sequence, np.ndarray)) and not isinstance(value, str):
            # check each member on its own, collecting failures rather than raising
            for member in value:
                nested_bad.extend(self._check_type(reqtype, msg, silent=True, **{key: member}))
        elif not isinstance(value, reqtype):
            direct_bad.append(key.upper())
    bad = direct_bad + nested_bad
    if silent:
        return bad
    if bad:
        # np.squeeze keeps the message format: a bare name for one keyword,
        # an array-style list for several
        raise ValueError(f"{msg}: {np.squeeze(np.array(bad))}")
def _check_for_duplicates(self, df):
    """
    Check that the user hasn't already added a rule matching this one.
    A warning naming the existing rule is issued when a duplicate is found.

    Parameters
    ----------
    df : `~pandas.DataFrame`
        The selection to check

    Returns
    -------
    bool
        True if a duplicate was found, False if not.
    """
    for _id, rule in self._selection_rules.items():
        if not rule.equals(df):
            continue
        # the TAG is only needed for the message, so look it up only on a match
        tag = self._table.loc[_id]["TAG"]
        warnings.warn(  # noqa: B028
            f"A rule that results in an identical selection has already been added: ID: {_id}, TAG:{tag}."
            " Ignoring."
        )
        return True
    return False
def _addrow(self, row, dataframe, tag=None, check=False):
    """
    Common code to record a new rule: store its dataframe and add a tagged row to the
    internal table. Should be called in select* methods, after the selection has been created.
    The `row` dictionary is modified in place (TAG, ID, "# SELECTED" added, values abbreviated).

    Parameters
    ----------
    row : dict
        key, value pairs of the selection
    dataframe : `~pandas.DataFrame`
        The dataframe created by the selection.
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a tag is generated from the non-empty key=value pairs of `row`.
    check : bool
        If True, call `_check_for_duplicates()` and do nothing if a stored dataframe already
        implements this rule.

    Returns
    -------
    None.
    """
    if check and self._check_for_duplicates(dataframe):
        return
    if tag is None:
        # seed the tag with the relevant key=value pairs, which will be unique
        seed = [f"{k}={v}" for k, v in row.items() if v is not None and v != ""]
        tag = self._generate_tag(seed)
    row["TAG"] = tag
    rule_id = self._next_id
    row["ID"] = rule_id
    row["# SELECTED"] = len(dataframe)
    self._selection_rules[rule_id] = dataframe
    for key in row:
        row[key] = abbreviate_to(DEFAULT_COLUMN_WIDTH, row[key])
    self._table.add_row(row)
    # for some reason the table gets "unsorted" from its index
    # resulting in issue #457
    # so always sort (by the primary index) after adding a row
    self._table.sort(self._idtag[0])
def _replace_time(self, **kwargs):
    """Replace astropy.Time and datetime.datetime objects in a kwargs list with their numpy.datetime64 equivalent.
    This is needed because UTC is a datetime64 column but we want users to be able to input Time or datetime if desired.
    The caller's values are never modified: an array-like value containing a time is replaced by a new list.

    Parameters
    ----------
    kwargs : dict
        dictionary of keywords/values

    Returns
    -------
    dict
        A copy of `kwargs` with the time values replaced. Tuples are returned as lists.
    """

    def _as_datetime64(value):
        # convert one Time/datetime value; anything else is returned as is
        if isinstance(value, Time):
            return value.datetime64
        if isinstance(value, datetime.datetime):
            return np.datetime64(value)
        return value

    kc = kwargs.copy()
    for k, v in kwargs.items():
        if isinstance(v, (Time, datetime.datetime)):
            kc[k] = _as_datetime64(v)
        elif isinstance(v, (Sequence, np.ndarray)) and not isinstance(v, str):
            converted = []
            changed = False
            for item in v:
                new_item = _as_datetime64(item)
                changed = changed or new_item is not item
                converted.append(new_item)
            # Only swap in the new list when needed, so that untouched inputs
            # keep their type; tuples have always been returned as lists.
            if changed or isinstance(v, tuple):
                kc[k] = converted
    return kc
def _base_select(self, tag=None, check=False, **kwargs):
    """Add one or more exact selection/flag rules, e.g., `key1 = value1, key2 = value2, ...`
    If `value` is array-like then a match to any of the array members will be selected/flagged.
    Conditions on different keys are AND'ed. Keys whose value is None are ignored.
    Derived classes will call this method with their own specific name, i.e. `select` or `flag`.

    Parameters
    ----------
    tag : str
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : any
        The value to select

    Two extra keywords are consumed before the keys are checked: `proposed_channel_rule`
    (written to the CHAN column of the selected rows) and `startframe` (the frame to select
    from instead of this object).

    Returns
    -------
    bool
        True if the selection was passed on to be recorded as a rule, False if no data were selected.

    Raises
    ------
    KeyError
        If a keyword is not a recognized column or alias.
    ValueError
        If a member of an array-like UTC value is not a `np.datetime64`.
    """
    # pop these before check_keys, which is intended to check SDFITS keywords
    proposed_channel_rule = kwargs.pop("proposed_channel_rule", None)
    # if called via _select_from_mixed_kwargs, then we want to merge all the
    # selections
    df = kwargs.pop("startframe", self)
    self._check_keys(kwargs.keys())
    # While not necessary for adding a row to a Table, ensuring the dict
    # has keys for all Table columns improves the performance of Table._addrow.
    row = dict.fromkeys(self._table.colnames, "")
    nrows = len(df)
    # The rows are selected with boolean masks rather than with a query string:
    # interpolating names and values into a string breaks on column names such as
    # DATE-OBS, on datetime values and on strings that contain quotes.
    mask = None
    for k, v in kwargs.items():
        if v is None:
            continue
        ku = k.upper()
        ku = self._aliases.get(ku, ku)
        v = self._sanitize_input(ku, v)
        column = df[ku]
        # If a list is passed in, a match to any member is selected.
        # Numeric ranges must be selected by the user with select_range.
        if isinstance(v, (Sequence, np.ndarray)) and not isinstance(v, str):
            this_mask = np.zeros(nrows, dtype=bool)
            for vv in v:
                if ku == "UTC":
                    self._check_type(
                        np.datetime64,
                        "Expected np.datetime64 object but got something else.",
                        **{ku: vv},
                    )
                # OR the members, e.g. object = ["NGC123", "NGC234"]
                this_mask |= (column == vv).to_numpy(dtype=bool, na_value=False)
        else:
            this_mask = (column == v).to_numpy(dtype=bool, na_value=False)
        mask = this_mask if mask is None else mask & this_mask
        row[ku] = v
    if mask is None:
        warnings.warn("There was no data selection")  # should never happen # noqa: B028
        return False
    df = df.loc[mask]
    if df.empty:
        warnings.warn("Your selection rule resulted in no data being selected. Ignoring.")  # noqa: B028
        return False
    df.loc[:, "CHAN"] = proposed_channel_rule  # this column is normally None so no need to check if None first.
    self._addrow(row, df, tag, check=check)
    return True
def _base_select_range(self, tag=None, check=False, **kwargs):
    """
    Select a range of inclusive values for a given key(s).
    e.g., `key1 = (v1,v2), key2 = (v3,v4), ...`
    will select data `v1 <= data1 <= v2, v3 <= data2 <= v4, ... `
    Upper and lower limits may be given by setting one of the tuple values
    to None. e.g., `key1 = (None,v1)` for an upper limit `data1 <= v1` and
    `key1 = (v1,None)` for a lower limit `data >=v1`. Lower
    limits may also be specified by a one-element tuple `key1 = (v1,)`.

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the lower and upper limits of the range.

    Returns
    -------
    None.

    Raises
    ------
    KeyError
        If a keyword is not a recognized column or alias.
    ValueError
        If a range value has the wrong container or member type (see `_check_range`).
    Exception
        If a range has neither one nor two members.
    """
    self._check_keys(kwargs.keys())
    kwargs.update(self._replace_time(**kwargs))
    self._check_range(**kwargs)
    row = {}
    df = self
    for k, v in list(kwargs.items()):
        ku = k.upper()
        if ku in self._aliases:
            ku = self._aliases[ku]
        v = self._sanitize_input(ku, v)
        # deal with a tuple quantity, i.e. (n1, n2)*unit
        if isinstance(v, Quantity):
            v = v.value
        # Deal with quantities inside a tuple.  Ultimately this will need a map of
        # desired units, so e.g. if GHz is used then the value is expressed in Hz.
        v = [q.value if isinstance(q, Quantity) else q for q in v]
        row[ku] = v
        if len(v) == 2:
            if v[0] is not None and v[1] is not None:
                df = pd.merge(df, df[(df[ku] <= v[1]) & (df[ku] >= v[0])], how="inner")
            elif v[0] is None:  # upper limit given
                df = pd.merge(df, df[(df[ku] <= v[1])], how="inner")
            else:  # lower limit given (v[1] is None)
                df = pd.merge(df, df[(df[ku] >= v[0])], how="inner")
        elif len(v) == 1:  # lower limit given
            df = pd.merge(df, df[(df[ku] >= v[0])], how="inner")
        else:
            raise Exception(f"Couldn't parse value tuple {v} for key {k} as a range.")
    if df.empty:
        warnings.warn("Your selection rule resulted in no data being selected. Ignoring.")  # noqa: B028
        return
    # honor the documented `check` flag, which used to be dropped here
    self._addrow(row, df, tag, check=check)
def _base_select_within(self, tag=None, check=False, **kwargs):
    """
    Select a value within a plus or minus for a given key(s).
    e.g. `key1 = [value1,epsilon1], key2 = [value2,epsilon2], ...`
    Will select data
    `value1-epsilon1 <= data1 <= value1+epsilon1,`
    `value2-epsilon2 <= data2 <= value2+epsilon2,...`

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the value and epsilon

    Returns
    -------
    None.
    """
    # This is just a type of range selection.
    ranges = {k: (v[0] - v[1], v[0] + v[1]) for k, v in kwargs.items()}
    # forward `check`, which used to be silently ignored
    self._base_select_range(tag, check=check, **ranges)
def _base_select_channel(self, channel, tag=None):
    """
    Select channels and/or channel ranges. These are NOT used in :meth:`final`
    but rather will be used to create a mask for calibration or
    flagging. Single arrays/tuples will be treated as channel lists;
    nested arrays will be treated as *inclusive* ranges. For instance:

    ``
    # select channel 24
    select_channel(24)
    # selects channels 1 and 10
    select_channel([1,10])
    # selects channels 1 thru 10 inclusive
    select_channel([[1,10]])
    # select channel ranges 1 thru 10 and 47 thru 56 inclusive, and channel 75
    select_channel([[1,10], [47,56], 75])
    # tuples also work, though can be harder for a human to read
    select_channel(((1,10), [47,56], 75))
    ``

    *Note* : channel numbers start at zero.

    Parameters
    ----------
    channel : number, or array-like
        The channels to select
    tag : str, optional
        An identifying tag by which the rule may be referred to later.

    Returns
    -------
    None.

    Raises
    ------
    Exception
        If a channel selection rule already exists.
    ValueError
        If `channel` contains something that is not a number.
    """
    # We don't want to get into trying to merge different, possibly exclusive,
    # channel selections.  This also avoids the side effect of using self to
    # compute "# Selected" in _addrow.
    if self._channel_selection is not None:
        raise Exception(
            "You can only have one channel selection rule. Remove the old rule before creating a new one."
        )
    self._check_numbers(chan=channel)
    if isinstance(channel, numbers.Number):
        channel = [int(channel)]
    self._channel_selection = channel
    # We don't care if a selection selects the same channels. They are all pasted
    # together in numpy later and never go through a DataFrame, which is why the
    # dummy `self` is passed in as the dataframe.
    channel_row = {"CHAN": abbreviate_to(DEFAULT_COLUMN_WIDTH, channel)}
    self._addrow(channel_row, dataframe=self, tag=tag, check=False)
# NB: using ** in doc here because `id` will make a reference to the
# python built-in function. Arguably we should pick a different
# keyword but 'id' is easy for user.
[docs]
def remove(self, id=None, tag=None):
    """Remove (delete) a selection rule(s).
    You must specify either **id** or **tag** but not both. If there are
    multiple rules with the same tag, they will all be deleted.
    Removing the channel selection rule also frees the channel selection,
    so that a new channel rule can be created afterwards.

    Parameters
    ----------
    id : int
        The ID number of the rule as displayed in `show()`
    tag : str
        An identifying tag by which the rule may be referred to later.

    Raises
    ------
    Exception
        If both or neither of `id` and `tag` are given.
    KeyError
        If no rule matches `id` or `tag`.
    """
    if id is not None and tag is not None:
        raise Exception("You can only specify one of id or tag")
    if id is None and tag is None:
        raise Exception("You must specify either id or tag")
    if id is not None:
        if id not in self._selection_rules:
            raise KeyError(f"No ID = {id} found in this Selection")
        doomed_ids = [id]
        # We will assume that selection_rules and table have been kept in sync.
        # The implementation should ensure this.  There is only one row per ID.
        doomed_rows = self._table.loc_indices["ID", id]
        remove_from_table = self._table.remove_row
    else:
        # Need to find IDs of selection rules where TAG == tag.
        # This will raise KeyError if tag is not matched, so no need
        # to raise our own, unless we want to change the message.
        doomed_rows = self._table.loc_indices["TAG", tag]
        doomed_ids = list(Table(self._table[doomed_rows])["ID"])
        remove_from_table = self._table.remove_rows
    # The channel rule is the only rule stored with this object itself as its
    # dataframe (see _base_select_channel), which is how it is recognized here.
    if any(self._selection_rules[i] is self for i in doomed_ids):
        self._channel_selection = None
    for i in doomed_ids:
        del self._selection_rules[i]
    remove_from_table(doomed_rows)
[docs]
def clear(self):
    """Remove all selection rules: forget the stored dataframes and channel
    selections and start again from an empty rule table."""
    self._selection_rules = {}
    self._flag_channel_selection = {}
    self._make_table()  # replaces the rule table with a fresh, empty one
    self._channel_selection = None
[docs]
def show(self):
    """
    Print the current selection rules. Only columns with a rule are shown.
    The first two columns are the ID number and a TAG string. Either of these may be used
    to :meth:`remove` a row. The final column `# SELECTED` gives
    the number of rows that a given rule selects from the original.
    The :meth:`final` selection may be fewer rows because the selection rules
    are combined with an inner merge (logical AND) to create the final selection.

    Returns
    -------
    None.
    """
    self._set_pprint_exclude_names()  # hide the columns that have no entries
    print(self._table)
@property
def final(self):
    """
    The final selection: the logical AND of all the selection rules,
    computed as ``self.merge(how="inner")``.

    Returns
    -------
    final : DataFrame
        The resultant selection from all the rules.
    """
    combined = self.merge(how="inner")
    return combined
[docs]
def merge(self, how, on=None):
    """
    Merge the dataframes of all selection rules, in the order the rules are stored,
    using a specific type of join.

    Parameters
    ----------
    how : {'left', 'right', 'outer', 'inner', 'cross'}, no default.
        The type of join to be performed. See :meth:`pandas.merge()`.
    on : label or list
        Column or index level names to join on. These must be found in both DataFrames.
        If on is None and not merging on indexes then this defaults to the intersection
        of the columns in both DataFrames.

    Returns
    -------
    final : DataFrame
        The resultant selection from all the rules; an empty DataFrame if there are no rules.
    """
    rules = list(self._selection_rules.values())
    if not rules:
        # warnings.warn("Selection.merge(): upselecting now")
        return DataFrame()
    first, *others = rules
    # Deep copy the first rule: with a single rule it is the return value, and the
    # receiver must not get a reference to the stored rule, which it might modify.
    final = deepcopy(first)
    for rule in others:
        final = pd.merge(final, rule, how=how, on=on)
    return final
def _select_from_mixed_kwargs(self, **kwargs):
    """
    Called by calibration routines which may be mixing channel selections
    and exact selections, but **not** 'within' or 'range' selections.
    A `channel` keyword (any case) becomes a channel rule; whatever remains
    becomes one exact selection rule. Both rules share the same tag.

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments. key=value as in the public selection methods, plus an optional `tag`.

    Returns
    -------
    None.
    """
    # get the tag if given or generate one if not
    seed = list(kwargs.items())
    tag = kwargs.pop("tag", self._generate_tag(seed))
    if not kwargs:
        return  # user gave no additional kwargs
    if tag is None:  # in case user did tag=None
        tag = self._generate_tag(seed)
    logger.debug(f"working TAG IS {tag}")
    # in order to pop channel we need to check case insensitively
    ukwargs = keycase(kwargs)
    chan = ukwargs.pop("CHANNEL", None)
    if chan is not None:
        self._base_select_channel(chan, tag)
    if len(ukwargs) == 0:
        return
    logger.debug(f"selection {ukwargs}")
    self._base_select(**ukwargs, tag=tag)
def __deepcopy__(self, memo):
    """Deep copy support: every instance attribute is deep-copied onto a new, uninitialized
    instance of the same class, and the rule table is then replaced by ``self._table.copy()``."""
    with warnings.catch_warnings():
        # setting non-column attributes may raise UserWarnings that are of no interest here
        warnings.simplefilter("ignore", category=UserWarning)
        klass = self.__class__
        clone = klass.__new__(klass)
        # register the clone first so that self-references resolve to it
        memo[id(self)] = clone
        for name, value in self.__dict__.items():
            setattr(clone, name, deepcopy(value, memo))
        clone._table = self._table.copy()
        return clone
[docs]
def get(self, key):
    """Get the selection/flag rule by its ID

    Parameters
    ----------
    key : int
        The ID value. See :meth:`show`.

    Returns
    -------
    `~pandas.DataFrame`
        The stored dataframe of the selection/flag rule (not a copy).

    Raises
    ------
    KeyError
        If there is no rule with that ID.
    """
    rule = self._selection_rules[key]
    return rule
[docs]
class Selection(SelectionBase):
"""This class contains the methods for creating rules to select data from an SDFITS object.
Data (rows) can be selected using any column name in the input SDFITS object.
Exact selection, range selection, upper/lower limit selection, and any-of selection
are all supported.
Users create *selection rules* by specifying keyword (SDFITS columns) and value(s) to be selected.
Briefly, the selection methods are:
:meth:`select` - Select exact values
:meth:`select_range` - Select ranges of values
:meth:`select_within` - Select a value +/- epsilon
:meth:`select_channel` - Select channels or ranges of channels
The Selection object maintains a DataFrame for each selection rule created by the user. The
:meth:`final` selection is the logical AND of these rules. Users can examine the current selections
with :meth:`show` which will show the current
rules and how many rows each rule selects from the unfiltered data.
Aliases of keywords are supported. The user may add an alias for an existing SDFITS column with :meth:`alias`. Some default :meth:`aliases` have been defined.
"""
[docs]
def select(self, tag=None, check=False, **kwargs):
    """Add one or more exact selection rules, e.g., `key1 = value1, key2 = value2, ...`
    If `value` is array-like then a match to any of the array members will be selected.
    For instance `select(object=['3C273', 'NGC1234'])` will select data for either of those
    objects and `select(ifnum=[0,2])` will select IF number 0 or IF number 2.

    Parameters
    ----------
    tag : str
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : any
        The value to select

    Returns
    -------
    None.
    """
    # the boolean result of the base method is deliberately not returned
    self._base_select(tag=tag, check=check, **kwargs)
[docs]
def select_range(self, tag=None, **kwargs):
    """
    Select a range of inclusive values for a given key(s).
    e.g., `key1 = (v1,v2), key2 = (v3,v4), ...`
    will select data `v1 <= data1 <= v2, v3 <= data2 <= v4, ... `
    Upper and lower limits may be given by setting one of the tuple values
    to None. e.g., `key1 = (None,v1)` for an upper limit `data1 <= v1` and
    `key1 = (v1,None)` for a lower limit `data >=v1`. Lower
    limits may also be specified by a one-element tuple `key1 = (v1,)`.

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the lower and upper limits of the range.

    Returns
    -------
    None.
    """
    self._base_select_range(tag=tag, **kwargs)
[docs]
def select_within(self, tag=None, **kwargs):
    """
    Select a value within a plus or minus for a given key(s).
    e.g. `key1 = [value1,epsilon1], key2 = [value2,epsilon2], ...`
    Will select data
    `value1-epsilon1 <= data1 <= value1+epsilon1,`
    `value2-epsilon2 <= data2 <= value2+epsilon2,...`

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a tag will be generated.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the value and epsilon

    Returns
    -------
    None.
    """
    self._base_select_within(tag=tag, **kwargs)
[docs]
def select_channel(self, chan, tag=None):
    """
    Select channels and/or channel ranges. These are NOT used in :meth:`final`
    but rather will be used to create a mask for calibration or
    flagging. Single arrays/tuples will be treated as channel lists;
    nested arrays will be treated as *inclusive* ranges. For instance:

    ``
    # select channel 24
    select_channel(24)
    # selects channels 1 and 10
    select_channel([1,10])
    # selects channels 1 thru 10 inclusive
    select_channel([[1,10]])
    # select channel ranges 1 thru 10 and 47 thru 56 inclusive, and channel 75
    select_channel([[1,10], [47,56], 75])
    # tuples also work, though can be harder for a human to read
    select_channel(((1,10), [47,56], 75))
    ``

    *Note* : channel numbers start at zero.

    Parameters
    ----------
    chan : number, or array-like
        The channels to select
    tag : str, optional
        An identifying tag by which the rule may be referred to later.

    Returns
    -------
    None.
    """
    self._base_select_channel(channel=chan, tag=tag)
[docs]
class Flag(SelectionBase):
"""This class contains the methods for creating rules to flag data from an SDFITS object.
Data (rows) can be selected for flagging using any column name in the input SDFITS object.
Exact selection, range selection, upper/lower limit selection, and any-of selection
are all supported.
Users create *flag rules* by specifying keyword (SDFITS columns) and value(s) to be flagged.
Briefly, the flag methods are:
:meth:`flag` - Flag exact values
:meth:`flag_range` - Flag ranges of values
:meth:`flag_within` - Flag a value +/- epsilon
:meth:`flag_channel` - Flag channels or ranges of channels
The Flag object maintains a DataFrame for each flag rule created by the user. The
:meth:`final` flag is the logical OR of these rules. Users can examine the current flags
with :meth:`show` which will show the current
rules and how many rows each rule selects for flagging from the unfiltered data.
The actual flags, which are per channel, are stored in the GBTFITSLoad object,
not in the Flag object. The Flag object just contains the flagging rules.
Aliases of keywords are supported. The user may add an alias for an existing SDFITS column with :meth:`alias`. Some default :meth:`aliases` have been defined.
GBTIDL Flags can be read in with :meth:`read`.
"""
[docs]
def flag(self, tag=None, check=False, **kwargs):
    """Add one or more exact flag rules, e.g., `key1 = value1, key2 = value2, ...`

    If `value` is array-like then a match to any of the array members will be flagged.
    For instance `flag(object=['3C273', 'NGC1234'])` will flag data for either of those
    objects and `flag(ifnum=[0,2])` will flag IF number 0 or IF number 2.

    The channels to flag for the matching data can be given with the keyword
    `channel`, e.g., `flag(object='MBM12', channel=[0,23])`.  A single number is
    wrapped into a one-element list.  If `channel` is the only keyword given, the
    call is forwarded to :meth:`flag_channel`.  If `channel` is omitted, all channels
    of the matching data are flagged.

    Parameters
    ----------
    tag : str
        An identifying tag by which the rule may be referred to later.
        If None, a randomly generated tag will be created.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : any
        The value to flag

    Returns
    -------
    None.
    """
    channels = kwargs.pop("channel", None)
    has_channels = channels is not None
    if has_channels:
        if isinstance(channels, numbers.Number):
            channels = [int(channels)]
        # Validate the channel input before any rule is created.
        self._check_numbers(chan=channels)

    if not kwargs:
        # Only `channel` was supplied, so this is a plain channel flag.
        self.flag_channel(channel=channels, tag=tag)
        return

    # Rules that select identical rows are allowed as long as their channel
    # flags differ.  The "CHAN" column is only filled in after _base_select
    # returns, so the channel rule is passed along as a 'proposed' rule for
    # the duplicate check that happens inside _base_select.
    channel_rule = str(channels) if has_channels else ALL_CHANNELS
    kwargs["proposed_channel_rule"] = channel_rule
    if not self._base_select(tag, check=check, **kwargs):
        return

    # The rule just added is the last row of the rule table.
    rule_id = len(self._table) - 1
    if has_channels:
        self._table.loc[rule_id]["CHAN"] = abbreviate_to(DEFAULT_COLUMN_WIDTH, channels)
        self._flag_channel_selection[rule_id] = channels
    else:
        self._flag_channel_selection[rule_id] = ALL_CHANNELS
    self._selection_rules[rule_id].loc[:, "CHAN"] = channel_rule
[docs]
def flag_channel(self, channel, tag=None, **kwargs):
    """
    Flag channels and/or channel ranges for *all data*. These are NOT used in :meth:`final`
    but rather will be used to create a mask for flagging.
    Single arrays/tuples will be treated as channel *lists*;
    nested arrays will be treated as *inclusive* ranges. For instance:

    ```
    # flag channel 24
    flag_channel(24)
    # flag channels 1 and 10
    flag_channel([1,10])
    # flag channels 1 thru 10 inclusive
    flag_channel([[1,10]])
    # flag channel ranges 1 thru 10 and 47 thru 56 inclusive, and channel 75
    flag_channel([[1,10], [47,56], 75])
    # tuples also work, though can be harder for a human to read
    flag_channel(((1,10), [47,56], 75))
    ```

    *Note* : channel numbers start at zero

    Parameters
    ----------
    channel : number, or array-like
        The channels to flag
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
    **kwargs : dict
        Passed through unchanged to the base channel-selection method.

    Returns
    -------
    None.
    """
    # The base method is fine here because the flag applies to all rows.
    self._base_select_channel(channel, tag, **kwargs)
    # The rule just added is the last row of the rule table.
    rule_id = len(self._table) - 1
    self._flag_channel_selection[rule_id] = channel
    self._selection_rules[rule_id]["CHAN"] = str(channel)
    # The single channel-selection attribute is not used when flagging.
    self._channel_selection = None
[docs]
def flag_range(self, tag=None, check=False, **kwargs):
    """Flag a range of inclusive values for a given key(s).

    e.g., `key1 = (v1,v2), key2 = (v3,v4), ...`
    will flag data `v1 <= data1 <= v2, v3 <= data2 <= v4, ... `
    Upper and lower limits may be given by setting one of the tuple values
    to None. e.g., `key1 = (None,v1)` for an upper limit `data1 <= v1` and
    `key1 = (v1,None)` for a lower limit `data >=v1`.  Lower
    limits may also be specified by a one-element tuple `key1 = (v1,)`.

    All channels of the matching data are flagged.

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a randomly generated tag will be created.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the lower and upper limits of the range.

    Returns
    -------
    None.
    """
    n_rules_before = len(self._table)
    self._base_select_range(tag, check=check, **kwargs)
    self._channel_selection = None  # unused for flagging
    if len(self._table) <= n_rules_before:
        # The base method did not add a rule (`flag` shows that base selection
        # can decline to add one).  Without this guard the last *existing*
        # rule would have its channel flags overwritten with ALL_CHANNELS.
        return
    # The rule just added is the last row of the rule table.
    idx = len(self._table) - 1
    self._flag_channel_selection[idx] = ALL_CHANNELS
    self._selection_rules[idx]["CHAN"] = ALL_CHANNELS
[docs]
def flag_within(self, tag=None, check=False, **kwargs):
    """
    Flag a value within a plus or minus for a given key(s).

    e.g. `key1 = [value1,epsilon1], key2 = [value2,epsilon2], ...`
    Will flag data
    `value1-epsilon1 <= data1 <= value1+epsilon1,`
    `value2-epsilon2 <= data2 <= value2+epsilon2,...`

    All channels of the matching data are flagged.

    Parameters
    ----------
    tag : str, optional
        An identifying tag by which the rule may be referred to later.
        If None, a randomly generated tag will be created.
    check : bool
        If True, check that a previous selection does not give an identical result as this one.
    key : str
        The key (SDFITS column name or other supported key)
    value : array-like
        Tuple or list giving the value and epsilon

    Returns
    -------
    None.
    """
    n_rules_before = len(self._table)
    self._base_select_within(tag, check=check, **kwargs)
    self._channel_selection = None  # unused for flagging
    if len(self._table) <= n_rules_before:
        # The base method did not add a rule (`flag` shows that base selection
        # can decline to add one).  Without this guard the last *existing*
        # rule would have its channel flags overwritten with ALL_CHANNELS.
        return
    # The rule just added is the last row of the rule table.
    idx = len(self._table) - 1
    self._flag_channel_selection[idx] = ALL_CHANNELS
    self._selection_rules[idx]["CHAN"] = ALL_CHANNELS
[docs]
def read(self, fileobj, **kwargs):
    """Read a GBTIDL flag file and add one flag rule per row of its flag table.

    Parameters
    ----------
    fileobj : str, file-like or `pathlib.Path`
        File to read.  If a file object, must be opened in a
        readable mode.
    **kwargs : dict
        Extra keyword arguments to apply to the flag rule.  (This is mainly for internal use.)

    Returns
    -------
    None.

    Raises
    ------
    ValueError
        If the file has no ``[flags]`` section or if the table header does not
        match the expected column names.
    """
    # GBTIDL flag files have two sections, [header] and [flags].
    # The [header] section holds information about file creation.
    # The [flags] section contains the flag table.
    # Table rows have vertical-bar (|) separated columns, while the table
    # header is comma separated and begins with a #.
    # The columns parsed here are:
    #
    # RECNUM - range of the selected record numbers given as low:high inclusive
    # SCAN - range of the selected scan numbers given as low:high inclusive
    # INTNUM - range of the selected integration numbers given as low:high inclusive
    # PLNUM - range of the selected polarization numbers given as low:high inclusive
    # IFNUM - range of the selected IF numbers given as low:high inclusive
    # FDNUM - range of the selected FDNUM values (presumably feed numbers)
    # BCHAN - beginning channel flagged (inclusive, starting from zero)
    # ECHAN - end channel flagged (inclusive)
    # IDSTRING - Reason for flagging, same as dysh's flag rule `tag`
    #
    # GBTIDL additionally associates a flag ID number with each row (same as
    # dysh's flag rule `id`); it is not one of the header columns.
    #
    # Numeric values can be a single integer or a comma-separated list of
    # integers.  If BCHAN and ECHAN are comma-separated lists then they must
    # pair up as [bchan_i, echan_i].
    # A wildcard (*) appears in a column if it had no selection (meaning all
    # values were selected).  There may be whitespace around the *.
    # Example file:
    # [header]
    # created = Wed Jan 5 16:48:37 2022
    # version = 1.0
    # created_by = sdfits
    # [flags]
    # #RECNUM,SCAN,INTNUM,PLNUM,IFNUM,FDNUM,BCHAN,ECHAN,IDSTRING
    # *|6|*|*|2|0|3072|3072|VEGAS_SPUR
    #
    # GBTIDL flag files can also indicate ranges with a : and upper or lower
    # limits by leaving one end as a wildcard.  For instance, here is scan 20,
    # integration range 42 to 51 and a channel range with lower limit 2299:
    # *|20|42:51|*|*|*|2299|*|unspecified

    # Stand-in upper channel for a wildcard ECHAN: large enough that the range
    # effectively spans everything from `bchan` upward.
    open_upper_channel = 2**25

    def _as_list(value):
        # Wrap a single parsed value so it can be paired up with zip().
        return value if isinstance(value, list) else [value]

    # Accept an already-open file object as documented, as well as a path.
    if hasattr(fileobj, "read"):
        text = fileobj.read()
        if isinstance(text, bytes):
            text = text.decode()
    else:
        with open(fileobj, mode="r") as f:
            text = f.read()
    lines = text.splitlines()  # gets rid of \n

    header = [
        "RECNUM",
        "SCAN",
        "INTNUM",
        "PLNUM",
        "IFNUM",
        "FDNUM",
        "BCHAN",
        "ECHAN",
        "IDSTRING",
    ]

    flags_start = next((n for n, line in enumerate(lines) if line.strip() == "[flags]"), None)
    if flags_start is None:
        raise ValueError(f"No [flags] section found in {fileobj}")

    # Because the table header and table row delimiters are different,
    # Table.read() can't work.  So construct the rules row by row.
    found_header = False
    for line in lines[flags_start + 1 :]:
        if not line.strip():
            # A blank line carries no flag; parsing it would fail on int(float("")).
            continue
        if line.startswith("#"):
            if not found_header:
                # The first # line is the table header; later ones are ignored.
                colnames = [c.strip() for c in line[1:].split(",")]
                if colnames != header:
                    raise ValueError(f"Column names {colnames} do not match expectated {header}")
                found_header = True
            continue

        vdict = {}
        for i, v in enumerate(line.split("|")):
            if v.strip() == "*":
                # wildcard: no selection on this column
                continue
            key = header[i]
            if key == "IDSTRING":
                vdict[key] = v
            elif "," in v:
                # comma-separated list
                vdict[key] = [int(float(x)) for x in v.split(",")]
            elif ":" in v:
                # colon-separated inclusive range, expanded into a list
                bounds = [int(float(x)) for x in v.split(":")]
                vdict[key] = list(range(*bounds)) + [bounds[-1]]
            else:
                # single value
                vdict[key] = int(float(v))

        # our tag is gbtidl's idstring
        tag = vdict.pop("IDSTRING", None)
        bchan = vdict.pop("BCHAN", None)
        echan = vdict.pop("ECHAN", None)
        if bchan is not None and echan is not None:
            # pair up bchan and echan
            vdict["channel"] = list(zip(_as_list(bchan), _as_list(echan), strict=False))
        elif bchan is not None:
            # lower limit only
            bchan = _as_list(bchan)
            vdict["channel"] = tuple(zip(bchan, [open_upper_channel] * len(bchan), strict=False))
        elif echan is not None:
            # upper limit only: start from channel zero
            echan = _as_list(echan)
            vdict["channel"] = tuple(zip([0] * len(echan), echan, strict=False))

        vdict.update(kwargs)
        logger.debug(f"flag({tag=},{vdict})")
        self.flag(tag=tag, check=False, **vdict)
    self._table.sort(self._idtag[0])