import warnings
from copy import deepcopy
import astropy.units as u
import numpy as np
from astropy.coordinates import SpectralCoord
from astropy.io import registry
from astropy.io.fits.verify import VerifyWarning
from astropy.modeling.fitting import LinearLSQFitter
from astropy.table import Table
from astropy.time import Time
from astropy.wcs import WCS, FITSFixedWarning
from specutils import Spectrum1D
from ..coordinates import ( # is_topocentric,; topocentric_velocity_to_frame,
KMS,
Observatory,
astropy_frame_dict,
change_ctype,
decode_veldef,
get_velocity_in_frame,
make_target,
replace_convention,
sanitize_skycoord,
veldef_to_convention,
)
from ..plot import specplot as sp
from . import baseline, get_spectral_equivalency
[docs]
class Spectrum(Spectrum1D):
"""
This class contains a spectrum and its attributes. It is built on
`~specutils.Spectrum1D` with added attributes like baseline model.
Note that `~specutils.Spectrum1D` can contain multiple spectra but
we probably will not use that because the restriction that it can
have only one spectral axis conflicts with slight Doppler shifts.
See `~specutils.Spectrum1D` for the instantiation arguments.
*Note:* `velocity_convention` should be one of {'radio', 'optical', 'relativistic'}; the old `~specutils.Spectrum1D` documentation is wrong (there should not be a 'doppler\\_' prefix).
"""
def __init__(self, *args, **kwargs):
# print(f"ARGS={args}")
self._target = kwargs.pop("target", None)
if self._target is not None:
# print(f"self._target is {self._target}")
self._target = sanitize_skycoord(self._target)
self._target.sanitized = True
self._velocity_frame = self._target.frame.name
else:
self._velocity_frame = None
# @TODO - have _observer_location attribute instead
# and observer property returns getITRS(observer_location,obstime)
self._observer = kwargs.pop("observer", None)
Spectrum1D.__init__(self, *args, **kwargs)
self._spectral_axis._target = self._target
self._spectral_axis._observer = self._observer
if "DATE-OBS" in self.meta:
self._obstime = Time(self.meta["DATE-OBS"])
else:
self._obstime = None
# if "CTYPE1" in self.meta:
# # may not need these attributes anymore.
# if self._target is not None:
# self._target.topocentric = is_topocentric(self.meta["CTYPE1"])
# self.topocentric = is_topocentric(self.meta["CTYPE1"])
# if mask is not set via the flux input (NaNs in flux or flux.mask),
# then set the mask to all False == good
if self.mask is None:
self._mask = np.full(np.shape(self.flux), False)
self._baseline_model = None
self._subtracted = False
self._exclude_regions = None
self._plotter = None
@property
def exclude_regions(self):
return self._exclude_regions
##@todo
# def exclude_region(self,region):
# where region is SpectralRegion, channels, velocity, etc. See core.py baseline method.
#
# def region_to_mask():
# set spectrum mask to True inside exclude_regions. normally we don't do this for baselining
@property
def baseline_model(self):
"""Returns the computed baseline model or None if it has not yet been computed."""
return self._baseline_model
[docs]
def baseline(self, degree, exclude=None, **kwargs):
"""
Compute and optionally remove a baseline. The model for the
baseline can be either a
`1D polynomial model <https://docs.astropy.org/en/latest/api/astropy.modeling.polynomial.Polynomial1D.html>`_ or a
`1D Chebyshev polynomial of the first kind <https://docs.astropy.org/en/latest/api/astropy.modeling.polynomial.Chebyshev1D.html>`_. The code uses `astropy.modeling`
and `astropy.fitter` to compute the baseline. See the documentation for those modules. This method will set the `baseline_model` attribute to the fitted model function which can be evaluated over a domain.
Parameters
----------
degree : int
The degree of the polynomial series, a.k.a. baseline order
exclude : list of 2-tuples of int or ~astropy.units.Quantity, or ~specutils.SpectralRegion
List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive.
In channel units.
Examples: One channel-based region: [11,51], Two channel-based regions: [(11,51),(99,123)]. One ~astropy.units.Quantity region: [110.198*u.GHz,110.204*u.GHz]. One compound ~specutils.SpectralRegion: SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]).
Default: no exclude region
TODO: Are these OR'd with the existing mask? make that an option
TODO: Allow these to be Quantities (spectral axis units or equivalent). See list_to_spectral_region()
model : str
One of 'polynomial' or 'chebyshev', Default: 'polynomial'
fitter : `~astropy.fitting._FitterMeta`
The fitter to use. Default: `~astropy.fitter.LinearLSQFitter` (with `calc_uncertaintes=True`). Be care when choosing a different fitter to be sure it is optimized for this problem.
remove : bool
If True, the baseline is removed from the spectrum. Default: False
"""
kwargs_opts = {
"remove": False,
"model": "polynomial",
"fitter": LinearLSQFitter(calc_uncertainties=True),
}
kwargs_opts.update(kwargs)
self._baseline_model = baseline(self, degree, exclude, **kwargs)
if kwargs_opts["remove"]:
s = self.subtract(self._baseline_model(self.spectral_axis))
self._data = s._data
self._subtracted = True
def _undo_baseline(self):
"""
Undo the most recently computed baseline. If the baseline
has been subtracted, it will be added back. The `baseline_model`
attribute is set to None. Exclude regions are untouched.
"""
if self._baseline_model is None:
return
if self._subtracted:
s = self.add(self._baseline_model(self.spectral_axis))
self._data = s._data
self._baseline_model = None
def _set_exclude_regions(self, exclude):
"""
Set the mask for the regions to exclude.
Parameters
----------
exclude : list of 2-tuples of int or ~astropy.units.Quantity, or ~specutils.SpectralRegion
List of region(s) to exclude from the fit. The tuple(s) represent a range in the form [lower,upper], inclusive.
In channel units.
Examples: One channel-based region: [11,51], Two channel-based regions: [(11,51),(99,123)]. One ~astropy.units.Quantity region: [110.198*u.GHz,110.204*u.GHz]. One compound ~specutils.SpectralRegion: SpectralRegion([(110.198*u.GHz,110.204*u.GHz),(110.196*u.GHz,110.197*u.GHz)]).
"""
pass
[docs]
def list_to_spectral_region(self, inlist):
# todo utility code to convert a input list of channels or quantities to a spectral region with units of self.spectral_axis.unit. This could go in core.py
# combine this with _set_exclude_regions
pass
[docs]
def bshow(self):
"""Show the baseline model"""
print(f"baseline model {self._baseline_model}")
[docs]
def plot(self, **kwargs):
if self._plotter is None:
self._plotter = sp.SpectrumPlot(self, **kwargs)
self._plotter.plot(**kwargs)
@property
def obstime(self):
return self._obstime
@property
def plotter(self):
return self._plotter
[docs]
def stats(self):
"""
Compute some statistics of this `Spectrum`. The mean, rms,
data minimum and data maximum are calculated. Note this works
with slicing, so, e.g., `myspectrum[45:153].stats()` will return
the statistics of the slice.
Returns
-------
stats : tuple
Tuple consisting of (mean,rms,datamin,datamax)
TODO: maybe make this a dict
"""
mean = self.mean()
rms = self.data.std()
dmin = self.min()
dmax = self.max()
return (mean, rms, dmin, dmax)
[docs]
def smooth(self):
# todo use specutils.manipulation.smoothing
# option to smooth baseline too?
pass
@property
def equivalencies(self):
"""Get the spectral axis equivalencies that can be used in converting the axis
between km/s and frequency or wavelength"""
equiv = u.spectral()
sa = self.spectral_axis
if sa.doppler_rest is not None:
rfq = sa.doppler_rest
elif "RESTFREQ" in self.meta:
cunit1 = self.meta.get("CUNIT1", self.wcs.wcs.cunit[0])
# @todo this could be done with a dict str->function
rfq = self.meta["RESTFREQ"] * cunit1
else:
rfq = None
if rfq is not None:
equiv.extend(get_spectral_equivalency(rfq, self.velocity_convention))
return equiv
@property
def target(self):
return self._target
@property
def observer(self):
return self._observer
@property
def velocity_frame(self):
return self._velocity_frame
# This is already in specutils.OneDSpectrumMixin
# @property
# def velocity_convention(self):
# return self._spectral_axis.doppler_convention
@property
def doppler_convention(self):
return self.velocity_convention
[docs]
def axis_velocity(self, unit=KMS):
"""Get the spectral axis in velocity units.
*Note*: This is not the same as `Spectrum.velocity`, which includes the source radial velocity.
"""
return self._spectral_axis.to(unit)
# not needed
# def velocity_axis_to(self, unit=KMS, toframe=None, doppler_convention=None):
# if toframe is not None:
# self.set_frame(toframe)
# if doppler_convention is not None:
# return self._spectral_axis.to(unit=unit, doppler_convention=doppler_convention).to(unit)
# else:
# return self.velocity.to(unit)
[docs]
def get_velocity_shift_to(self, toframe):
if self._target is None:
raise Exception("Can't calculate velocity because Spectrum.target is None")
return get_velocity_in_frame(self._target, toframe, self._observer, self._obstime)
[docs]
def set_frame(self, toframe):
# @TODO VELDEF should be changed as well?
"""Set the sky coordinate and doppler tracking reference frame of this Spectrum. The header 'CTYPE1' will be changed accordingly.
To make a copy of this Spectrum with new coordinate referece frmae instead, use `with_frame`.
Parameters
----------
toframe - str
The coordinate reference frame identifying string, as used by astropy, e.g. 'hcrs', 'icrs', etc.
"""
if "topo" in toframe:
actualframe = self.observer # ???
else:
actualframe = astropy_frame_dict.get(toframe, toframe)
# print(f"actual frame is {actualframe}")
self._spectral_axis = self._spectral_axis.with_observer_stationary_relative_to(actualframe)
self._meta["CTYPE1"] = change_ctype(self._meta["CTYPE1"], toframe)
# @todo shouldn't have two variables for the same thing.
# Still needed?
# self.topocentric = self._target.topocentric = "topo" in toframe
[docs]
def with_frame(self, toframe):
"""Return a copy of this Spectrum with a new coordinate reference frame.
Parameters
----------
toframe - str
The coordinate reference frame identifying string, as used by astropy, e.g. 'hcrs', 'icrs', etc.
Returns
-------
spectrum : `~dysh.spectra.Spectrum`
A new Spectrum object
"""
s = deepcopy(self)
s.set_frame(toframe)
return s
[docs]
def set_convention(self, doppler_convention):
"""Set the velocity convention of this Spectrum. The spectral axis of this Spectrum will be replaced
with a new spectral axis with the input velocity convention. The header 'VELDEF' value will
be changed accordingly.
To make a copy of this Spectrum with a new velocity convention instead, use `with_velocity_convention`.
Parameters
----------
doppler_convention - str
The velocity convention, one of 'radio', 'optical', 'relativistic'
"""
# replicate() gives the same asnwer as
# self._spectral_axis.to(unit=self._spectral_axis.unit, doppler_convention=doppler_convention)
new_sp_axis = self.spectral_axis.replicate(doppler_convention=doppler_convention)
self._spectral_axis = new_sp_axis
self.meta["VELDEF"] = replace_convention(self.meta["VELDEF"], doppler_convention)
[docs]
def with_velocity_convention(self, doppler_convention):
"""Returns a copy of this Spectrum with the input velocity convention. The header 'VELDEF' value will
be changed accordingly.
Parameters
----------
doppler_convention - str
The velocity convention, one of 'radio', 'optical', 'relativistic'
Returns
-------
spectrum : `~dysh.spectra.Spectrum`
A new Spectrum object
"""
if False:
# this doesn't work.
# for some reason, the axis velocity
# still contains the difference between TOPO and observed frame
s = self.__class__(
flux=self.flux,
wcs=self.wcs,
meta=self.meta,
velocity_convention=doppler_convention,
target=self._target,
observer=self._spectral_axis.observer,
)
s.meta["VELDEF"] = replace_convention(self.meta["VELDEF"], doppler_convention)
s = deepcopy(self)
s.set_convention(doppler_convention)
return s
[docs]
def savefig(self, file, **kwargs):
"""Save the plot"""
if self._plotter is None:
raise Exception("You have to invoke plot() first")
self._plotter.figure.savefig(file, **kwargs)
def _write_table(self, fileobj, format, **kwargs):
"""
Write this `Spectrum` as an ~astropy.table.Table.
Parameters
----------
fileobj : str, file-like or `pathlib.Path`
File to write to. If a file object, must be opened in a
writeable mode.
format : str
The output format. Must be a format supported by ~astropy.table.Table.write
kwargs : variable
Additional keyword arguments supported by ~astropy.table.Table.write
"""
flux = self.flux
axis = self.spectral_axis
mask = self.mask
t = Table([axis, flux, mask], names=["spectral_axis", "flux", "mask"], meta=self.meta)
if self.uncertainty is not None:
t.add_column(self.uncertainty._array, name="uncertainty")
# f=kwargs.pop("format")
t.write(fileobj, format=format, **kwargs)
def _copy(self, **kwargs):
"""
Perform deep copy operations on each attribute of the ``Spectrum``
object.
This overrides the ``specutils.Spectrum1D`` method so that
target and observer attributes get copied.
"""
alt_kwargs = dict(
flux=deepcopy(self.flux),
spectral_axis=deepcopy(self.spectral_axis),
uncertainty=deepcopy(self.uncertainty),
wcs=deepcopy(self.wcs),
mask=deepcopy(self.mask),
meta=deepcopy(self.meta),
unit=deepcopy(self.unit),
velocity_convention=deepcopy(self.velocity_convention),
rest_value=deepcopy(self.rest_value),
target=deepcopy(self.target),
observer=deepcopy(self.observer),
)
alt_kwargs.update(kwargs)
s = self.__class__(**alt_kwargs)
# s.topocentric = is_topocentric(meta["CTYPE1"]) # GBT-specific to use CTYPE1 instead of VELDEF
return s
[docs]
@classmethod
def make_spectrum(cls, data, meta, use_wcs=True, observer_location=None):
# , shift_topo=False):
"""Factory method to create a Spectrum object from a data and header.
Parameters
----------
data : `~numpy.ndarray`
The data array. See `~specutils.Spectrum1D`
meta : dict
The metadata, typically derived from an SDFITS header. Required items in `meta` are 'CTYPE[123]','CRVAL[123]', 'CUNIT[123]', 'VELOCITY', 'EQUINOX', 'RADESYS'
use_wcs : bool
If True, create a WCS object from `meta`
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`. This will be transformed to `~astropy.coordinates.ITRS` using the time of observation DATE-OBS or MJD-OBS in `meta`.
Returns
-------
spectrum : `~dysh.spectra.Spectrum`
The spectrum object
"""
# @TODO generic check_required method since I now have this code in two places (coordinates/core.py).
# should we also require DATE-OBS or MJD-OBS?
_required = set([
"CRVAL1",
"CRVAL2",
"CRVAL3",
"CTYPE1",
"CTYPE2",
"CTYPE3",
"CUNIT1",
"CUNIT2",
"CUNIT3",
"VELOCITY",
"EQUINOX",
"RADESYS",
])
# for k in _required:
# print(f"{k} {k in header}")
if not _required <= meta.keys():
raise ValueError(f"Header (meta) is missing one or more required keywords: {_required}")
# @TODO WCS is expensive.
# Possibly figure how to calculate spectral_axis instead.
# @TODO allow fix=False in WCS constructor?
if use_wcs:
# skip warnings about DATE-OBS being converted to MJD-OBS
warnings.filterwarnings("ignore", category=FITSFixedWarning)
# skip warnings FITS keywords longer than 8 chars or containing
# illegal characters (like _)
warnings.filterwarnings("ignore", category=VerifyWarning)
wcs = WCS(header=meta)
# reset warnings?
else:
wcs = None
# is_topo = is_topocentric(meta["CTYPE1"]) # GBT-specific to use CTYPE1 instead of VELDEF
target = make_target(meta)
vc = veldef_to_convention(meta["VELDEF"])
vf = astropy_frame_dict[decode_veldef(meta["VELDEF"])[1]]
# could be clever:
# obstime = Time(meta.get("DATE-OBS",meta.get("MJD-OBS",None)))
# if obstime is not None: blah blah
if "DATE-OBS" in meta:
obstime = Time(meta["DATE-OBS"])
elif "MJD-OBS" in meta:
obstime = Time(meta["MJD-OBS"])
if "DATE-OBS" in meta or "MJD-OBS" in meta:
if observer_location is None:
obsitrs = None
else:
obsitrs = SpectralCoord._validate_coordinate(observer_location.get_itrs(obstime=obstime))
else:
warnings.warn(
"'meta' does not contain DATE-OBS or MJD-OBS. Spectrum won't be convertible to certain coordinate"
" frames"
)
obsitrs = None
s = cls(
data,
wcs=wcs,
meta=meta,
velocity_convention=vc,
radial_velocity=target.radial_velocity,
rest_value=meta["RESTFRQ"] * u.Hz,
observer=obsitrs,
target=target,
)
# For some reason, Spectrum1D.spectral_axis created with WCS do not inherit
# the radial velocity. In fact, they get no radial_velocity attribute at all!
# This method creates a new spectral_axis with the given radial velocity.
if observer_location is None:
s.set_radial_velocity_to(target.radial_velocity)
# I THINK THIS IS NO LONGER NEEDED
# if shift_topo: # and is_topo
# vshift = topocentric_velocity_to_frame(target, vf, observer=Observatory["GBT"], obstime=obstime)
#
return s
# @TODO figure how how to document write()
####################################################################
# There is probably a less brute-force way to do this but I haven't
# been able to figure it out. astropy.io.registry tools are not
# well explained. register_writer 'consumes' the format keyword, so
# it cannot be passed along via a single overaarching write method,
# e.g., spectrum_writer()
####################################################################
[docs]
def ascii_spectrum_writer_basic(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.basic", **kwargs)
[docs]
def ascii_spectrum_writer_fixed_width(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.fixed_width", **kwargs)
[docs]
def ascii_spectrum_writer_ipac(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="ascii.ipac", **kwargs)
[docs]
def spectrum_writer_votable(spectrum, fileobj, **kwargs):
spectrum._write_table(fileobj, format="votable", **kwargs)
with registry.delay_doc_updates(Spectrum):
registry.register_writer("ascii.basic", Spectrum, ascii_spectrum_writer_basic)
registry.register_writer("basic", Spectrum, ascii_spectrum_writer_basic)
registry.register_writer("ascii.commented_header", Spectrum, ascii_spectrum_writer_commented_header)
registry.register_writer("commented_header", Spectrum, ascii_spectrum_writer_commented_header)
registry.register_writer("ascii.fixed_width", Spectrum, ascii_spectrum_writer_fixed_width)
registry.register_writer("fixed_width", Spectrum, ascii_spectrum_writer_fixed_width)
registry.register_writer("ascii.ipac", Spectrum, ascii_spectrum_writer_ipac)
registry.register_writer("ipac", Spectrum, ascii_spectrum_writer_ipac)
registry.register_writer("votable", Spectrum, spectrum_writer_votable)