"""
The Spectrum class to contain and manipulate spectra.
"""
import warnings
from copy import deepcopy
import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import ITRS, SkyCoord, SpectralCoord, StokesCoord
from astropy.coordinates.spectral_coordinate import attach_zero_velocities
from astropy.io import registry
from astropy.io.fits.verify import VerifyWarning
from astropy.modeling.fitting import LinearLSQFitter
# from astropy.nddata.ccddata import fits_ccddata_writer
from astropy.table import Table
from astropy.time import Time
from astropy.utils.masked import Masked
from astropy.wcs import WCS, FITSFixedWarning
from ndcube import NDCube
from specutils import Spectrum as Spectrum1D
from dysh.spectra import core
from ..coordinates import ( # is_topocentric,; topocentric_velocity_to_frame,
KMS,
Observatory,
astropy_convenience_frame_names,
astropy_frame_dict,
change_ctype,
get_velocity_in_frame,
make_target,
replace_convention,
sanitize_skycoord,
veldef_to_convention,
)
from ..log import HistoricalBase, log_call_to_history, log_call_to_result
from ..plot import specplot as sp
from ..util import minimum_string_match
from ..util.docstring_manip import copy_docstring
from . import (
FWHM_TO_STDDEV,
available_smooth_methods,
baseline,
curve_of_growth,
decimate,
exclude_to_spectral_region,
get_spectral_equivalency,
spectral_region_to_list_of_tuples,
spectral_region_to_unit,
)
# from astropy.nddata import StdDevUncertainty
# Spectrum attributes to be ignored by Spectrum._copy_attributes
IGNORE_ON_COPY = [
"_data",
"_flux",
"_meta",
"_mask",
"_weights",
"_baseline_model",
"_plotter",
"_uncertainty",
"_unit",
]
[docs]
class Spectrum(Spectrum1D, HistoricalBase):
"""
This class contains a spectrum and its attributes. It is built on
`~specutils.Spectrum` with added attributes like baseline model.
Note that `~specutils.Spectrum` can contain multiple spectra but
we probably will not use that because the restriction that it can
have only one spectral axis conflicts with slight Doppler shifts.
See `~specutils.Spectrum` for the instantiation arguments.
"""
@log_call_to_history
def __init__(self, *args, **kwargs):
HistoricalBase.__init__(self)
self._target = kwargs.pop("target", None)
if self._target is not None:
self._target = sanitize_skycoord(self._target)
self._velocity_frame = self._target.frame.name
else:
self._velocity_frame = None
self._observer = kwargs.pop("observer", None)
Spectrum1D.__init__(self, *args, **kwargs)
# Try making a target from meta. If it fails, don't worry about it.
if False:
if self._target is None:
try:
self._target = make_target(self.meta)
self._velocity_frame = self._target.frame.name
except Exception:
pass
self._spectral_axis._target = self._target
if self.velocity_convention is None and "VELDEF" in self.meta:
self._spectral_axis._doppler_convention = veldef_to_convention(self.meta["VELDEF"])
# if "uncertainty" not in kwargs:
# self._uncertainty = StdDevUncertainty(np.ones_like(self.data), unit=self.flux.unit)
# Weights are Non here because can be defined
# separately from uncertainty by the user. Not really recommended
# but they can do so. See self.weights()
# self._weights = None
self._weights = np.ones(self.flux.shape)
# Get observer quantity from observer location.
if "DATE-OBS" in self.meta:
self._obstime = Time(self.meta["DATE-OBS"])
elif "MJD-OBS" in self.meta:
self._obstime = Time(self.meta["MJD-OBS"])
else:
self._obstime = None
self._spectral_axis._observer = self.observer
if self._spectral_axis._observer is not None:
self._velocity_frame = self._spectral_axis._observer.name
# if mask is not set via the flux input (NaNs in flux or flux.mask),
# then set the mask to all False == good
if self.mask is None:
self._mask = np.full(np.shape(self.flux), False)
self._baseline_model = None
self._subtracted = False
self._normalized = False
self._exclude_regions = None
self._include_regions = None # do we really need this?
self._plotter = None
# `self._resolution` is the spectral resolution in channels.
# This will be 1 for VEGAS, and >1 for the ACS.
if "FREQRES" in self.meta and "CDELT1" in self.meta:
self._resolution = self.meta["FREQRES"] / abs(self.meta["CDELT1"])
else:
self._resolution = 1
def _len(self):
"""return the size of the `Spectrum` in the spectral dimension.
@todo __len__ has unintended consquences, yuck.
"""
return len(self.frequency)
def _spectrum_property(self, prop: str):
"""
Utility method to return a header value as a property.
Parameters
----------
prop : str
The property name, _case-sensitive_
Returns
-------
value : Any or Non
The property value or None if `prop` is not in the header.
"""
return self.meta.get(prop, None)
@property
def weights(self):
"""The channel weights of this spectrum"""
# Cannot take power of NDUncertainty, so convert
# to a Quanity first.
# if self._weights is None:
# return self._uncertainty.quantity**-2
return self._weights
@property
def exclude_regions(self):
"""The baseline exclusion region(s) of this spectrum"""
return self._exclude_regions
@property
def baseline_model(self):
"""Returns the computed baseline model or None if it has not yet been computed."""
return self._baseline_model
@property
def flux(self):
"""
Converts the stored data and unit and mask into a `~astropy.units.Quantity` object.
Returns
-------
`~astropy.units.Quantity`
Spectral data as a quantity. Masked values are filled with NaN.
"""
return Masked(self.data * self.unit, mask=self.mask).filled(np.nan)
[docs]
@log_call_to_history
def baseline(self, degree, exclude=None, include=None, color="k", **kwargs):
# fmt: off
"""
Compute and optionally remove a baseline.
The code uses `~astropy.modeling.fitting.Fitter` and `~astropy.modeling.polynomial` to compute the baseline.
See the documentation for those modules for details.
This method will set the `baseline_model` attribute to the fitted model function which can be evaluated over a domain.
Note that `include` and `exclude` are mutually exclusive. If both are present, only `include` will be used.
Parameters
----------
degree : int
The degree of the polynomial series, a.k.a. baseline order
exclude : list of 2-tuples of int or `~astropy.units.Quantity`, or `~specutils.SpectralRegion`
List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive.
Examples:
One channel-based region: [11,51]
Two channel-based regions: [(11,51),(99,123)].
One `~astropy.units.Quantity` region: [110.198*u.GHz,110.204*u.GHz].
One compound `~specutils.SpectralRegion`: SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]).
Default: no exclude region
include : list of 2-tuples of int or `~astropy.units.Quantity`, or `~specutils.SpectralRegion`
List of region(s) to include in the fit. The tuple(s) represent a range in the form [lower,upper], inclusive.
See `exclude` for examples.
color : str
The color to plot the baseline model, if remove=False. Can be any type accepted by matplotlib.
model : str
One of 'polynomial', 'chebyshev', 'legendre', or 'hermite'
Default: 'chebyshev'
fitter : `~astropy.modeling.fitting.Fitter`
The fitter to use. Default: `~astropy.modeling.fitting.LinearLSQFitter` (with `calc_uncertaintes=True`).
Be careful when choosing a different fitter to be sure it is optimized for this problem.
remove : bool
If True, the baseline is removed from the spectrum.
If False, the baseline will be computed and overlaid on the spectrum. Default: False
"""
# fmt: on
# @todo: Are exclusion regions OR'd with the existing mask? make that an option?
kwargs_opts = {
"remove": False,
"model": "chebyshev",
"fitter": LinearLSQFitter(calc_uncertainties=True),
}
kwargs_opts.update(kwargs)
# `include` and `exclude` are mutually exclusive, but we allow `include`
# if `include` is used, transform it to `exclude`.
if include is not None:
if exclude is not None:
logger.info(f"Warning: ignoring exclude={exclude}") # noqa: F821
exclude = core.include_to_exclude_spectral_region(include, self)
self._baseline_model = baseline(self, degree, exclude, **kwargs)
if kwargs_opts["remove"]:
s = self.subtract(self._baseline_model(self.spectral_axis))
self._data = s._data
self._subtracted = True
if self._plotter is not None:
if kwargs_opts["remove"]:
self._plotter._line.set_ydata(self._data)
self._plotter.clear_overlays(blines=True)
if not self._plotter._freezey:
self._plotter.freey()
else:
if self._plotter._xunit == "chan":
xval = np.arange(len(self.flux))
else:
xval = self._plotter._sa
self._plotter._axis.plot(xval, self._baseline_model(self.spectral_axis), c=color, gid="baseline")
self._plotter.refresh()
# baseline
[docs]
@log_call_to_history
def undo_baseline(self):
"""
Undo the most recently computed baseline. If the baseline
has been subtracted, it will be added back to the data. The `baseline_model`
attribute is set to None. Exclude regions are untouched.
"""
if self._baseline_model is None:
return
if self._subtracted:
if self._normalized:
warnings.warn("Cannot undo previously normalized baseline subtraction") # noqa: B028
return
s = self.add(self._baseline_model(self.spectral_axis))
self._data = s._data
if self._plotter is not None:
self._plotter._line.set_ydata(self._data)
if not self._plotter._freezey:
self._plotter.freey()
self._baseline_model = None
@property
def subtracted(self):
"""Has a baseline model been subtracted?"
Returns
-------
True if a baseline model has been subtracted, False otherwise
"""
return self._subtracted
def _set_exclude_regions(self, exclude):
"""
Set the mask for the regions to exclude.
Parameters
----------
exclude : list of 2-tuples of int or ~astropy.units.Quantity, or ~specutils.SpectralRegion
List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive.
In channel units.
Examples: One channel-based region: [11,51],
Two channel-based regions: [(11,51),(99,123)].
One `~astropy.units.Quantity` region: [110.198*u.GHz,110.204*u.GHz].
One compound `~specutils.SpectralRegion`: SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]).
"""
pass
[docs]
def list_to_spectral_region(self, inlist):
# @todo utility code to convert a input list of channels or quantities to a spectral region with units of self.spectral_axis.unit.
# This could go in core.py combine this with _set_exclude_regions
pass
[docs]
def bshow(self):
"""Show the baseline model"""
print(f"baseline model {self._baseline_model}")
[docs]
@copy_docstring(sp.SpectrumPlot.plot)
def plot(self, **kwargs):
""" """
if self._plotter is None:
self._plotter = sp.SpectrumPlot(self, **kwargs)
self._plotter.plot(**kwargs)
return self._plotter
[docs]
def get_selected_regions(self, unit=None):
"""Get selected regions from plot."""
if self._plotter is None:
raise TypeError("No plotter attached to spectrum. Use Spectrum.plot() first.")
regions = self._plotter.get_selected_regions()
# If there are no selected regions, tell the user and return.
if len(regions) == 0:
warnings.warn(
"No selected regions.",
UserWarning,
stacklevel=2,
)
return
if unit is not None:
regions = exclude_to_spectral_region(regions, self)
regions = spectral_region_to_unit(regions, self, unit=unit, append_doppler=True)
regions = spectral_region_to_list_of_tuples(regions)
return regions
@property
def obstime(self):
return self._obstime
@property
def plotter(self):
return self._plotter
[docs]
def stats(self, roll=0, qac=False):
"""
Compute some statistics of this `Spectrum`. The mean, rms, median,
data minimum and data maximum are calculated. Note this works
with slicing, so, e.g., `myspectrum[45:153].stats()` will return
the statistics of the slice.
Parameters
----------
roll : int
Return statistics on a 'rolled' array differenced with the
original array. If there is no correllaton between channels,
a roll=1 would return an RMS sqrt(2) larger than that of the
input array. Another advantage of rolled statistics it will
remove most slow variations, thus RMS/sqrt(2) might be a better
indication of the underlying RMS.
Default: 0
qac : bool
If set, the returned simple string contains mean,rms,datamin,datamax
for easier visual regression. Based on some legacy code.
Default: False
Returns
-------
stats : dict
Dictionary consisting of (mean,median,rms,datamin,datamax)
"""
if roll == 0:
mean = self.mean()
median = self.median()
rms = np.nanstd(self.flux)
dmin = self.min()
dmax = self.max()
else:
d = self[roll:] - self[:-roll]
mean = d.mean()
median = d.median()
rms = np.nanstd(d.flux)
dmin = d.min()
dmax = d.max()
if qac:
out = f"{mean.value} {rms.value} {dmin.value} {dmax.value}"
return out
out = {"mean": mean, "median": median, "rms": rms, "min": dmin, "max": dmax}
return out
[docs]
@log_call_to_history
def decimate(self, n):
"""
Decimate the `Spectrum` by n pixels.
Parameters
----------
n : int
Decimation factor of the spectrum by returning every n-th channel.
Returns
-------
s : `Spectrum`
The decimated `Spectrum`.
"""
new_data, new_meta = decimate(self.data * self.flux.unit, n, self.meta)
s = Spectrum.make_spectrum(new_data, meta=new_meta, observer_location="from_meta")
if self._baseline_model is not None:
s._baseline_model = None
return s
[docs]
@log_call_to_history
def smooth(
self,
method="hanning",
width=1,
decimate=0,
meta=None,
mask=None,
boundary="extend",
nan_treatment="fill",
fill_value=np.nan,
preserve_nan=True,
):
"""
Smooth or Convolve the `Spectrum`, optionally decimating it.
Default smoothing is hanning.
Parameters
----------
method : string
Smoothing method. Valid are: 'hanning', 'boxcar' and
'gaussian'. Minimum match applies.
The default is 'hanning'.
width : int
Effective width of the convolving kernel.
For 'hanning' this should be 1, with a 0.25,0.5,0.25 kernel.
For 'boxcar' an even value triggers an odd one with half the
signal at the edges, and will thus not reproduce GBTIDL.
For 'gaussian' this is the FWHM of the final frequency response.
That is, the smoothed `Spectrum` will have an effective frequency resolution
equal to CDELT1*`width`.
The default is 1.
decimate : int
Decimation factor of the spectrum by returning every decimate channel.
-1: no decimation
0: use the width parameter
>1: user supplied decimation (use with caution)
mask : None or `numpy.ndarray`
A "mask" array. Shape must match ``array``, and anything that is masked
(i.e., not 0/`False`) will be set to NaN for the convolution. If
`None`, no masking will be performed unless ``array`` is a masked array.
If ``mask`` is not `None` *and* ``array`` is a masked array, a pixel is
masked if it is masked in either ``mask`` *or* ``array.mask``.
boundary : str
A flag indicating how to handle boundaries:
* `None`
Set the ``result`` values to zero where the kernel
extends beyond the edge of the array.
* 'fill'
Set values outside the array boundary to ``fill_value`` (default).
* 'wrap'
Periodic boundary that wrap to the other side of ``array``.
* 'extend'
Set values outside the array to the nearest ``array``
value.
fill_value : float
The value to use outside the array when using ``boundary='fill'``. Default value is ``NaN``.
nan_treatment : {'interpolate', 'fill'}
The method used to handle NaNs in the input ``array``:
* ``'interpolate'``: ``NaN`` values are replaced with
interpolated values using the kernel as an interpolation
function. Note that if the kernel has a sum equal to
zero, NaN interpolation is not possible and will raise an
exception.
* ``'fill'``: ``NaN`` values are replaced by ``fill_value``
prior to convolution.
preserve_nan : bool
After performing convolution, should pixels that were originally NaN
again become NaN?
Raises
------
Exception
If no valid smoothing method is given.
ValueError
If `width` is less than one.
If `width` is less than the spectral resolution (in channels).
If `decimate` is not an integer.
Returns
-------
s : `Spectrum`
The new, possibly decimated, convolved `Spectrum`.
"""
valid_methods = available_smooth_methods()
this_method = minimum_string_match(method, valid_methods)
if width < 1:
raise ValueError(f"`width` ({width}) must be >=1.")
if this_method is None:
raise Exception(f"smooth({method}): valid methods are {valid_methods}")
md = np.ma.masked_array(self._data, self.mask)
if decimate == 0:
# Take the default decimation by `width`.
decimate = int(abs(width))
if not float(width).is_integer():
print(f"Adjusting decimation factor to be a natural number. Will decimate by {decimate}")
if this_method == "gaussian":
if width <= self._resolution:
raise ValueError(
f"`width` ({width} channels) cannot be less than the current resolution ({self._resolution} channels)."
)
kwidth = np.sqrt(width**2 - self._resolution**2) # Kernel effective width.
stddev = kwidth / FWHM_TO_STDDEV
new_data, new_meta = core.smooth(
data=md * self.flux.unit,
method=this_method,
ndecimate=decimate,
width=stddev,
meta=self.meta,
boundary=boundary,
mask=mask,
nan_treatment=nan_treatment,
fill_value=fill_value,
preserve_nan=preserve_nan,
)
else:
kwidth = width
new_data, new_meta = core.smooth(
data=md,
method=this_method,
width=width,
ndecimate=decimate,
meta=self.meta,
boundary=boundary,
mask=mask,
nan_treatment=nan_treatment,
fill_value=fill_value,
preserve_nan=preserve_nan,
)
s = Spectrum.make_spectrum(new_data * self.flux.unit, meta=new_meta, observer_location="from_meta")
s._baseline_model = self._baseline_model
# Update the spectral resolution in channel units.
s._resolution = s.meta["FREQRES"] / abs(s.meta["CDELT1"])
return s
[docs]
def shift(self, s, remove_wrap=True, fill_value=np.nan, method="fft"):
"""
Shift the `Spectrum` by `s` channels in place.
Parameters
----------
s : float
Number of channels to shift the `Spectrum` by.
remove_wrap : bool
If `False` keep channels that wrap around the edges.
If `True` fill channels that wrap with `fill_value`.
fill_value : float
If `remove_wrap=True` fill channels that wrapped with this value.
method : "fft"
Method used to perform the fractional channel shift.
"fft" uses a phase shift.
"""
new_spec = self._copy()
new_data = core.data_shift(new_spec.data, s, remove_wrap=remove_wrap, fill_value=fill_value, method=method)
# Update data values.
new_spec._data = new_data
# Update metadata.
new_spec.meta["CRPIX1"] += s
# Update WCS.
new_spec.wcs.wcs.crpix[0] += s
# Update `SpectralAxis` values.
# Radial velocity needs to be copied by hand.
radial_velocity = deepcopy(new_spec._spectral_axis._radial_velocity)
new_spectral_axis_values = new_spec.wcs.spectral.pixel_to_world(np.arange(new_spec.flux.shape[-1]))
new_spec._spectral_axis = new_spec.spectral_axis.replicate(value=new_spectral_axis_values)
new_spec._spectral_axis._radial_velocity = radial_velocity
return new_spec
[docs]
def find_shift(self, other, units=None, frame=None):
"""
Find the shift required to align this `Spectrum` with `other`.
Parameters
----------
other : `Spectrum`
Target `Spectrum` to align to.
units : {None, `astropy.units.Quantity`}
Find the shift to align the two `Spectra` in these units.
If `None`, the `Spectra` will be aligned using the units of
`other`.
frame : {None, str}
Find the shift in this reference frame.
If `None` will use the frame of `other`.
Returns
-------
shift : float
Number of channels that this `Spectrum` must be shifted to
be aligned with `other`.
"""
if not isinstance(other, Spectrum):
raise ValueError("`other` must be a `Spectrum`.")
if frame is not None and frame not in astropy_frame_dict.keys():
raise ValueError(
f"`frame` ({frame}) not recognized. Frame must be one of {', '.join(list(astropy_frame_dict.keys()))}"
)
else:
frame = other._velocity_frame
sa = self.spectral_axis.with_observer_stationary_relative_to(frame)
tgt_sa = other.spectral_axis.with_observer_stationary_relative_to(frame)
if units is None:
units = tgt_sa.unit
sa = sa.to(units)
tgt_sa = tgt_sa.to(units)
cdelt1 = sa[1] - sa[0]
shift = ((sa[0] - tgt_sa[0]) / cdelt1).value
return shift
[docs]
def align_to(self, other, units=None, frame=None, remove_wrap=True, fill_value=np.nan, method="fft"):
"""
Align the `Spectrum` with respect to `other`.
Parameters
----------
other : `Spectrum`
Target `Spectrum` to align to.
units : {None, `astropy.units.Quantity`}
Find the shift to align the two `Spectra` in these units.
If `None`, the `Spectra` will be aligned using the units of
`other`.
frame : {None, str}
Find the shift in this reference frame.
If `None` will use the frame of `other`.
remove_wrap : bool
If `True` allow spectrum to wrap around the edges.
If `False` fill channels that wrap with `fill_value`.
fill_value : float
If `wrap=False` fill channels that wrapped with this value.
method : "fft"
Method used to perform the fractional channel shift.
"fft" uses a phase shift.
"""
s = self.find_shift(other, units=units, frame=frame)
return self.shift(s, remove_wrap=remove_wrap, fill_value=fill_value, method=method)
@property
def equivalencies(self):
"""Get the spectral axis equivalencies that can be used in converting the axis
between km/s and frequency or wavelength"""
equiv = u.spectral()
sa = self.spectral_axis
if sa.doppler_rest is not None:
rfq = sa.doppler_rest
elif "RESTFREQ" in self.meta:
cunit1 = self.meta.get("CUNIT1", self.wcs.wcs.cunit[0])
# @todo this could be done with a dict str->function
rfq = self.meta["RESTFREQ"] * cunit1 # WCS wants no E
else:
rfq = None
if rfq is not None:
equiv.extend(get_spectral_equivalency(rfq, self.velocity_convention))
return equiv
@property
def target(self):
"""
The target object of this spectrum.
Returns
-------
target : `~astropy.coordinates.sky_coordinate.SkyCoord`
The sky coordinate object
"""
return self._target
@property
def tscale(self):
"""
The descriptive brightness unit of the data. One of
- 'Raw' : raw value, e.g., count
- 'Ta' : Antenna Temperature in K
- 'Ta*' : Antenna temperature corrected to above the atmosphere in K
- 'Flux': flux density in Jansky
Returns
-------
tscale : str or None
Brightness unit string. If there is no TSCALE in the header, None is returned.
"""
return self._spectrum_property("TSCALE")
@property
def tscale_fac(self):
"""
The factor by which the data have been scale from antenna temperature to corrected antenna temperature
or flux density. This is the average of the values by which the integrations in the spectrum have been scaled.
Returns
-------
tscale_fac : float or None
The scale factor. If there is no TSCALFAC in the header, None is returned.
"""
return self._spectrum_property("TSCALFAC")
@property
def observer(self):
"""
Returns
-------
observer : `~astropy.coordinates.BaseCoordinateFrame` or derivative
The coordinate frame of the observer if present.
"""
return self._observer
@property
def velocity_frame(self):
"""String representation of the velocity frame"""
return self._velocity_frame
@property
def doppler_convention(self):
"""String representation of the velocity (Doppler) convention"""
return self.velocity_convention
[docs]
def axis_velocity(self, unit=KMS):
"""Get the spectral axis in velocity units.
*Note*: This is not the same as `Spectrum.velocity`, which includes the source radial velocity.
Parameters
----------
unit : `~astropy.units.Quantity` or str that can be converted to Quantity
The unit to which the axis is to be converted
Returns
-------
velocity : `~astropy.units.Quantity`
The converted spectral axis velocity
"""
return self._spectral_axis.to(unit)
[docs]
def velocity_axis_to(self, unit=KMS, toframe=None, doppler_convention=None):
"""
Parameters
----------
unit : `~astropy.units.Quantity` or str that can be converted to Quantity
The unit to which the axis is to be converted
toframe : str
The coordinate frame to convert to, e.g. 'hcrs', 'icrs'
doppler_convention : str
The Doppler velocity covention to use, one of 'optical', 'radio', or 'rest'
Returns
-------
test_spectrum.pyvelocity : `~astropy.units.Quantity`
The converted spectral axis velocity
"""
if toframe is not None and toframe != self.velocity_frame:
s = self.with_frame(toframe)
else:
s = self
if doppler_convention is not None:
return s._spectral_axis.to(unit=unit, doppler_convention=doppler_convention).to(unit)
else:
return s.axis_velocity(unit)
[docs]
def get_velocity_shift_to(self, toframe):
if self._target is None:
raise Exception("Can't calculate velocity because Spectrum.target is None")
return get_velocity_in_frame(self._target, toframe, self._observer, self._obstime)
[docs]
@log_call_to_history
def set_frame(self, toframe):
"""Set the sky coordinate and doppler tracking reference frame of this Spectrum. The header 'CTYPE1' will be changed accordingly.
To make a copy of this Spectrum with new coordinate referece frmae instead, use `with_frame`.
Parameters
----------
toframe - str, or ~astropy.coordinates.BaseCoordinateFrame, or ~astropy.coordinates.SkyCoord
The coordinate reference frame identifying string, as used by astropy, e.g. 'hcrs', 'icrs', etc.,
or an actual coordinate system instance
"""
tfl = toframe
if isinstance(toframe, str):
tfl = toframe.lower()
tfl = astropy_convenience_frame_names.get(tfl, tfl)
if "itrs" in tfl:
if isinstance(self._observer, ITRS):
return # nothing to be done, we already have the correct axis
raise ValueError(
"For topographic or ITRS coordaintes, you must supply a full astropy Coordinate instance."
)
elif self._velocity_frame == tfl:
return # the frame is already the requested frame
self._spectral_axis = self._spectral_axis.with_observer_stationary_relative_to(tfl)
self._observer = self._spectral_axis.observer
# This line is commented because:
# SDFITS defines CTYPE1 as always being the TOPO frequency.
# See Issue #373 on GitHub.
# self._meta["CTYPE1"] = change_ctype(self._meta["CTYPE1"], toframe)
if isinstance(tfl, str):
self._velocity_frame = tfl
else:
self._velocity_frame = tfl.name
# While it is incorrect to change CTYPE1, it is reasonable to change VELDEF.
# SDFITS defines CTYPE1 as always being the TOPO frequency.
# See Issue #373 on GitHub.
self.meta["VELDEF"] = change_ctype(self.meta["VELDEF"], self._velocity_frame)
[docs]
def with_frame(self, toframe):
"""Return a copy of this `Spectrum` with a new coordinate reference frame.
Parameters
----------
toframe - str, `~astropy.coordinates.BaseCoordinateFrame`, or `~astropy.coordinates.SkyCoord`
The coordinate reference frame identifying string, as used by astropy, e.g. 'hcrs', 'icrs', etc.,
or an actual coordinate system instance
Returns
-------
spectrum : `Spectrum`
A new `Spectrum` object with the rest frame set to `toframe`.
"""
s = self._copy()
s.set_frame(toframe)
return s
[docs]
@log_call_to_history
def set_convention(self, doppler_convention):
"""Set the velocity convention of this `Spectrum`. The spectral axis of this `Spectrum` will be replaced
with a new spectral axis with the input velocity convention. The header 'VELDEF' value will
be changed accordingly.
To make a copy of this `Spectrum` with a new velocity convention instead, use `with_velocity_convention`.
Parameters
----------
doppler_convention - str
The velocity convention, one of 'radio', 'optical', 'relativistic'
"""
# replicate() gives the same asnwer as
# self._spectral_axis.to(unit=self._spectral_axis.unit, doppler_convention=doppler_convention)
new_sp_axis = self.spectral_axis.replicate(doppler_convention=doppler_convention)
self._spectral_axis = new_sp_axis
self.meta["VELDEF"] = replace_convention(self.meta["VELDEF"], doppler_convention)
[docs]
def with_velocity_convention(self, doppler_convention):
"""Returns a copy of this `Spectrum` with the input velocity convention. The header 'VELDEF' value will
be changed accordingly.
Parameters
----------
doppler_convention - str
The velocity convention, one of 'radio', 'optical', 'relativistic'
Returns
-------
spectrum : `Spectrum`
A new `Spectrum` object with `doppler_convention` as its Doppler convention.
"""
s = self._copy(velocity_convention=doppler_convention)
s.set_convention(doppler_convention)
return s
[docs]
def savefig(self, file, **kwargs):
"""Save the plot"""
raise Exception("savefig() has been moved to the SpecPlot class")
def _write_table(self, fileobj, format, **kwargs):
"""
Write this `Spectrum` as an ~astropy.table.Table. The output columns will be
- frequency or velocity - in the Spectrum's spectral axis units, or converted with `xaxis_unit` keyword
- flux - in the Spectrum's flux units, or converted with `yaxis_unit` keyword
- uncertainty - The channel-based uncertainty or zeroes if uncertainty was not defined
- weights - the channel weights
- mask - 0 is unmasked ('good'), 1 is masked ('bad')
- baseline model value - the value of the baseline model at each x axis point, regardless of whether it
has been subtracted from the Spectrum or not. This will be all zeroes if no baseline model
has been computed.
Parameters
----------
fileobj : str, file-like or `pathlib.Path`
File to write to. If a file object, must be opened in a
writeable mode.
format : str
The output format. Must be a format supported by ~astropy.table.Table.write
kwargs : variable
Additional keyword arguments supported by ~astropy.table.Table.write
"""
# @todo support xaxis_unit, yaxis_unit, flux_unit
# xaxis_unit : str or :class:`astropy.units.Unit`
# yaxis_unit : str or :class:`astropy.units.Unit`
# flux_unit : str or :class:`astropy.units.Unit`
if self._baseline_model is None:
bl = np.zeros_like(self.flux)
bldesc = "Fitted baseline value at given channel (was not defined)"
else:
bl = self._baseline_model(self._spectral_axis)
bldesc = "Fitted baseline value at given channel"
mask = self.mask * 1
if self.uncertainty is None:
unc = np.zeros(self.flux.shape) # , mask=False)
udesc = "Flux uncertainty (was not defined)"
else:
unc = self.uncertainty.quantity
udesc = "Flux uncertainty"
outarray = [self.spectral_axis, self.flux, unc, self.weights, mask, bl]
description = [
"Spectral axis",
"Flux",
udesc,
"Channel weights",
"Mask 0=unmasked, 1=masked",
bldesc,
]
# remove FITS reserve keywords
meta = deepcopy(self.meta)
meta.pop("NAXIS1", None)
meta.pop("TDIM7", None)
meta.pop("TUNIT7", None)
meta["HISTORY"] = self.history # use property not _history to ensure ascii
meta["COMMENT"] = self.comments
if format == "mrt":
ulab = "e_flux" # MRT convention that error on X is labeled e_X
else:
ulab = "uncertainty"
outnames = ["spectral_axis", "flux", ulab, "weight", "mask", "baseline"]
if "ipac" in format:
# IPAC format wants a crazy dictionary style.
d = {}
for k, v in meta.items():
d[k] = {"value": v}
t = Table(outarray, names=outnames, meta={"keywords": d}, descriptions=description)
else:
t = Table(outarray, names=outnames, meta=meta, descriptions=description)
# for now ignore complaints about keywords until we clean them up.
# There are some that are more than 8 chars that should be fixed in GBTFISLOAD
warnings.simplefilter("ignore", VerifyWarning)
t.write(fileobj, format=format, **kwargs)
def _copy(self, **kwargs):
"""
Perform deep copy operations on each attribute of the ``Spectrum``
object.
This overrides the ``specutils.Spectrum`` method so that
target and observer attributes get copied.
"""
alt_kwargs = dict(
flux=deepcopy(self.flux),
spectral_axis=deepcopy(self.spectral_axis),
uncertainty=deepcopy(self.uncertainty),
wcs=deepcopy(self.wcs),
mask=deepcopy(self.mask),
meta=deepcopy(self.meta),
unit=deepcopy(self.unit),
velocity_convention=deepcopy(self.velocity_convention),
rest_value=deepcopy(self.rest_value),
target=deepcopy(self.target),
observer=deepcopy(self.observer),
)
alt_kwargs.update(kwargs)
s = self.__class__(**alt_kwargs)
# s.topocentric = is_topocentric(meta["CTYPE1"]) # GBT-specific to use CTYPE1 instead of VELDEF
return s
[docs]
@classmethod
def fake_spectrum(cls, nchan=1024, seed=None, **kwargs):
"""
Create a fake spectrum, useful for simple testing. A default header is
created, which may be modified with kwargs.
Parameters
----------
nchan : int, optional
Number of channels. The default is 1024.
seed : {None, int, array_like[ints], `numpy.random.SeedSequence`, `numpy.random.BitGenerator`, `numpy.random.Generator`}, optional
A seed to initialize the `BitGenerator`. If None, then fresh, unpredictable entropy will be pulled from the OS.
If an int or array_like[ints] is passed, then all values must be non-negative and will be passed to
`SeedSequence` to derive the initial `BitGenerator` state. One may also pass in a `SeedSequence` instance.
Additionally, when passed a `BitGenerator`, it will be wrapped by `Generator`. If passed a `Generator`, it will
be returned unaltered.
The default is `None`.
**kwargs: dict or key=value
Metadata to put in the header. If the key exists already in
the default header, it will be replaced. Otherwise the key and value will be
added to the header. Keys are case insensitive.
Returns
-------
spectrum : `Spectrum`
The spectrum object
"""
rng = np.random.default_rng(seed)
data = rng.random(nchan) * u.K
meta = {
"OBJECT": "NGC2415",
"BANDWID": 23437500.0,
"DATE-OBS": "2021-02-10T07:38:37.50",
"DURATION": 0.9982445,
"EXPOSURE": 732.1785161896237,
"TSYS": 17.930595470605255,
"CTYPE1": "FREQ-OBS",
"CRVAL1": 1402544936.7749996,
"CRPIX1": float(nchan) / 2.0,
"CDELT1": -715.2557373046875,
"CTYPE2": "RA",
"CRVAL2": 114.23878994411744,
"CTYPE3": "DEC",
"CRVAL3": 35.24315395841497,
"CRVAL4": -6,
"OBSERVER": "Michael Fanelli",
"OBSID": "unknown",
"SCAN": 152,
"OBSMODE": "OnOff:PSWITCHON:TPWCAL",
"FRONTEND": "Rcvr1_2",
"TCAL": 1.4551637172698975,
"VELDEF": "OPTI-HEL",
"VFRAME": 15264.39118499772,
"RVSYS": 3775382.910954342,
"OBSFREQ": 1402544936.7749996,
"LST": 42101.90296246562,
"AZIMUTH": 285.9514963267411,
"ELEVATIO": 42.100623613548194,
"TAMBIENT": 270.4,
"PRESSURE": 696.2290227048372,
"HUMIDITY": 0.949,
"RESTFREQ": 1420405751.7,
"FREQRES": 715.2557373046875,
"EQUINOX": 2000.0,
"RADESYS": "FK5",
"TRGTLONG": 114.2375,
"TRGTLAT": 35.24194444444444,
"SAMPLER": "A2_0",
"FEED": 1,
"SRFEED": 0,
"FEEDXOFF": 0.0,
"FEEDEOFF": 0.0,
"SUBREF_STATE": 1,
"SIDEBAND": "L",
"PROCSEQN": 1,
"PROCSIZE": 2,
"PROCSCAN": "ON",
"PROCTYPE": "SIMPLE",
"LASTON": 152,
"LASTOFF": 0,
"TIMESTAMP": "2021_02_10_07:38:37",
"QD_BAD": -1,
"QD_METHOD": "",
"VELOCITY": 3784000.0,
"DOPFREQ": 1420405751.7,
"ADCSAMPF": 3000000000.0,
"VSPDELT": 65536.0,
"VSPRVAL": 19.203125,
"VSPRPIX": 16385.0,
"SIG": "T",
"CAL": "F",
"CALTYPE": "LOW",
"CALPOSITION": "Unknown",
"IFNUM": 0,
"PLNUM": 0,
"FDNUM": 0,
"HDU": 1,
"BINTABLE": 0,
"ROW": 2,
"DATE": "2022-02-14T17:25:01",
"ORIGIN": "NRAO Green Bank",
"TELESCOP": "NRAO_GBT",
"INSTRUME": "VEGAS",
"SDFITVER": "sdfits ver1.22",
"FITSVER": "1.9",
"CTYPE4": "STOKES",
"PROJID": "TGBT21A_501_11",
"BACKEND": "VEGAS",
"SITELONG": -79.83983,
"SITELAT": 38.43312,
"SITEELEV": 824.595,
"EXTNAME": "SINGLE DISH",
"FITSINDEX": 0,
"PROC": "OnOff",
"OBSTYPE": "PSWITCHON",
"SUBOBSMODE": "TPWCAL",
"INTNUM": 0,
"CUNIT1": "Hz",
"CUNIT2": "deg",
"CUNIT3": "deg",
"RESTFRQ": 1420405751.7,
"MEANTSYS": 17.16746070048293,
"WTTSYS": 17.16574907094451,
"TSCALE": "Ta*",
"TSCALFAC": 0.54321,
"TUNIT7": "K",
"BUNIT": "K",
}
for k, v in kwargs.items():
meta[k.upper()] = v
return Spectrum.make_spectrum(data, meta, observer_location=Observatory["GBT"])
# @todo allow observer or observer_location. And/or sort this out in the constructor.
[docs]
@classmethod
def make_spectrum(cls, data, meta, use_wcs=True, observer_location=None, observer=None):
# , shift_topo=False):
"""Factory method to create a `Spectrum` object from a data and header. The the data are masked,
the `Spectrum` mask will be set to the data mask.
Parameters
----------
data : `~numpy.ndarray`
The data array. See `~specutils.Spectrum`
meta : dict
The metadata, typically derived from an SDFITS header.
Required items in `meta` are 'CTYPE[123]','CRVAL[123]', 'CUNIT[123]', 'VELOCITY', 'EQUINOX', 'RADESYS'
use_wcs : bool
If True, create a WCS object from `meta`
Default: True
observer_location : `~astropy.coordinates.EarthLocation` or str
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of observation DATE-OBS or MJD-OBS in `meta`.
If this parameter is given the special str value 'from_meta', then an observer_location
will be created from SITELONG, SITELAT, and SITEELEV in the meta dictionary.
observer : `~astropy.coordinates.BaseCoordinateFrame`
Coordinate frame for the observer.
Will be ignored if DATE-OBS or MJD-OBS are present in `meta` and `observer_location` is not `None`.
Returns
-------
spectrum : `Spectrum`
The spectrum object
"""
# @todo generic check_required method since I now have this code in two places (coordinates/core.py).
# @todo requirement should be either DATE-OBS or MJD-OBS, but make_target() needs to be updated
# in that case as well.
_required = set(
[
"CRVAL1",
"CRVAL2",
"CRVAL3",
"CTYPE1",
"CTYPE2",
"CTYPE3",
"CUNIT1",
"CUNIT2",
"CUNIT3",
"VELOCITY",
"EQUINOX",
"RADESYS",
"DATE-OBS",
"VELDEF",
"RESTFRQ",
]
)
# Otherwise we change the input meta, which could lead to confusion.
_meta = deepcopy(meta)
# RADECSYS is also a valid column name. See issue #287
# https://github.com/GreenBankObservatory/dysh/issues/287
if "RADECSYS" in _meta.keys() and "RADESYS" not in _meta.keys():
_meta["RADESYS"] = deepcopy(_meta["RADECSYS"])
del _meta["RADECSYS"]
missing = set(_required).difference(set(_meta.keys()))
if len(missing) > 0:
raise ValueError(f"Header (meta) is missing one or more required keywords: {missing}")
# @todo WCS is expensive.
# Possibly figure how to calculate spectral_axis instead.
# @todo allow fix=False in WCS constructor?
if use_wcs:
with warnings.catch_warnings():
# Skip warnings about DATE-OBS being converted to MJD-OBS.
warnings.filterwarnings("ignore", category=FITSFixedWarning)
# Skip warnings FITS keywords longer than 8 chars or containing
# illegal characters (like _).
warnings.filterwarnings("ignore", category=VerifyWarning)
# lists are problematic in constructor to WCS
# so temporarily remove history and comment values
savehist = _meta.pop("HISTORY", None)
savecomment = _meta.pop("COMMENT", None)
if savecomment is None:
savecomment = _meta.pop("comments", None)
wcs = WCS(header=_meta)
if savehist is not None:
_meta["HISTORY"] = savehist
if savecomment is not None:
_meta["COMMENT"] = savecomment
# It would probably be safer to add NAXISi to meta.
if wcs.naxis > 3:
wcs.array_shape = (0, 0, 0, len(data))
# For some reason these aren't identified while creating the WCS object.
if "SITELONG" in _meta.keys():
wcs.wcs.obsgeo[:3] = _meta["SITELONG"], _meta["SITELAT"], _meta["SITEELEV"]
# Reset warnings.
else:
wcs = None
# is_topo = is_topocentric(meta["CTYPE1"]) # GBT-specific to use CTYPE1 instead of VELDEF
target = make_target(_meta)
vc = veldef_to_convention(_meta["VELDEF"])
# Define an observer as needed.
if observer is not None:
obsitrs = observer
elif observer_location is not None and (_meta.get("DATE-OBS") or _meta.get("MJD-OBS")) is not None:
obstime = Time(_meta.get("DATE-OBS") or _meta.get("MJD-OBS"))
if observer_location == "from_meta":
try:
observer_location = Observatory.get_earth_location(
_meta["SITELONG"], _meta["SITELAT"], _meta["SITEELEV"]
)
except KeyError as ke:
raise Exception(f"Not enough info to create observer_location: {ke}") # noqa: B904
obsitrs = SpectralCoord._validate_coordinate(
attach_zero_velocities(observer_location.get_itrs(obstime=obstime))
)
else:
warnings.warn( # noqa: B028
"'meta' does not contain DATE-OBS or MJD-OBS. Spectrum won't be convertible to certain coordinate"
" frames"
)
obsitrs = None
if hasattr(data, "mask"):
s = cls(
flux=data,
wcs=wcs,
meta=_meta,
velocity_convention=vc,
radial_velocity=target.radial_velocity,
rest_value=_meta["RESTFRQ"] * u.Hz,
observer=obsitrs,
target=target,
mask=data.mask,
)
else:
s = cls(
flux=data,
wcs=wcs,
meta=_meta,
velocity_convention=vc,
radial_velocity=target.radial_velocity,
rest_value=_meta["RESTFRQ"] * u.Hz,
observer=obsitrs,
target=target,
)
# For some reason, Spectrum.spectral_axis created with WCS do not inherit
# the radial velocity. In fact, they get no radial_velocity attribute at all!
# This method creates a new spectral_axis with the given radial velocity.
if observer_location is None and observer is None:
s.set_radial_velocity_to(target.radial_velocity) # open
return s
def _arithmetic_apply(self, other, op, handle_meta, **kwargs):
if isinstance(other, NDCube):
result = op(other, **{"handle_meta": handle_meta})
else:
result = op(other, **{"handle_meta": handle_meta, "meta_other_meta": False})
self._copy_attributes(result)
return result
def _copy_attributes(self, other):
"""
Copy `Spectrum` attributes after
an arithmetic operation.
Only copy attributes that are not modified by the arithmetic.
I.e., do not copy the "_flux" attribute.
"""
for k, v in vars(self).items():
if k not in IGNORE_ON_COPY:
vars(other)[k] = deepcopy(v)
def __add__(self, other):
op = self.add
handle_meta = self._add_meta
if not isinstance(other, (NDCube, u.Quantity)):
try:
other = u.Quantity(other, unit=self.unit)
except TypeError:
return NotImplemented
result = self._arithmetic_apply(other, op, handle_meta)
return result
__radd__ = __add__
def __sub__(self, other):
op = self.subtract
handle_meta = self._add_meta
if not isinstance(other, (NDCube, u.Quantity)):
try:
other = u.Quantity(other, unit=self.unit)
except TypeError:
return NotImplemented
result = self._arithmetic_apply(other, op, handle_meta)
return result
def __rsub__(self, other):
return -1 * (self - other)
def __mul__(self, other):
op = self.multiply
handle_meta = self._mul_meta
if not isinstance(other, NDCube):
other = u.Quantity(other)
result = self._arithmetic_apply(other, op, handle_meta)
return result
__rmul__ = __mul__
def __div__(self, other):
op = self.divide
handle_meta = self._div_meta
if not isinstance(other, NDCube):
other = u.Quantity(other)
result = self._arithmetic_apply(other, op, handle_meta)
return result
def __truediv__(self, other):
op = self.divide
handle_meta = self._div_meta
if not isinstance(other, NDCube):
other = u.Quantity(other)
result = self._arithmetic_apply(other, op, handle_meta)
return result
def _add_meta(self, operand, operand2, **kwargs):
kwargs.setdefault("other_meta", True)
meta = deepcopy(operand)
if kwargs["other_meta"]:
meta["EXPOSURE"] = operand["EXPOSURE"] + operand2["EXPOSURE"]
meta["DURATION"] = operand["DURATION"] + operand2["DURATION"]
return meta
def _mul_meta(self, operand, operand2=None, **kwargs):
# TBD
return deepcopy(operand)
def _div_meta(self, operand, operand2=None, **kwargs):
# TBD
return deepcopy(operand)
def __getitem__(self, item):
def q2idx(q, wcs, spectral_axis, coo, sto):
"""Quantity to index."""
if "velocity" in u.get_physical_type(q):
# Convert from velocity to channel.
idx = vel2idx(q, wcs, spectral_axis, coo, sto)
elif "length" in u.get_physical_type(q):
# Convert wavelength to channel.
idx = wav2idx(q, wcs, spectral_axis, coo, sto)
elif "frequency" in u.get_physical_type(q):
# Convert from frequency to channel.
idxs = wcs.world_to_pixel(coo, q, sto)
idx = int(np.round(idxs[0]))
return idx
def vel2idx(vel, wcs, spectral_axis, coo, sto):
eq = get_spectral_equivalency(spectral_axis.doppler_rest, spectral_axis.doppler_convention)
# Make `vel` a `SpectralCoord`.
vel_sp = spectral_axis.to(unit=vel.unit).replicate(value=vel.value, unit=vel.unit)
with u.set_enabled_equivalencies(eq):
idxs = wcs.world_to_pixel(coo, vel_sp, sto)
return int(np.round(idxs[0]))
def wav2idx(wav, wcs, spectral_axis, coo, sto):
with u.set_enabled_equivalencies(u.spectral()):
wav_sp = spectral_axis.to(unit=wav.unit).replicate(value=wav.value, unit=wav.unit)
idxs = wcs.world_to_pixel(coo, wav_sp, sto)
return int(np.round(idxs[0]))
# Only slicing along the spectral coordinate.
# Assumes that the WCS is in frequency.
if isinstance(item, slice):
if item.step is not None and item.step != 1:
raise NotImplementedError(
"Cannot slice with a step other than 1. Try smoothing and decimating if you want to downsample."
)
spectral_axis = self.spectral_axis
# Use the WCS to convert from world to pixel values.
wcs = self.wcs
# We need a sky location to convert incorporating velocity shifts.
coo = SkyCoord(
wcs.wcs.crval[wcs.wcs.lng] * wcs.wcs.cunit[wcs.wcs.lng],
wcs.wcs.crval[wcs.wcs.lat] * wcs.wcs.cunit[wcs.wcs.lat],
frame="fk5",
)
# Same for the Stokes axis.
sto = StokesCoord(0)
start = item.start
stop = item.stop
# Start.
if isinstance(start, u.Quantity):
start_idx = q2idx(start, wcs, spectral_axis, coo, sto)
else:
start_idx = start
# Stop.
if isinstance(stop, u.Quantity):
stop_idx = q2idx(stop, wcs, spectral_axis, coo, sto)
else:
stop_idx = stop
else:
raise NotImplementedError("Use a slice for slicing.")
# WCS slicing does not do well if the start is negative.
if start is not None and not isinstance(start, u.Quantity) and start < 0:
start_idx = len(self.data) + start
# Sort the start and stop indices if they are not None nor
# negative integers.
if (
start is not None
and stop is not None
and not (
(not isinstance(start, u.Quantity) and start < 0) or (not isinstance(stop, u.Quantity) and stop < 0)
)
):
start_idx, stop_idx = np.sort([start_idx, stop_idx])
# Slicing uses NumPY ordering by default.
sliced_wcs = wcs[0:1, 0:1, 0:1, start_idx:stop_idx]
# Update meta.
meta = self.meta.copy()
head = sliced_wcs.to_header()
for k in ["CRPIX1", "CRVAL1"]:
meta[k] = head[k]
# New Spectrum.
return self.make_spectrum(
Masked(self.flux[start_idx:stop_idx], self.mask[start_idx:stop_idx]),
meta=meta,
observer_location=Observatory[meta["TELESCOP"]],
)
[docs]
@log_call_to_result
def average(self, spectra, weights="tsys", align=False):
r"""
Average this `Spectrum` with `spectra`.
The resulting `average` will have an exposure equal to the sum of the exposures,
and coordinates and system temperature equal to the weighted average of the coordinates and system temperatures.
Parameters
----------
spectra : list of `Spectrum`
Spectra to be averaged. They must have the same number of channels.
No checks are done to ensure they are aligned.
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
align : bool
If `True` align the `spectra` to itself.
This uses `Spectrum.align_to`.
Returns
-------
average : `Spectrum`
Averaged spectra.
"""
if type(spectra) is not list:
spectra = [spectra]
spectra += [self]
return average_spectra(spectra, weights=weights, align=align, history=self.history)
[docs]
def cog(
self,
vc=None,
width_frac=None,
bchan=None,
echan=None,
flat_tol=0.1,
fw=1,
xunit="km/s",
):
"""
Curve of growth (CoG) analysis based on Yu et al. (2020) [1]_.
Parameters
----------
vc : float
Central velocity of the line.
If not provided, it will be estimated from the moment 1 of the `x` and `y` values.
width_frac : list
List of fractions of the total flux at which to compute the line width.
If 0.25 and 0.85 are not included, they will be added to estimate the concentration
as defined in [1]_.
bchan : int
Beginning channel where there is signal.
If not provided it will be estimated using `fw` times the width of the line at the largest `width_frac`.
echan : int
End channel where there is signal.
If not provided it will be estimated using `fw` times the width of the line at the largest `width_frac`.
flat_tol : float
Tolerance used to define the flat portion of the curve of growth.
The curve of growth will be considered flat when it's slope is within `flat_tol` times the standard deviation of the slope from zero.
fw : float
When estimating the line-free range, use `fw` times the largest width.
xunit : str or `~astropy.units.quantity`
Units for the x axis when computing the CoG.
Returns
-------
results : dict
Dictionary with the flux (:math:`F`), width (:math:`V`), flux asymmetry (:math:`A_F`), slope asymmetry (:math:`A_C`), concentration (:math:`C_V`),
rms, central velocity ("vel"), redshifted flux (:math:`F_r`), blueshifted flux (:math:`F_b`),`bchan` and `echan`, and errors on the
flux ("flux_std"), width ("width_std"), central velocity ("vel_std"), redshifted flux ("flux_r_std"), and blueshifted flux ("flux_b_std").
The rms is the standard deviation in the range outside of (bchan,echan).
.. [1] `N. Yu, L. Ho & J. Wang, "On the Determination of Rotation Velocity and Dynamical Mass of Galaxies Based on Integrated H I Spectra"
<https://ui.adsabs.harvard.edu/abs/2020ApJ...898..102Y/abstract>`_.
"""
if width_frac is None:
width_frac = [0.25, 0.65, 0.75, 0.85, 0.95]
x = self.spectral_axis.to(xunit)
y = self.flux
return curve_of_growth(x, y, vc=vc, width_frac=width_frac, bchan=bchan, echan=echan, flat_tol=flat_tol, fw=fw)
# @todo figure how how to document write()
####################################################################
# There is probably a less brute-force way to do this but I haven't
# been able to figure it out. astropy.io.registry tools are not
# well explained. register_writer 'consumes' the format keyword, so
# it cannot be passed along via a single overaarching write method,
# e.g., spectrum_writer()
####################################################################
[docs]
def ascii_spectrum_writer_basic(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.basic", **kwargs)
[docs]
def ascii_spectrum_writer_fixed_width(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.fixed_width", **kwargs)
[docs]
def ascii_spectrum_writer_ipac(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.ipac", **kwargs)
[docs]
def spectrum_writer_votable(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="votable", **kwargs)
[docs]
def spectrum_writer_ecsv(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.ecsv", **kwargs)
[docs]
def spectrum_writer_mrt(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="mrt", **kwargs)
[docs]
def spectrum_writer_fits(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="fits", **kwargs)
def _read_table(fileobj, format, **kwargs):
# GBTIDL format example:
# Scan: 152 NGC2415 2021-02-10 +07 38 37.5
# Ta
# GHz-HEL YY
# 1.4143356980054205 -0.1042543
# 1.4143349827132639 0.0525000
#
# Note we don't get the full coordinates, just RA!
if format == "gbtidl":
# Read it into a pandas dataframe, which will have one column
# with a name like 'Scan: 152 NGC2415 2021-02-10 +07 38 37.5'
# We can split out the data array
# We do not use e.g., Table.read(fileobs,data_start=3) because we need to
# get the header info and don't want to open the file twice.
# Note that functions like Table.read, pandas.read*, np.load_txt also natively
# recognize compressed files, which is why we aren't using raw Python I/O
# which does not.
# t = Table.read(fileobj, format="ascii.fixed_width") # ,data_start=3,header_start=3)
df = pd.read_table(fileobj)
df2 = df[df.columns[0]][2:].str.split(expand=True).astype(float).add_prefix("col")
spectral_axis = df2["col0"]
flux = df2["col1"]
# Parse the first line of the header and put into meta
tmp, scan, target, date, ra = df.columns[0].split(maxsplit=4)
meta = {}
meta["SCAN"] = int(scan)
meta["OBJECT"] = target
meta["DATE-OBS"] = Time(date).to_string()
c = SkyCoord(ra + " 0 0 0.0", unit=(u.hour, u.deg))
meta["RA"] = c.ra.degree
# Parse the 2nd link of the header to get the veldef and flux unit
# Sometimes velocity_convention isn't there!
h2 = df[df.columns[0]][0].split()
if len(h2) == 2:
velocity_convention = h2[0][0:4]
flux_unit = h2[1]
else:
velocity_convention = "None"
flux_unit = h2[0]
if flux_unit == "Ta":
fu = u.K
elif flux_unit == "Counts":
fu = u.ct
else:
fu = u.dimensionless_unscaled
# parse the 3rd line column names, We only care about the ferquency axis
h3 = df[df.columns[0]][1].split()
units, vd = h3[0].split("-")
spectral_axis = spectral_axis.values * u.Unit(units) # noqa: PD011
meta["VELDEF"] = velocity_convention + "-" + vd
meta["POL"] = h3[1]
s = Spectrum(flux=flux.values * fu, spectral_axis=spectral_axis, meta=meta) # noqa: PD011
return s
t = Table.read(fileobj, format=format, **kwargs)
f = t["flux"].value * t["flux"].unit
return Spectrum.make_spectrum(f, meta=t.meta, observer_location="from_meta")
[docs]
def spectrum_reader_ecsv(fileobj, **kwargs):
return _read_table(fileobj, format="ascii.ecsv", **kwargs)
[docs]
def spectrum_reader_fits(fileobj, **kwargs):
return _read_table(fileobj, format="fits", **kwargs)
[docs]
def spectrum_reader_gbtidl(fileobj, **kwargs):
return _read_table(fileobj, format="gbtidl", **kwargs)
with registry.delay_doc_updates(Spectrum):
# WRITERS
registry.register_writer("ascii.basic", Spectrum, ascii_spectrum_writer_basic)
registry.register_writer("basic", Spectrum, ascii_spectrum_writer_basic)
registry.register_writer("ascii.commented_header", Spectrum, ascii_spectrum_writer_commented_header)
registry.register_writer("commented_header", Spectrum, ascii_spectrum_writer_commented_header)
registry.register_writer("ascii.fixed_width", Spectrum, ascii_spectrum_writer_fixed_width)
registry.register_writer("fixed_width", Spectrum, ascii_spectrum_writer_fixed_width)
registry.register_writer("ascii.ipac", Spectrum, ascii_spectrum_writer_ipac)
registry.register_writer("ipac", Spectrum, ascii_spectrum_writer_ipac)
registry.register_writer("votable", Spectrum, spectrum_writer_votable)
registry.register_writer("ecsv", Spectrum, spectrum_writer_ecsv)
# UnifiedOutputRegistry.write uses all caps if format not specified
# and it believes the desired output is ecsv
registry.register_writer("ECSV", Spectrum, spectrum_writer_ecsv)
registry.register_writer("mrt", Spectrum, spectrum_writer_mrt)
registry.register_writer("fits", Spectrum, spectrum_writer_fits)
# UnifiedOutputRegistry.write uses retrurns tabular-fits if format
# not specified and it believes the desired output is fits.
registry.register_writer("tabular-fits", Spectrum, spectrum_writer_fits)
# READERS
registry.register_reader("fits", Spectrum, spectrum_reader_fits)
registry.register_reader("gbtidl", Spectrum, spectrum_reader_gbtidl)
registry.register_reader("ascii.ecsv", Spectrum, spectrum_reader_ecsv)
registry.register_reader("ecsv", Spectrum, spectrum_reader_ecsv)
# We aren't going to support these since they don't have easily digestible metadata
# if they have metadata at all.
# registry.register_writer("ascii.basic", Spectrum, ascii_spectrum_reader_basic)
# registry.register_writer("basic", Spectrum, ascii_spectrum_reader_basic)
# registry.register_writer("ascii.commented_header", Spectrum, ascii_spectrum_reader_commented_header)
# registry.register_writer("commented_header", Spectrum, ascii_spectrum_reader_commented_header)
# registry.register_writer("ascii.fixed_width", Spectrum, ascii_spectrum_reader_fixed_width)
# registry.register_writer("fixed_width", Spectrum, ascii_spectrum_reader_fixed_width)
# registry.register_writer("ascii.ipac", Spectrum, ascii_spectrum_reader_ipac)
# registry.register_writer("ipac", Spectrum, ascii_spectrum_reader_ipac)
# registry.register_writer("votable", Spectrum, spectrum_reader_votable)
# registry.register_writer("mrt", Spectrum, spectrum_reader_mrt)
[docs]
def average_spectra(spectra, weights="tsys", align=False, history=None):
r"""
Average `spectra`. The resulting `average` will have an exposure equal to the sum of the exposures,
and coordinates and system temperature equal to the weighted average of the coordinates and system temperatures.
Parameters
----------
spectra : list of `Spectrum`
Spectra to be averaged. They must have the same number of channels.
No checks are done to ensure they are aligned.
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
align : bool
If `True` align the `spectra` to the first element.
This uses `Spectrum.align_to`.
history : `dysh.log.HistoricalBase`
History to append to the averaged spectra.
Returns
-------
average : `Spectrum`
Averaged spectra.
"""
nspec = len(spectra)
nchan = len(spectra[0].data)
shape = (nspec, nchan)
_data = np.empty(shape, dtype=float)
_mask = np.zeros(shape, dtype=bool)
data_array = np.ma.MaskedArray(_data, mask=_mask, dtype=float, fill_value=np.nan)
wts = np.empty(shape, dtype=float)
exposures = np.empty(nspec, dtype=float)
tsyss = np.empty(nspec, dtype=float)
xcoos = np.empty(nspec, dtype=float)
ycoos = np.empty(nspec, dtype=float)
observer = spectra[0].observer
units = spectra[0].flux.unit
for i, s in enumerate(spectra):
if not isinstance(s, Spectrum):
raise ValueError(f"Element {i} of `spectra` is not a `Spectrum`. {type(s)}")
if units != s.flux.unit:
raise ValueError(
f"Element {i} of `spectra` has units {s.flux.unit}, but the first element has units {units}."
)
if align:
if i > 0:
s = s.align_to(spectra[0])
data_array[i] = s.data
data_array[i].mask = s.mask
if weights == "tsys":
wts[i] = core.tsys_weight(s.meta["EXPOSURE"], s.meta["CDELT1"], s.meta["TSYS"])
else:
wts[i] = 1.0
exposures[i] = s.meta["EXPOSURE"]
tsyss[i] = s.meta["TSYS"]
xcoos[i] = s.meta["CRVAL2"]
ycoos[i] = s.meta["CRVAL3"]
_mask = np.isnan(data_array.data) | data_array.mask
data_array = np.ma.MaskedArray(data_array, mask=_mask, fill_value=np.nan)
data = np.ma.average(data_array, axis=0, weights=wts)
tsys = np.ma.average(tsyss, axis=0, weights=wts[:, 0])
xcoo = np.ma.average(xcoos, axis=0, weights=wts[:, 0])
ycoo = np.ma.average(ycoos, axis=0, weights=wts[:, 0])
exposure = exposures.sum(axis=0)
new_meta = deepcopy(spectra[0].meta)
new_meta["TSYS"] = tsys
new_meta["EXPOSURE"] = exposure
new_meta["CRVAL2"] = xcoo
new_meta["CRVAL3"] = ycoo
averaged = Spectrum.make_spectrum(Masked(data * units, data.mask), meta=new_meta, observer=observer)
if history is not None:
# Keep previous history first.
averaged._history = history + averaged._history
return averaged