"""
Core utility definitions, classes, and functions
"""
import hashlib
import sys
from pathlib import Path
# import astropy.units as u
import numpy as np
# import pandas as pd
from astropy.time import Time
[docs]
def select_from(key, value, df):
    """
    Return the rows of a DataFrame whose `key` column equals `value`.

    Parameters
    ----------
    key : str
        The column to test (SDFITS column name).
    value : any
        The value that entries of the column must equal.
    df : `~pandas.DataFrame`
        The DataFrame to filter.

    Returns
    -------
    df : `~pandas.DataFrame`
        The rows of the input for which ``df[key] == value``.
    """
    # Boolean mask, one entry per row, True where the column matches.
    matches = df[key] == value
    return df[matches]
[docs]
def indices_where_value_changes(colname, df):
    """
    Find the `~pandas.DataFrame` indices where the value of the input column name changes.

    Parameters
    ----------
    colname : str
        The column name to query.
    df : `~pandas.DataFrame`
        The DataFrame to search

    Returns
    -------
    indices : `~numpy.ndarray`
        One-dimensional array of the index labels of `df` at which `colname`
        differs from its value in the previous row. The first row has no
        predecessor, so it is always reported as a change.

    Raises
    ------
    KeyError
        If `colname` is not a column of `df`.
    """
    # See https://stackoverflow.com/questions/48673046/get-index-where-value-changes-in-pandas-dataframe-column
    if colname not in df:
        raise KeyError(f"Column {colname} not in input DataFrame")
    # Only the requested column matters, so there is no need to shift and
    # compare the entire DataFrame.
    column = df[colname]
    # shift() moves every value down one row, so this compares row N with
    # row N-1 and yields a boolean Series that is True where the value changed.
    changed = column.ne(column.shift())
    # Index with the plain boolean array and always return a 1-D array; the
    # result stays iterable even when only a single index is found.
    return np.asarray(df.index[changed.to_numpy()])
[docs]
def gbt_timestamp_to_time(timestamp):
    """Build an :class:`~astropy.time.Time` from a GBT SDFITS timestamp string.

    GBT SDFITS timestamps have the form YYYY_MM_DD_HH:MM:SS in UTC.

    Parameters
    ----------
    timestamp : str
        The GBT format timestamp as described above.

    Returns
    -------
    time : `~astropy.time.Time`
        The time object, created with ``scale="utc"``.
    """
    # Target is the ISO FITS format YYYY-MM-DDTHH:MM:SS(.SSS).
    # The first two underscores separate the date fields and become dashes...
    dashed_date = timestamp.replace("_", "-", 2)
    # ...and any underscore left over becomes the date/time separator "T".
    isot = dashed_date.replace("_", "T")
    return Time(isot, scale="utc")
[docs]
def generate_tag(values, hashlen):
    """
    Generate a tag from the input values.

    The string representations of `values` are concatenated, hashed with
    SHA256, and the first `hashlen` characters of the hexadecimal digest
    are returned.

    Parameters
    ----------
    values : array-like
        The values to use in creating the hash object. Each one is
        converted with `str` before hashing.
    hashlen : int
        The length of the returned hash string.

    Returns
    -------
    tag : str
        The leading `hashlen` characters of the hex digest.
    """
    text = "".join(str(item) for item in values)
    digest = hashlib.sha256(text.encode()).hexdigest()
    return digest[:hashlen]
[docs]
def consecutive(data, stepsize=1):
    """Split `data` into groups of elements separated by less than `stepsize`.

    A new group starts wherever the difference between an element and its
    predecessor is greater than or equal to `stepsize`.

    Parameters
    ----------
    data : array
        Array with values to split.
    stepsize : int
        Separation between adjacent elements of `data` at or above which
        a new group is started.

    Returns
    -------
    groups : list of `~numpy.ndarray`
        The values of `data` separated into groups.
    """
    gaps = np.diff(data)
    # Position (in `data`) of the first element of each new group.
    boundaries = np.nonzero(gaps >= stepsize)[0] + 1
    return np.split(data, boundaries)
[docs]
def sq_weighted_avg(a, axis=0, weights=None):
    # @todo make a generic moment or use scipy.stats.moment
    r"""Compute the square root of the weighted average of the squared data.

    :math:`v = \sqrt{\frac{\sum_i{w_i~a_i^{2}}}{\sum_i{w_i}}}`

    Parameters
    ----------
    a : `~numpy.ndarray`
        The data to average
    axis : int
        The axis over which to average the data. Default: 0
    weights : `~numpy.ndarray` or None
        The weights to use in averaging. The weights array must be the
        length of the axis over which the average is taken. Default:
        `None` will use equal weights.

    Returns
    -------
    average : `~numpy.ndarray`
        The average along the input axis
    """
    # Equal weighting is expressed as an array of ones shaped like the data.
    effective_weights = np.ones_like(a) if weights is None else weights
    mean_of_squares = np.average(a * a, axis=axis, weights=effective_weights)
    return np.sqrt(mean_of_squares)
[docs]
def get_project_root() -> Path:
    """
    Returns the project root directory.

    The root is taken to be the directory four levels above this file.
    """
    location = Path(__file__)
    # Step up four levels from this file.
    for _ in range(4):
        location = location.parent
    return location
[docs]
def get_project_testdata() -> Path:
    """
    Returns the project testdata directory

    This is the ``testdata`` entry directly under the project root.
    """
    root = get_project_root()
    return root / "testdata"
[docs]
def get_size(obj, seen=None):
    """Recursively compute the size of an object, following its contents.

    Dictionaries contribute their values and keys, objects with a
    ``__dict__`` contribute that dictionary, and other iterables (except
    `str`, `bytes` and `bytearray`) contribute their elements. Each distinct
    object (by `id`) is counted once.

    See https://goshippo.com/blog/measure-real-size-any-python-object/

    Parameters
    ----------
    obj : any
        The object to measure.
    seen : set or None
        The `id` values of objects already counted. Used by the recursion;
        `None` starts with an empty set.

    Returns
    -------
    size : int
        The summed `sys.getsizeof` of `obj` and the objects reached from it,
        or 0 if `obj` was already counted.
    """
    total = sys.getsizeof(obj)
    visited = set() if seen is None else seen
    marker = id(obj)
    if marker in visited:
        return 0
    # Record the object *before* recursing so that self-referential
    # structures terminate.
    visited.add(marker)
    if isinstance(obj, dict):
        total += sum(get_size(val, visited) for val in obj.values())
        total += sum(get_size(key, visited) for key in obj.keys())
    elif hasattr(obj, "__dict__"):
        total += get_size(obj.__dict__, visited)
    elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)):
        total += sum(get_size(member, visited) for member in obj)
    return total
[docs]
def minimum_string_match(s, valid_strings):
    """
    Return the valid string from a list, given a minimum string input.

    Example: minimum_string_match('a',['alpha','beta','gamma'])
    returns: 'alpha'

    Parameters
    ----------
    s : string
        string to use for minimum match
    valid_strings : list of strings
        list of full strings to min match on

    Returns
    -------
    string or None
        The matched string: `s` itself if it is one of `valid_strings`,
        otherwise the single entry of `valid_strings` that starts with `s`.
        `None` is returned if no entry, or more than one entry, starts
        with `s`.
    """
    # An exact match is never ambiguous, even when it is also a prefix of
    # another candidate (e.g. 'a' in ['a', 'ab']).
    if s in valid_strings:
        return s
    candidates = [name for name in valid_strings if name.startswith(s)]
    if len(candidates) == 1:
        return candidates[0]
    return None
[docs]
def uniq(seq):
    """Return the distinct items of `seq` as a list, in first-seen order.

    from http://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
    """
    already = set()
    ordered = []
    for item in seq:
        if item in already:
            continue
        already.add(item)
        ordered.append(item)
    return ordered
[docs]
def keycase(d, case="upper"):
    """
    Change the case of dictionary keys

    Parameters
    ----------
    d : dict
        The input dictionary
    case : str, one of 'upper', 'lower'
        Case to change keys to. The default is "upper".

    Returns
    -------
    newDict : dict
        A copy of the dictionary with keys changed according to `case`

    Raises
    ------
    ValueError
        If `case` is neither 'upper' nor 'lower'.
    """
    if case == "upper":
        return {key.upper(): val for key, val in d.items()}
    if case == "lower":
        return {key.lower(): val for key, val in d.items()}
    # Fail with a clear message rather than an UnboundLocalError on the
    # never-assigned result variable.
    raise ValueError(f"Unrecognized case {case!r}; must be 'upper' or 'lower'")