Source code for dysh.spectra.core

"""
Core functions for spectral data.
"""

import warnings

import astropy.units as u
import numpy as np
from astropy.modeling.fitting import LevMarLSQFitter, LinearLSQFitter
from astropy.modeling.polynomial import Chebyshev1D, Hermite1D, Legendre1D, Polynomial1D
from specutils import SpectralRegion
from specutils.fitting import fit_continuum

from ..coordinates import veltofreq
from ..util import minimum_string_match


# @todo: allow data to be SpectrumList or array of Spectrum
[docs] def average(data, axis=0, weights=None): """Average a group of spectra or scans. Parameters ---------- data : `~numpy.ndarray` The spectral data, typically with shape (nspect,nchan). axis : int The axis over which to average the data. Default axis=0 will return the average spectrum if shape is (nspect,nchan) weights : `~numpy.ndarray` The weights to use in averaging. These might typically be system temperature based. The weights array must be the length of the axis over which the average is taken. Default: None will use equal weights Returns ------- average : `~numpy.ndarray` The average along the input axis """ # Spectra that are blanked will have all channels set to NaN # indices = ~np.isnan(data) # Find indices with ANY spectra (rows) that have a NaN or Inf # goodindices = ~np.ma.fix_invalid(data).mask.any(axis=1) # c = data[goodindices] # if len(c) == 0: # return np.nan # Find indices that have any spectra with all channels = NaN # badindices = np.where(np.isnan(data).all(axis=1)) goodindices = find_non_blanks(data) if weights is not None: return np.average(data[goodindices], axis, weights[goodindices]) else: return np.average(data[goodindices], axis, weights)
[docs] def find_non_blanks(data): """ Finds the indices of blanked integrations. Parameters ---------- data : `~numpy.ndarray` The spectral data, typically with shape (nspect,nchan). Returns ------- blanks : `~numpy.ndarray` Array with indices of blanked integrations. """ return np.where(~np.isnan(data).all(axis=1))
[docs] def exclude_to_region(exclude, refspec, fix_exclude=False): """Convert an exclude list to a list of ~specutuls.SpectralRegion. Parameters ---------- exclude : list of 2-tuples of int or `~astropy.units.Quantity`, or `~specutils.SpectralRegion` List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive. Examples: One channel-based region: >>> [11,51] Two channel-based regions: >>> [(11,51),(99,123)] One `~astropy.units.Quantity` region: >>> [110.198*u.GHz,110.204*u.GHz]. One compound `~specutils.SpectralRegion`: >>> SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]). refspec: `~spectra.spectrum.Spectrum` The reference spectrum whose spectral axis will be used when converting between exclude and axis units (e.g. channels to GHz). fix_exclude: bool If True, fix exclude regions that are out of bounds of the specctral axis to be within the spectral axis. Default:False Returns ------- regionlist : list of `~specutil.SpectralRegion` A list of `~specutil.SpectralRegion` corresponding to `exclude` with units of the `refspec.spectral_axis`. """ regionlist = [] p = refspec sa = refspec.spectral_axis if exclude is not None: regionlist = [] # a single SpectralRegion was given if isinstance(exclude, SpectralRegion): b = exclude.bounds if b[0] < sa[0] or b[1] > sa[1]: msg = f"Exclude limits {b} are not fully within the spectral axis {sa}" raise Exception(msg) regionlist.append(exclude) # list of int or Quantity or SpectralRegion was given else: # if user provided a single list, we have to # add another set of brackets so we an iterate. # If SpectralRegion took a list argument, we wouldn't # have to do this. if len(np.shape(exclude[0])) == 0: exclude = [exclude] # NB: we are assuming that a SpectralAxis is always [lower...upper]. Is this true??? for pair in exclude: if type(pair[0]) == int: # convert channel to spectral axis units lastchan = len(sa) - 1 msg = f"Exclude limits {pair} are not fully within the spectral axis [0,{lastchan}]." if pair[0] < 0 or pair[1] > lastchan: if fix_exclude: msg += f" Setting upper limit to {lastchan}." pair[1] = lastchan warnings.warn(msg) else: raise Exception(msg) pair = [sa[pair[0]], sa[pair[1]]] # if it is already a spectral region no additional # work is needed # @todo we should test that the SpectralRegion is not out of bounds if isinstance(pair[0], SpectralRegion): b = pair[0].bounds if b[0] < sa[0] or b[1] > sa[1]: msg = f"Exclude limits {pair} are not fully within the spectral axis {p.spectral_axis}" raise Exception(msg) regionlist.append(pair) else: # it is a Quantity that may need conversion to spectral_axis units q = [pair[0].value, pair[1].value] * pair[0].unit if q.unit.is_equivalent("km/s"): veldef = p.meta.get("VELDEF", None) if veldef is None: raise KeyError("Input spectrum has no VELDEF in header, can't convert to frequency units.") pair = veltofreq(q, p.rest_value, veldef) # offset = p.rest_value - p.radial_velocity.to(sa.unit, equivalencies=p.equivalencies) else: pair[0] = pair[0].to(sa.unit, equivalencies=p.equivalencies) pair[1] = pair[1].to(sa.unit, equivalencies=p.equivalencies) # Ensure test is with sorted [lower,upper] pair = sorted(pair) salimits = sorted([sa[0], sa[-1]]) if pair[0] < salimits[0] or pair[1] > salimits[-1]: msg = ( f"Exclude limits {pair} are not fully within the spectral axis" f" {[salimits[0],salimits[-1]]}." ) if fix_exclude: msg += f" Setting upper limit to {p.spectral_axis[-1]}." pair[1] = sa[-1] warnings.warn(msg) else: raise Exception(msg) sr = SpectralRegion(pair[0], pair[1]) regionlist.append(sr) return regionlist
[docs] def region_to_axis_indices(region, refspec): """ Parameters ---------- region : `~specutils.SpectralRegion` refspec: `~spectra.spectrum.Spectrum` The reference spectrum whose spectral axis will be used when converting between exclude and axis units (e.g., channels to GHz). Returns ------- indices : 2-tuple of int The array indices in `refspec` corresponding to `region.bounds` """ # Spectral region to indices in an input spectral axis. # @todo needs to work for multiple spectral regions? or just loop outside this call p = refspec sa = refspec.spectral_axis if region.lower.unit != sa.unit: # @todo if they are conformable, then allow it and convert raise Exception(f"Axis units of region [{region.lower.unit}] and refspec [{sa.unit}] not identical") b = [x.value for x in region.bounds] indices = np.abs(np.subtract.outer(sa.value, b)).argmin(0) return indices
[docs] def exclude_to_mask(exclude, refspec): # set a mask based on an exclude region # mask ~ exclude_to_indices(exclude_to_region()) pass
[docs] def baseline(spectrum, order, exclude=None, **kwargs): """Fit a baseline for a spectrum Parameters ---------- spectrum : `~spectra.spectrum.Spectrum` The input spectrum order : int The order of the polynomial series, a.k.a. baseline order exclude : list of 2-tuples of int or `~astropy.units.Quantity`, or `~specutils.SpectralRegion` List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive. Examples: One channel-based region: >>> [11,51] Two channel-based regions: >>> [(11,51),(99,123)] One `~astropy.units.Quantity` region: >>> [110.198*u.GHz,110.204*u.GHz]. One compound `~specutils.SpectralRegion`: >>> SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]). Default: no exclude region model : str One of 'polynomial', 'chebyshev', 'legendre', 'hermite'. Default: 'chebyshev' fitter : `~astropy.fitting._FitterMeta` The fitter to use. Default: `~astropy.fitter.LinearLSQFitter` (with `calc_uncertaintes=True`). Be care when choosing a different fitter to be sure it is optimized for this problem. Returns ------- models : list of `~astropy.modeling.Model` The list of models that contain the fitted model parameters. See `~specutuls.fitting.fit_continuum`. """ kwargs_opts = { #'show': False, "model": "chebyshev", "fitter": LinearLSQFitter(calc_uncertainties=True), "fix_exclude": False, "exclude_action": "replace", # {'replace','append', None} } kwargs_opts.update(kwargs) available_models = { "chebyshev": Chebyshev1D, "hermite": Hermite1D, "legendre": Legendre1D, "polynomial": Polynomial1D, } model = minimum_string_match(kwargs_opts["model"], list(available_models.keys())) if model == None: raise ValueError(f'Unrecognized input model {kwargs["model"]}. Must be one of {list(available_models.keys())}') selected_model = available_models[model](degree=order) _valid_exclude_actions = ["replace", "append", None] if kwargs_opts["exclude_action"] not in _valid_exclude_actions: raise ValueError( f'Unrecognized exclude region action {kwargs["exclude_region"]}. Must be one of {_valid_exclude_actions}' ) fitter = kwargs_opts["fitter"] # print(f"MODEL {model} FITTER {fitter}") p = spectrum if np.isnan(p.data).all(): # @todo handle masks return None # or raise exception if exclude is not None: regionlist = exclude_to_region(exclude, spectrum, fix_exclude=kwargs_opts["fix_exclude"]) if kwargs_opts["exclude_action"] == "replace": p._exclude_regions = regionlist elif kwargs_opts["exclude_action"] == "append": p._exclude_regions.extend(regionlist) regionlist = p._exclude_regions else: # use the spectrum's preset exclude regions if they # exist (they will be a list of SpectralRegions or None) regionlist = p._exclude_regions print(f"EXCLUDING {regionlist}") return fit_continuum(spectrum=p, model=selected_model, fitter=fitter, exclude_regions=regionlist)
[docs] def mean_tsys(calon, caloff, tcal, mode=0, fedge=0.1, nedge=None): """ Get the system temperature from the neighboring calon and caloff, which reflect the state of the noise diode. We define an extra way to set the edge size, nedge, if you prefer to use number of edge channels instead of the inverse fraction. This implementation recreates GBTIDL's `dcmeantsys`. Parameters ---------- calon : `~numpy.ndarray`-like ON calibration caloff : `~numpy.ndarray`-like OFF calibration tcal : `~numpy.ndarray`-like calibration temperature mode : int mode=0 Do the mean before the division mode=1 Do the mean after the division fedge : float Fraction of edge channels to exclude at each end, a number between 0 and 1. Default: 0.1, meaning the central 80% bandwidth is used nedge : int Number of edge channels to exclude. Default: None, meaning use `fedge` Returns ------- meanTsys : `~numpy.ndarray`-like The mean system temperature """ # @todo Pedro thinks about a version that takes a spectrum with multiple SpectralRegions to exclude. nchan = len(calon) if nedge is None: nedge = int(nchan * fedge) # Python uses exclusive array ranges while GBTIDL uses inclusive ones. # Therefore we have to add a channel to the upper edge of the range # below in order to reproduce exactly what GBTIDL gets for Tsys. # See github issue #28. # Define the channel range once. chrng = slice(nedge, -(nedge - 1), 1) # Make them doubles. Probably not worth it. caloff = caloff.astype("d") calon = calon.astype("d") if mode == 0: # mode = 0 matches GBTIDL output for Tsys values meanoff = np.nanmean(caloff[chrng]) meandiff = np.nanmean(calon[chrng] - caloff[chrng]) meanTsys = meanoff / meandiff * tcal + tcal / 2.0 else: meanTsys = np.mean(caloff[chrng] / (calon[chrng] - caloff[chrng])) meanTsys = meanTsys * tcal + tcal / 2.0 # meandiff can sometimes be negative, which makes Tsys negative! # GBTIDL also takes abs(Tsys) because it does sqrt(Tsys^2) return np.abs(meanTsys)
[docs] def sq_weighted_avg(a, axis=0, weights=None): # @todo make a generic moment or use scipy.stats.moment r"""Compute the mean square weighted average of an array (2nd moment). :math:`v = \sqrt{\frac{\sum_i{w_i~a_i^{2}}}{\sum_i{w_i}}}` Parameters ---------- a : `~numpy.ndarray` The data to average. axis : int The axis over which to average the data. Default: 0 weights : `~numpy.ndarray` or None The weights to use in averaging. The weights array must be the length of the axis over which the average is taken. Default: `None` will use equal weights. Returns ------- average : `~numpy.ndarray` The square root of the squared average of a along the input axis. """ # if weights is None: # w = np.ones_like(a) # else: # w = weights a2 = a * a v = np.sqrt(np.average(a2, axis=axis, weights=weights)) return v
[docs] def tsys_weight(exposure, delta_freq, tsys): r"""Compute the system temperature based weight(s) using the expression: :math:`w = t_{exp} \times \delta_\nu / T_{sys}^2,` If `exposure`, `delta_freq`, or `tsys` parameters are given as `~astropy.units.Quantity`, they will be converted to seconds, Hz, K, respectively for the calculation. (Not that this really matters since weights are relative to each other) >>> import astropy.units as u >>> [3*u.s, 4*u.s] ### WRONG Rather, if using `~astropy.units.Quantity`, they have to be `~astropy.units.Quantity` objects, e.g., >>> [3, 4]*u.s ### RIGHT! Parameters ---------- exposure : `~numpy.ndarray`, float, or `~astropy.units.Quantity` The exposure time, typically given in seconds delta_freq : `~numpy.ndarray`, float, or `~astropy.units.Quantity` The channel width in frequency units tsys : `~numpy.ndarray`, float, or `~astropy.units.Quantity` The system temperature, typically in K Returns ------- weight : `~numpy.ndarray` The weights array """ # Quantitys work with abs and power! # Using `numpy.power` like this results in increased # precision over the calculation used by GBTIDL: # weight = abs(delta_freq) * exposure / tsys**2. weight = abs(delta_freq) * exposure * np.power(tsys, -2.0) if type(weight) == u.Quantity: return weight.value.astype(np.float64) else: return weight.astype(np.float64)
[docs] def get_spectral_equivalency(restfreq, velocity_convention): # Yeesh, the doppler_convention parameter for SpectralAxis.to does not match the doppler_convention list for Spectrum1D! # This is actually bug in Spectrum1D documentation https://github.com/astropy/specutils/issues/1067 if "radio" in velocity_convention: return u.doppler_radio(restfreq) elif "optical" in velocity_convention: return u.doppler_optical(restfreq) elif "relativistic" in velocity_convention: return u.doppler_relativistic(restfreq) elif "redshift" in velocity_convention: return u.doppler_redshift() else: raise ValueError(f"Unrecognized velocity convention {velocity_convention}")