"""
The classes that define various types of Scan and their calibration methods.
"""
import warnings
from collections import UserList
from copy import deepcopy
import astropy.units as u
import numpy as np
from astropy import constants as ac
from astropy.io.fits import BinTableHDU, Column
from astropy.table import Table, vstack
from astropy.utils.masked import Masked
from dysh.spectra import core
from ..coordinates import Observatory
from ..log import HistoricalBase, log_call_to_history, logger
from ..util import uniq
from .core import ( # fft_shift,
average,
find_non_blanks,
find_nonblank_ints,
mean_tsys,
sq_weighted_avg,
tsys_weight,
)
from .spectrum import Spectrum, average_spectra
[docs]
class SpectralAverageMixin:
[docs]
@log_call_to_history
def timeaverage(self, weights=None):
r"""Compute the time-averaged spectrum for this scan.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
pass
[docs]
@log_call_to_history
def polaverage(self, weights=None):
"""Average all polarizations in this Scan"""
pass
[docs]
@log_call_to_history
def finalspectrum(self, weights=None):
"""Average all times and polarizations in this Scan"""
pass
@property
def exposure(self):
"""The array of exposure (integration) times. How the exposure is calculated
varies for different derrived classes.
Returns
-------
exposure : ~numpy.ndarray
The exposure time in units of the EXPOSURE keyword in the SDFITS header
"""
return self._exposure
@property
def delta_freq(self):
"""The array of channel frequency width"""
return self._delta_freq
@property
def tsys(self):
"""The system temperature array.
Returns
-------
tsys : `~numpy.ndarray`
System temperature values in K
"""
return self._tsys
@property
def tsys_weight(self):
r"""The system temperature weighting array computed from current
:math`T_{sys}`, :math:`t_{int}`, and :math:`\delta\nu`. See :meth:`tsys_weight`
"""
return tsys_weight(self.exposure, self.delta_freq, self.tsys)
[docs]
class ScanBase(HistoricalBase, SpectralAverageMixin):
"""This class describes the common interface to all Scan classes.
A Scan represents one scan number, one IF, one feed, and one or more polarizations.
Derived classes *must* implement :meth:`calibrate`.
"""
def __init__(self, sdfits):
HistoricalBase.__init__(self)
self._ifnum = -1
self._fdnum = -1
self._nchan = -1
self._npol = -1
self._scan = -1
self._pols = -1
self._sdfits = sdfits
# self._meta = {}
def _validate_defaults(self):
_required = {
"IFNUM": self._ifnum,
"FDNUM": self._fdnum,
"NCHAN": self._nchan,
"NPOL": self._npol,
"POLS": self._pols,
"SCAN": self._scan,
}
unset = []
for k, v in _required.items():
if v == -1:
unset.append(k)
if len(unset) > 0:
raise Exception(
f"The following required Scan attributes were not set by the derived class {self.__class__.__name__}:"
f" {unset}"
)
if type(self._scan) != int:
raise (f"{self.__class__.__name__}._scan is not an int: {type(self._scan)}")
@property
def scan(self):
"""
The scan number
Returns
-------
int
The scan number of the integrations in the Scan object
"""
return self._scan
@property
def nchan(self):
"""
The number of channels in this scan
Returns
-------
int
The number of channels in this scan
"""
return self._nchan
@property
def nrows(self):
"""The number of rows in this Scan
Returns
-------
int
The number of rows in this Scan
"""
return self._nrows
@property
def npol(self):
"""
The number of polarizations in this Scan
Returns
-------
int
The number of polarizations in this Scan
"""
return self._npol
@property
def ifnum(self):
"""The IF number
Returns
-------
int
The index of the Intermediate Frequency
"""
return self._ifnum
@property
def fdnum(self):
"""The feed number
Returns
-------
int
The index of the Feed
"""
return self._fdnum
@property
def pols(self):
"""The polarization number(s)
Returns
-------
list
The list of integer polarization number(s)
"""
return self._pols
@property
def is_calibrated(self):
"""
Have the data been calibrated?
Returns
-------
bool
True if the data have been calibrated, False if not.
"""
return self._calibrated is not None
@property
def meta(self):
"""
The metadata of this Scan. The metadata is a list of dictionaries, the length of which is
equal to the number of calibrated integrations in the Scan.
Returns
-------
dict
Dictionary containing the metadata of this Scan
"""
return self._meta
def _meta_as_table(self):
"""get the metadata as an astropy Table"""
# remember, self._meta is a list of dicts,
# and despite its name it is not Table metadata, it
# it Table Columns values
d = {}
if len(self.history) != 0:
d["HISTORY"] = self.history
if len(self.comments) != 0:
d["COMMENT"] = self.comments
return Table(self._meta, meta=d)
def _make_meta(self, rowindices):
"""
Create the metadata for a Scan. The metadata is a list of dictionaries, the length of which is
equal to the number of calibrated integrations in the Scan.
Parameters
----------
rowindices : list of int
The list of indices into the parent SDFITS index (DataFrame). These typically point to the
indices for only the, e.g. the SIG data, since the resultant metadata will be used to make the calibrated spectra,
which SIGREF data result in N/2 spectra.
Returns
-------
None
"""
# FITS can't handle NaN in a header, so just drop any column where NaN appears
# Ideally this should be done on the individual record level in say, Spectrum.make_spectrum.
df = self._sdfits.index(bintable=self._bintable_index).iloc[rowindices].dropna(axis=1, how="any")
self._meta = df.to_dict("records") # returns dict(s) with key = row number.
for i in range(len(self._meta)):
if "CUNIT1" not in self._meta[i]:
self._meta[i][
"CUNIT1"
] = "Hz" # @todo this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11?
self._meta[i]["CUNIT2"] = "deg" # is this always true?
self._meta[i]["CUNIT3"] = "deg" # is this always true?
restfrq = self._meta[i]["RESTFREQ"]
rfq = restfrq * u.Unit(self._meta[i]["CUNIT1"])
restfreq = rfq.to("Hz").value
self._meta[i]["RESTFRQ"] = restfreq # WCS wants no E
def _add_calibration_meta(self):
"""Add metadata that are computed after calibration."""
if not self.is_calibrated:
raise Exception("Data have to be calibrated first to add calibration metadata")
for i in range(len(self._meta)):
self._meta[i]["TSYS"] = self._tsys[i]
self._meta[i]["EXPOSURE"] = self._exposure[i]
self._meta[i]["NAXIS1"] = len(self._calibrated[i])
self._meta[i]["TSYS"] = self._tsys[i]
self._meta[i]["EXPOSURE"] = self.exposure[i]
def _set_if_fd(self, df):
"""Set the IF and FD numbers from the input dataframe and
raise an error of there are more than one
Parameters
----------
df : ~pandas.DataFrame
The DataFrame describing the selected data
"""
self._ifnum = uniq(df["IFNUM"])
self._fdnum = uniq(df["FDNUM"])
if len(self._ifnum) > 1:
raise Exception(f"Only one IFNUM is allowed per Scan, found {self.ifnum}")
if len(self._fdnum) > 1:
raise Exception(f"Only one FDNUM is allowed per Scan, found {self.fdnum}")
self._ifnum = self._ifnum[0]
self._fdnum = self._fdnum[0]
[docs]
def calibrate(self, **kwargs):
"""Calibrate the Scan data"""
pass
[docs]
def make_bintable(self):
"""
Create a :class:`~astropy.io.fits.BinaryTableHDU` from the calibrated data of this Scan.
Returns
-------
b : :class:`~astropy.io.fits.BinaryTableHDU`
A FITS binary table HDU, suitable for writing out or appending to a `~astropy.io.fits.HDUList`.
"""
# Creating a Table from the metadata dictionary and instantiating
# the BinTableHDU with that takes care of all the tricky
# numpy/pandas dtypes to FITS single character format string. No need
# to figure it out oneself (which I spent far too much time trying to do before I
# discovered this!)
if self._calibrated is None:
raise Exception("Data must be calibrated before writing.")
# Table metadata aren't preserved in BinTableHDU, so we
# have to grab them here and add them
# data_table = self._meta_as_table()
# table_meta = data_table.meta
cd = BinTableHDU(data=self._meta_as_table(), name="SINGLE DISH").columns
form = f"{np.shape(self._calibrated)[1]}E"
cd.add_col(Column(name="DATA", format=form, array=self._calibrated))
b = BinTableHDU.from_columns(cd, name="SINGLE DISH")
return b
[docs]
def write(self, fileobj, output_verify="exception", overwrite=False, checksum=False):
"""
Write an SDFITS format file (FITS binary table HDU) of the calibrated data in this Scan
Parameters
----------
fileobj : str, file-like or `pathlib.Path`
File to write to. If a file object, must be opened in a
writeable mode.
multifile: bool, optional
If True, write to multiple files if and only if there are multiple SDFITS files in this GBTFITSLoad.
Otherwise, write to a single SDFITS file.
output_verify : str
Output verification option. Must be one of ``"fix"``,
``"silentfix"``, ``"ignore"``, ``"warn"``, or
``"exception"``. May also be any combination of ``"fix"`` or
``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception"
(e.g. ``"fix+warn"``). See https://docs.astropy.org/en/latest/io/fits/api/verification.html for more info
overwrite : bool, optional
If ``True``, overwrite the output file if it exists. Raises an
``OSError`` if ``False`` and the output file exists. Default is
``False``.
checksum : bool
When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards
to the headers of all HDU's written to the file.
Returns
-------
None.
"""
self.make_bintable().writeto(name=fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
def __len__(self):
return self._nrows
[docs]
class ScanBlock(UserList, HistoricalBase, SpectralAverageMixin):
@log_call_to_history
def __init__(self, *args):
UserList.__init__(self, *args)
HistoricalBase.__init__(self)
self._nrows = 0
self._npol = 0
self._timeaveraged = []
self._polaveraged = []
self._finalspectrum = []
[docs]
@log_call_to_history
def calibrate(self, **kwargs):
"""Calibrate all scans in this ScanBlock"""
for scan in self.data:
scan.calibrate(**kwargs)
[docs]
@log_call_to_history
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for all scans in this ScanBlock.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
timeaverage: list of `~spectra.spectrum.Spectrum`
List of all the time-averaged spectra
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
# warnings.simplefilter("ignore", NoVelocityWarning)
# average of the averages
self._timeaveraged = []
i = 0
for scan in self.data:
self._timeaveraged.append(scan.timeaverage(weights))
s = average_spectra(self._timeaveraged, weights=weights)
s.merge_commentary(self)
return s
[docs]
@log_call_to_history
def polaverage(self, weights="tsys"):
# @todo rewrite this to return a spectrum as timeaverage does now.
r"""Average all polarizations in all scans in this ScanBlock
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
polaverage: list of `~spectra.spectrum.Spectrum`
List of all the polarization-averaged spectra
"""
self._polaveraged = []
for scan in self.data:
self._polaveraged.append(scan.polaverage(weights))
return self._polaveraged
[docs]
@log_call_to_history
def finalspectrum(self, weights="tsys"):
r"""Average all times and polarizations in all scans this ScanBlock
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
finalspectra: list of `~spectra.spectrum.Spectrum`
List of all the time- and polarization-averaged spectra
"""
self._finalspectrum = []
for scan in self.data:
self._finalspectrum.append(scan.finalspectrum(weights))
return self._finalspectrum
[docs]
def write(self, fileobj, output_verify="exception", overwrite=False, checksum=False):
"""
Write an SDFITS format file (FITS binary table HDU) of the calibrated data in this Scan
Parameters
----------
fileobj : str, file-like or `pathlib.Path`
File to write to. If a file object, must be opened in a
writeable mode.
multifile: bool, optional
If True, write to multiple files if and only if there are multiple SDFITS files in this GBTFITSLoad.
Otherwise, write to a single SDFITS file.
output_verify : str
Output verification option. Must be one of ``"fix"``,
``"silentfix"``, ``"ignore"``, ``"warn"``, or
``"exception"``. May also be any combination of ``"fix"`` or
``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception"
(e.g. ``"fix+warn"``). See https://docs.astropy.org/en/latest/io/fits/api/verification.html for more info
overwrite : bool, optional
If ``True``, overwrite the output file if it exists. Raises an
``OSError`` if ``False`` and the output file exists. Default is
``False``.
checksum : bool
When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards
to the headers of all HDU's written to the file.
Returns
-------
None.
"""
s0 = self.data[0]
# If there is only one scan, delegate to its write method
# this does not preserve scanblock history
# if len(self.data) == 1:
# print("only one scan")
# s0.write(fileobj, output_verify, overwrite, checksum)
# return
# Meta are the keys of the first scan's bintable except for DATA.
# We can use this to compare with the subsequent Scan
# keywords without have to create their bintables first.
defaultkeys = set(s0._meta[0].keys())
datashape = np.shape(s0._calibrated)
tablelist = [s0._meta_as_table()]
for scan in self.data: # [1:]:
# check data shapes are the same
thisshape = np.shape(scan._calibrated)
if thisshape != datashape:
# @todo Variable length arrays? https://docs.astropy.org/en/stable/io/fits/usage/unfamiliar.html#variable-length-array-tables
# or write to separate bintables.
raise Exception(
f"Data shapes of scans are not equal {thisshape}!={datashape}. Can't combine Scans into single"
" BinTableHDU"
)
# check that the header keywords are the same
diff = set(scan._meta[0].keys()) - defaultkeys
if len(diff) > 0:
raise Exception(
f"Scan header keywords are not the same. These keywords were not present in all Scans: {diff}."
" Can't combine Scans into single BinTableHDU"
)
tablelist.append(scan._meta_as_table())
# now do the same trick as in Scan.write() of adding "DATA" to the coldefs
# astropy Tables can be concatenated with vstack thankfully.
table = vstack(tablelist, join_type="exact")
# need to preserve table.meta because it gets lost in created of "cd" ColDefs
table_meta = table.meta
cd = BinTableHDU(table, name="SINGLE DISH").columns
data = np.concatenate([c._calibrated for c in self.data])
form = f"{np.shape(data)[1]}E"
cd.add_col(Column(name="DATA", format=form, array=data))
b = BinTableHDU.from_columns(cd, name="SINGLE DISH")
# preserve any meta
for k, v in table_meta.items():
if k == "HISTORY" or k == "COMMENT":
continue # will deal with these later
b.header[k] = v
for h in self._history:
b.header["HISTORY"] = h
for c in self._comments:
b.header["COMMENT"] = c
b.writeto(name=fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
[docs]
class TPScan(ScanBase):
"""GBT specific version of Total Power Scan
Parameters
----------
gbtfits : `~fits.sdfitsload.SDFITSLoad`
input SDFITSLoad object
scan: int
scan number
sigstate : bool
Select the signal state used to form the data. True means select sig='T', False to select sig='F'.
None means select both. See table below for explanation.
calstate : bool
Select the calibration state used to form the data. True means select cal='T', False to select cal='F'.
None means select both. See table below for explanation.
scanrows : list-like
the list of rows in `sdfits` corresponding to sigstate integrations
calrows : dict
dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits` corresponding to cal=T (ON) and cal=F (OFF) integrations for `scan`
bintable : int
the index for BINTABLE in `sdfits` containing the scans
calibrate: bool
whether or not to calibrate the data. If `True`, the data will be (calon - caloff)*0.5, otherwise it will be SDFITS row data. Default:True
smoothref: int
the number of channels in the reference to boxcar smooth prior to calibration
apply_flags : boolean, optional. If True, apply flags before calibration.
Notes
-----
How the total power and system temperature are calculated, depending on signal and reference state parameters:
====== ===== =================================================================== ==================================
CAL SIG RESULT TSYS
====== ===== =================================================================== ==================================
None None data = 0.5* (REFCALON + REFCALOFF), regardless of sig state use all CAL states, all SIG states
None True data = 0.5* (REFCALON + REFCALOFF), where sig = 'T' use all CAL states, SIG='T'
None False data = 0.5* (REFCALON + REFCALOFF), where sig = 'F' use all CAL states, SIG='F'
True None data = REFCALON, regardless of sig state use all CAL states, all SIG states
False None data = REFCALOFF, regardless of sig state use all CAL states, all SIG states
True True data = REFCALON, where sig='T' use all CAL states, SIG='T'
True False data = REFCALON, where sig='F' use all CAL states, SIG='F'
False True data = REFCALOFF where sig='T' use all CAL states, SIG='T'
False False data = REFCALOFF, where sig='F' use all CAL states, SIG='F'
====== ===== =================================================================== ==================================
where `REFCALON` = integrations with `cal=T` and `REFCALOFF` = integrations with `cal=F`.
"""
def __init__(
self,
gbtfits,
scan,
sigstate,
calstate,
scanrows,
calrows,
bintable,
calibrate=True,
smoothref=1,
apply_flags=False,
observer_location=Observatory["GBT"],
):
ScanBase.__init__(self, gbtfits)
self._sdfits = gbtfits # parent class
self._scan = scan
self._sigstate = sigstate
self._calstate = calstate
self._scanrows = scanrows
self._smoothref = smoothref
self._apply_flags = apply_flags
if self._smoothref > 1:
warnings.warn(f"TP smoothref={self._smoothref} not implemented yet")
# @todo deal with data that crosses bintables
if bintable is None:
self._bintable_index = self._sdfits._find_bintable_and_row(self._scanrows[0])[0]
else:
self._bintable_index = bintable
self._observer_location = observer_location
df = self._sdfits._index
df = df.iloc[scanrows]
self._index = df
self._set_if_fd(df)
self._pols = uniq(df["PLNUM"])
self._nint = 0
self._npol = len(self._pols)
self._timeaveraged = None
self._polaveraged = None
self._nrows = len(scanrows)
self._tsys = None
if False:
self._npol = gbtfits.npol(self._bintable_index) # @todo deal with bintable
self._nint = gbtfits.nintegrations(self._bintable_index)
self._calrows = calrows
# all cal=T states where sig=sigstate
self._refonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows))))
# all cal=F states where sig=sigstate
self._refoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows))))
self._refcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refonrows]
self._refcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refoffrows]
# Catch blank integrations.
goodrows = find_nonblank_ints(self._refcalon, self._refcaloff)
self._refcalon = self._refcalon[goodrows]
self._refcaloff = self._refcaloff[goodrows]
self._refonrows = [self._refonrows[i] for i in goodrows]
self._refoffrows = [self._refoffrows[i] for i in goodrows]
self._nrows = len(self._refonrows) + len(self._refoffrows)
self._nchan = len(self._refcalon[0])
self._calibrate = calibrate
self._data = None
if self._calibrate:
self.calibrate()
self.calc_tsys()
self._validate_defaults()
[docs]
def calibrate(self):
"""Calibrate the data according to the CAL/SIG table above"""
# the way the data are formed depend only on cal state
# since we have downselected based on sig state in the constructor
if self.calstate is None:
self._data = (0.5 * (self._refcalon + self._refcaloff)).astype(float)
elif self.calstate:
self._data = self._refcalon.astype(float)
elif self.calstate == False:
self._data = self._refcaloff.astype(float)
else:
raise Exception(f"Unrecognized cal state {self.calstate}") # should never happen
@property
def sigstate(self):
"""The requested signal state
Returns
-------
bool
True if signal state is on ('T' in the SDFITS header), False otherwise ('F')
"""
return self._sigstate
@property
def calstate(self):
"""The requested calibration state
Returns
-------
bool
True if calibration state is on ('T' in the SDFITS header), False otherwise ('F')
"""
return self._calstate
[docs]
def calc_tsys(self, **kwargs):
"""
Calculate the system temperature array, according to table above.
"""
kwargs_opts = {"verbose": False}
kwargs_opts.update(kwargs)
if False:
if self.calstate is None:
tcal = list(self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"])
nspect = len(tcal)
# calon = self._refcalcon
# caloff = self._refcaloff
elif self.calstate:
tcal = list(self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"])
nspect = len(tcal)
# calon = self._refcalon
elif self.calstate == False:
pass
self._tcal = list(self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"])
nspect = len(self._tcal)
self._tsys = np.empty(nspect, dtype=float) # should be same as len(calon)
# allcal = self._refonrows.copy()
# allcal.extend(self._refoffrows)
# tcal = list(self._sdfits.index(self._bintable_index).iloc[sorted(allcal)]["TCAL"])
# @todo this loop could be replaced with clever numpy
if len(self._tcal) != nspect:
raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match")
for i in range(nspect):
# tsys = mean_tsys(calon=calon[i], caloff=caloff[i], tcal=tcal[i])
tsys = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=self._tcal[i])
self._tsys[i] = tsys
@property
def exposure(self):
"""The array of exposure (integration) times. The value depends on the cal state:
===== ======================================
CAL EXPOSURE
===== ======================================
None :math:`t_{EXP,REFON} + t_{EXP,REFOFF}`
True :math:`t_{EXP,REFON}`
False :math:`t_{EXP,REFOFF}`
===== ======================================
Returns
-------
exposure : `~numpy.ndarray`
The exposure time in units of the EXPOSURE keyword in the SDFITS header
"""
if self.calstate is None:
exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy()
exp_ref_off = (
self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy()
)
elif self.calstate:
exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy()
exp_ref_off = 0
elif self.calstate == False:
exp_ref_on = 0
exp_ref_off = (
self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy()
)
self._exposure = exp_ref_on + exp_ref_off
return self._exposure
@property
def delta_freq(self):
r"""Get the array of channel frequency width. The value depends on the cal state:
===== ================================================================
CAL :math:`\Delta\nu`
===== ================================================================
None :math:`0.5 * ( \Delta\nu_{REFON}+ \Delta\nu_{REFOFF} )`
True :math:`\Delta\nu_{REFON}`
False :math:`\Delta\nu_{REFOFF}`
===== ================================================================
Returns
-------
delta_freq: `~numpy.ndarray`
The channel frequency width in units of the CDELT1 keyword in the SDFITS header
"""
df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy()
df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy()
if self.calstate is None:
delta_freq = 0.5 * (df_ref_on + df_ref_off)
elif self.calstate:
delta_freq = df_ref_on
elif self.calstate == False:
delta_freq = df_ref_off
self._delta_freq = delta_freq
return self._delta_freq
[docs]
def total_power(self, i):
"""Return the total power spectrum
Parameters
----------
i : int
The index into the data array
Returns
-------
spectrum : `~spectra.spectrum.Spectrum`
"""
if not self._calibrate:
raise Exception("You must calibrate first to get a total power spectrum")
ser = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows[i]]
meta = ser.dropna().to_dict()
meta["TSYS"] = self._tsys[i]
meta["EXPOSURE"] = self.exposure[i]
meta["NAXIS1"] = len(self._data[i])
if "CUNIT1" not in meta:
meta["CUNIT1"] = "Hz" # @todo this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11?
meta["CUNIT2"] = "deg" # is this always true?
meta["CUNIT3"] = "deg" # is this always true?
restfrq = meta["RESTFREQ"]
rfq = restfrq * u.Unit(meta["CUNIT1"])
restfreq = rfq.to("Hz").value
meta["RESTFRQ"] = restfreq # WCS wants no E
s = Spectrum.make_spectrum(
Masked(self._data[i] * u.ct, self._data[i].mask), meta=meta, observer_location=self._observer_location
)
s.merge_commentary(self)
return s
[docs]
@log_call_to_history
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for this set of scans.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
if self._npol > 1:
raise Exception("Can't yet time average multiple polarizations")
self._timeaveraged = deepcopy(self.total_power(0))
if weights == "tsys":
w = self.tsys_weight
else:
w = np.ones_like(self.tsys_weight)
non_blanks = find_non_blanks(self._data)[0]
self._timeaveraged._data = np.ma.average(self._data, axis=0, weights=w)
self._timeaveraged._data.set_fill_value(np.nan)
self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks])
self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks])
self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"]
self._timeaveraged.meta["EXPOSURE"] = self.exposure[non_blanks].sum()
return self._timeaveraged
[docs]
class PSScan(ScanBase):
"""GBT specific version of Position Switch Scan. A position switch scan object has
one IF, one feed, and one or more polarizations.
Parameters
----------
gbtfits : `~fits.sdfitsload.SDFITSLoad`
input SDFITSLoad object
scans : dict
dictionary with keys 'ON' and 'OFF' containing unique list of ON (signal) and OFF (reference) scan numbers NOTE: there should be one ON and one OFF, a pair
scanrows : dict
dictionary with keys 'ON' and 'OFF' containing the list of rows in `sdfits` corresponding to ON (signal) and OFF (reference) integrations
calrows : dict
dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits` corresponding to cal=T (ON) and cal=F (OFF) integrations.
bintable : int
the index for BINTABLE in `sdfits` containing the scans
calibrate: bool
whether or not to calibrate the data. If true, data will be calibrated as TSYS*(ON-OFF)/OFF. Default: True
smoothref: int
the number of channels in the reference to boxcar smooth prior to calibration
apply_flags : boolean, optional. If True, apply flags before calibration.
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of
observation DATE-OBS or MJD-OBS in
the SDFITS header. The default is the location of the GBT.
"""
def __init__(
self,
gbtfits,
scans,
scanrows,
calrows,
bintable,
calibrate=True,
smoothref=1,
apply_flags=False,
observer_location=Observatory["GBT"],
):
ScanBase.__init__(self, gbtfits)
# The rows of the original bintable corresponding to ON (sig) and OFF (reg)
# self._scans = scans
# self._history = deepcopy(gbtfits._history)
self._scan = scans["ON"]
self._scanrows = scanrows
self._nrows = len(self._scanrows["ON"])
self._smoothref = smoothref
self._apply_flags = apply_flags
# calrows perhaps not needed as input since we can get it from gbtfits object?
# calrows['ON'] are rows with noise diode was on, regardless of sig or ref
# calrows['OFF'] are rows with noise diode was off, regardless of sig or ref
self._calrows = calrows
# @todo deal with data that crosses bintables
if bintable is None:
self._bintable_index = gbtfits._find_bintable_and_row(self._scanrows["ON"][0])[0]
else:
self._bintable_index = bintable
self._observer_location = observer_location
# df = selection.iloc[scanrows["ON"]]
df = self._sdfits._index.iloc[scanrows["ON"]]
self._set_if_fd(df)
self._pols = uniq(df["PLNUM"])
self._npol = len(self._pols)
if False:
self._nint = gbtfits.nintegrations(self._bintable_index)
# so quick with slicing!
self._sigonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["ON"]))))
self._sigoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["ON"]))))
self._refonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["OFF"]))))
self._refoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["OFF"]))))
self._sigcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigonrows]
self._sigcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigoffrows]
self._refcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refonrows]
self._refcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refoffrows]
# Catch blank integrations.
goodrows = find_nonblank_ints(self._sigcaloff, self._refcaloff, self._sigcalon, self._refcalon)
self._refcalon = self._refcalon[goodrows]
self._refcaloff = self._refcaloff[goodrows]
self._refonrows = [self._refonrows[i] for i in goodrows]
self._refoffrows = [self._refoffrows[i] for i in goodrows]
self._sigcalon = self._sigcalon[goodrows]
self._sigcaloff = self._sigcaloff[goodrows]
self._sigonrows = [self._sigonrows[i] for i in goodrows]
self._sigoffrows = [self._sigoffrows[i] for i in goodrows]
# Update number of rows after removing blanks.
nsigrows = len(self._sigonrows) + len(self._sigoffrows)
self._nrows = nsigrows
self._nchan = gbtfits.nchan(self._bintable_index)
self._tsys = None
self._exposure = None
self._calibrated = None
self._calibrate = calibrate
self._make_meta(self._sigonrows)
if self._calibrate:
self.calibrate()
self._validate_defaults()
# @property
# def scans(self):
# """The dictionary of the ON and OFF scan numbers in the PSScan.
#
# Returns
# -------
# scans : dict
# The scan number dictionary
#
# """
# return self._scans
# @todo something clever
# self._calibrated_spectrum = Spectrum(self._calibrated,...) [assuming same spectral axis]
[docs]
def calibrated(self, i):
"""Return the calibrated Spectrum.
Parameters
----------
i : int
The index into the calibrated array
Returns
-------
spectrum : `~spectra.spectrum.Spectrum`
"""
# @todo suppress astropy INFO message "overwriting Masked Quantity's current mask with specified mask."
s = Spectrum.make_spectrum(
Masked(self._calibrated[i] * u.K, self._calibrated[i].mask),
meta=self.meta[i],
observer_location=self._observer_location,
)
s.merge_commentary(self)
return s
[docs]
def calibrate(self, **kwargs):
"""
Position switch calibration, following equations 1 and 2 in the GBTIDL calibration manual
"""
kwargs_opts = {"verbose": False}
kwargs_opts.update(kwargs)
if self._smoothref > 1 and kwargs_opts["verbose"]:
print(f"PS smoothref={self._smoothref}")
self._status = 1
nspect = self.nrows // 2
self._calibrated = np.ma.empty((nspect, self._nchan), dtype="d")
self._tsys = np.empty(nspect, dtype="d")
self._exposure = np.empty(nspect, dtype="d")
tcal = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"].to_numpy()
# @todo this loop could be replaced with clever numpy
if len(tcal) != nspect:
raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match")
for i in range(nspect):
tsys = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=tcal[i])
sig = 0.5 * (self._sigcalon[i] + self._sigcaloff[i])
ref = 0.5 * (self._refcalon[i] + self._refcaloff[i])
if self._smoothref > 1:
ref = core.smooth(ref, "boxcar", self._smoothref)
self._calibrated[i] = tsys * (sig - ref) / ref
self._tsys[i] = tsys
self._exposure[i] = self.exposure[i]
logger.debug(f"Calibrated {nspect} spectra")
self._add_calibration_meta()
# tip o' the hat to Pedro S. for exposure and delta_freq
@property
def exposure(self):
"""The array of exposure (integration) times
exposure = [ 0.5*(exp_ref_on + exp_ref_off) + 0.5*(exp_sig_on + exp_sig_off) ] / 2
Returns
-------
exposure : ~numpy.ndarray
The exposure time in units of the EXPOSURE keyword in the SDFITS header
"""
exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy()
exp_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy()
exp_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["EXPOSURE"].to_numpy()
exp_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["EXPOSURE"].to_numpy()
exp_ref = exp_ref_on + exp_ref_off
exp_sig = exp_sig_on + exp_sig_off
if self._smoothref > 1:
nsmooth = self._smoothref
else:
nsmooth = 1.0
exposure = exp_sig * exp_ref * nsmooth / (exp_sig + exp_ref * nsmooth)
return exposure
@property
def delta_freq(self):
"""Get the array of channel frequency width
df = [ 0.5*(df_ref_on + df_ref_off) + 0.5*(df_sig_on + df_sig_off) ] / 2
Returns
-------
delta_freq: ~numpy.ndarray
The channel frequency width in units of the CDELT1 keyword in the SDFITS header
"""
df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy()
df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy()
df_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["CDELT1"].to_numpy()
df_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["CDELT1"].to_numpy()
df_ref = 0.5 * (df_ref_on + df_ref_off)
df_sig = 0.5 * (df_sig_on + df_sig_off)
delta_freq = 0.5 * (df_ref + df_sig)
return delta_freq
[docs]
@log_call_to_history
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for this set of scans.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
if self._calibrated is None or len(self._calibrated) == 0:
raise Exception("You can't time average before calibration.")
if self._npol > 1:
raise Exception("Can't yet time average multiple polarizations")
self._timeaveraged = deepcopy(self.calibrated(0)) # ._copy()
data = self._calibrated
if weights == "tsys":
w = self.tsys_weight
else:
w = np.ones_like(self.tsys_weight)
self._timeaveraged._data = np.ma.average(data, axis=0, weights=w)
self._timeaveraged._data.set_fill_value(np.nan)
non_blanks = find_non_blanks(data)
self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks])
self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks])
self._timeaveraged.meta["EXPOSURE"] = np.sum(self._exposure[non_blanks])
self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"]
self._timeaveraged._history = self._history
self._timeaveraged._observer_location = self._observer_location
return self._timeaveraged
[docs]
class NodScan(ScanBase):
"""GBT specific version of Nodding Scan. A nod scan object has
one IF, two feeds, and one or more polarizations.
Parameters
----------
gbtfits : `~fits.sdfitsload.SDFITSLoad`
input SDFITSLoad object
scan : dict
dictionary with keys 'ON' and 'OFF' containing unique list of ON (signal) and OFF (reference) scan numbers
NOTE: there should be one ON and one OFF, a pair. There should be at least two beams (the nodding beams)
which will be resp. on source in each scan.
beam1: bool
Is this scan BEAM1 or BEAM2? BEAM1 is defined as being on source in the first scan of the pair, BEAM2 in the second of the pair
scanrows : dict
dictionary with keys 'ON' and 'OFF' containing the list of rows in `sdfits` corresponding to ON (signal) and OFF (reference) integrations
calrows : dict
dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits` corresponding to cal=T (ON) and cal=F (OFF) integrations.
bintable : int
the index for BINTABLE in `sdfits` containing the scans
calibrate: bool
whether or not to calibrate the data. If true, data will be calibrated as TSYS*(ON-OFF)/OFF.
Default: True
smoothref: int
the number of channels in the reference to boxcar smooth prior to calibration (if applicable)
apply_flags : boolean, optional. If True, apply flags before calibration.
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of
observation DATE-OBS or MJD-OBS in
the SDFITS header. The default is the location of the GBT.
"""
def __init__(
self,
gbtfits,
scan,
beam1,
scanrows,
calrows,
bintable,
calibrate=True,
smoothref=1,
apply_flags=False,
observer_location=Observatory["GBT"],
):
ScanBase.__init__(self, gbtfits)
self._scan = scan["ON"]
self._scanrows = scanrows
self._nrows = len(self._scanrows["ON"])
self._smoothref = smoothref
self._apply_flags = apply_flags
self._beam1 = beam1
# @todo allow having no calrow where noise diode was not fired
# calrows perhaps not needed as input since we can get it from gbtfits object?
# calrows['ON'] are rows with noise diode was on, regardless of sig or ref
# calrows['OFF'] are rows with noise diode was off, regardless of sig or ref
self._calrows = calrows
# @todo deal with data that crosses bintables
if bintable is None:
self._bintable_index = gbtfits._find_bintable_and_row(self._scanrows["ON"][0])[0]
else:
self._bintable_index = bintable
self._observer_location = observer_location
# df = selection.iloc[scanrows["ON"]]
df = self._sdfits._index.iloc[scanrows["ON"]]
self._set_if_fd(df)
self._pols = uniq(df["PLNUM"])
self._npol = len(self._pols)
if False:
self._nint = gbtfits.nintegrations(self._bintable_index)
# so quick with slicing!
self._sigonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["ON"]))))
self._sigoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["ON"]))))
self._refonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["OFF"]))))
self._refoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["OFF"]))))
if beam1:
self._sigcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigonrows]
self._sigcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigoffrows]
self._refcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refonrows]
self._refcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refoffrows]
else:
self._sigcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refonrows]
self._sigcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refoffrows]
self._refcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigonrows]
self._refcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigoffrows]
# Catch blank integrations.
goodrows = find_nonblank_ints(self._sigcaloff, self._refcaloff, self._sigcalon, self._refcalon)
self._refcalon = self._refcalon[goodrows]
self._refcaloff = self._refcaloff[goodrows]
self._refonrows = [self._refonrows[i] for i in goodrows]
self._refoffrows = [self._refoffrows[i] for i in goodrows]
self._sigcalon = self._sigcalon[goodrows]
self._sigcaloff = self._sigcaloff[goodrows]
self._sigonrows = [self._sigonrows[i] for i in goodrows]
self._sigoffrows = [self._sigoffrows[i] for i in goodrows]
# Update number of rows after removing blanks.
nsigrows = len(self._sigonrows) + len(self._sigoffrows)
self._nrows = nsigrows
self._nchan = len(self._sigcalon[0])
self._tsys = None
self._exposure = None
self._calibrated = None
self._calibrate = calibrate
self._make_meta(self._sigonrows)
if self._calibrate:
self.calibrate()
self._validate_defaults()
# @property
# def scans(self):
# """The dictionary of the ON and OFF scan numbers in the PSScan.
#
# Returns
# -------
# scans : dict
# The scan number dictionary
#
# """
# return self._scans
# @todo something clever
# self._calibrated_spectrum = Spectrum(self._calibrated,...) [assuming same spectral axis]
[docs]
def calibrated(self, i):
"""Return the calibrated Spectrum.
Parameters
----------
i : int
The index into the calibrated array
Returns
-------
spectrum : `~spectra.spectrum.Spectrum`
"""
s = Spectrum.make_spectrum(
Masked(self._calibrated[i] * u.K, self._calibrated[i].mask),
meta=self.meta[i],
observer_location=self._observer_location,
)
s.merge_commentary(self)
return s
[docs]
def calibrate(self, **kwargs):
"""
Position switch calibration, following equations 1 and 2 in the GBTIDL calibration manual
"""
kwargs_opts = {"verbose": False}
kwargs_opts.update(kwargs)
if self._smoothref > 1 and kwargs_opts["verbose"]:
print(f"PS smoothref={self._smoothref}")
self._status = 1
nspect = self.nrows // 2
self._calibrated = np.ma.empty((nspect, self._nchan), dtype="d")
self._tsys = np.empty(nspect, dtype="d")
self._exposure = np.empty(nspect, dtype="d")
tcal = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"].to_numpy()
# @todo this loop could be replaced with clever numpy
if len(tcal) != nspect:
raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match")
for i in range(nspect):
tsys = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=tcal[i])
sig = 0.5 * (self._sigcalon[i] + self._sigcaloff[i])
ref = 0.5 * (self._refcalon[i] + self._refcaloff[i])
if self._smoothref > 1:
ref = core.smooth(ref, "boxcar", self._smoothref)
self._calibrated[i] = tsys * (sig - ref) / ref
self._tsys[i] = tsys
self._exposure[i] = self.exposure[i]
logger.debug(f"Calibrated {nspect} spectra")
self._add_calibration_meta()
# tip o' the hat to Pedro S. for exposure and delta_freq
@property
def exposure(self):
"""The array of exposure (integration) times
exposure = [ 0.5*(exp_ref_on + exp_ref_off) + 0.5*(exp_sig_on + exp_sig_off) ] / 2
Returns
-------
exposure : ~numpy.ndarray
The exposure time in units of the EXPOSURE keyword in the SDFITS header
"""
exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy()
exp_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy()
exp_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["EXPOSURE"].to_numpy()
exp_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["EXPOSURE"].to_numpy()
exp_ref = exp_ref_on + exp_ref_off
exp_sig = exp_sig_on + exp_sig_off
if self._smoothref > 1:
nsmooth = self._smoothref
else:
nsmooth = 1.0
exposure = exp_sig * exp_ref * nsmooth / (exp_sig + exp_ref * nsmooth)
return exposure
@property
def delta_freq(self):
"""Get the array of channel frequency width
df = [ 0.5*(df_ref_on + df_ref_off) + 0.5*(df_sig_on + df_sig_off) ] / 2
Returns
-------
delta_freq: ~numpy.ndarray
The channel frequency width in units of the CDELT1 keyword in the SDFITS header
"""
df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy()
df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy()
df_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["CDELT1"].to_numpy()
df_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["CDELT1"].to_numpy()
df_ref = 0.5 * (df_ref_on + df_ref_off)
df_sig = 0.5 * (df_sig_on + df_sig_off)
delta_freq = 0.5 * (df_ref + df_sig)
return delta_freq
[docs]
@log_call_to_history
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for this set of scans.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
if self._calibrated is None or len(self._calibrated) == 0:
raise Exception("You can't time average before calibration.")
if self._npol > 1:
raise Exception("Can't yet time average multiple polarizations")
self._timeaveraged = deepcopy(self.calibrated(0))
data = self._calibrated
if weights == "tsys":
w = self.tsys_weight
else:
w = np.ones_like(self.tsys_weight)
self._timeaveraged._data = np.ma.average(data, axis=0, weights=w)
self._timeaveraged._data.set_fill_value(np.nan)
non_blanks = find_non_blanks(data)
self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks])
self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks])
self._timeaveraged.meta["EXPOSURE"] = np.sum(self._exposure[non_blanks])
self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"]
self._timeaveraged._history = self._history
return self._timeaveraged
[docs]
class FSScan(ScanBase):
"""GBT specific version of Frequency Switch Scan
Parameters
----------
gbtfits : `~fits.sdfitsload.SDFITSLoad`
input SDFITSLoad object
scan : int
Scan number that contains integrations with a series of sig/ref and calon/caloff states.
sigrows :dict
Dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits`
corresponding to sig=T (ON) and sig=F (OFF) integrations.
calrows : dict
Dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits`
corresponding to cal=T (ON) and cal=F (OFF) integrations.
bintable : int
The index for BINTABLE in `sdfits` containing the scans.
calibrate : bool
Whether or not to calibrate the data. If true, data will be calibrated as TSYS*(ON-OFF)/OFF.
Default: True
fold : bool
Whether or not to fold the spectrum. Default: True
shift_method : str
Method to use when shifting the spectra for folding. One of 'fft' or 'interpolate'.
'fft' uses a phase shift in the time domain. 'interpolate' interpolates the signal. Default: 'fft'
use_sig : bool
Whether to use the sig as the sig, or the ref as the sig. Default: True
smoothref: int
The number of channels in the reference to boxcar smooth prior to calibration.
apply_flags : boolean, optional. If True, apply flags before calibration.
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of
observation DATE-OBS or MJD-OBS in the SDFITS header. The default is the location of the GBT.
"""
def __init__(
self,
gbtfits,
scan,
sigrows,
calrows,
bintable,
calibrate=True,
fold=True,
shift_method="fft",
use_sig=True,
smoothref=1,
apply_flags=False,
observer_location=Observatory["GBT"],
debug=False,
):
ScanBase.__init__(self, gbtfits)
# The rows of the original bintable corresponding to ON (sig) and OFF (reg)
self._scan = scan # for FS everything is an "ON"
self._sigrows = sigrows # dict with "ON" and "OFF"
self._calrows = calrows # dict with "ON" and "OFF"
self._folded = False
self._use_sig = use_sig
self._smoothref = smoothref
if self._smoothref > 1:
print(f"FS smoothref={self._smoothref} not implemented yet")
self._apply_flags = apply_flags
self._sigonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._sigrows["ON"]))))
self._sigoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._sigrows["ON"]))))
self._refonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._sigrows["OFF"]))))
self._refoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._sigrows["OFF"]))))
self._debug = debug
if self._debug:
logger.debug("---------------------------------------------------")
logger.debug("FSSCAN: ")
logger.debug(f"SigOff {self._sigoffrows}")
logger.debug(f"SigOn {self._sigonrows}")
logger.debug(f"RefOff {self._refoffrows}")
logger.debug(f"RefOn {self._refonrows}")
nsigrows = len(self._sigonrows) + len(self._sigoffrows)
nrefrows = len(self._refonrows) + len(self._refoffrows)
if nsigrows != nrefrows:
raise Exception("Number of sig rows does not match ref rows. Dangerous to proceed")
logger.debug(f"sigonrows {nsigrows}, {self._sigonrows}")
self._nrows = nsigrows
a_scanrow = self._sigonrows[0]
# @todo deal with data that crosses bintables
if bintable is None:
self._bintable_index = gbtfits._find_bintable_and_row(a_scanrow)[0]
else:
self._bintable_index = bintable
if self._debug:
logger.debug(f"bintable index is {self._bintable_index}")
self._observer_location = observer_location
self._scanrows = list(set(self._calrows["ON"])) + list(set(self._calrows["OFF"]))
df = self._sdfits._index.iloc[self._scanrows]
if self._debug:
logger.debug(f"{len(df) = }")
self._set_if_fd(df)
self._pols = uniq(df["PLNUM"])
if self._debug:
logger.debug(f"FSSCAN #pol = {self._pols}")
self._npol = len(self._pols)
if False:
self._nint = gbtfits.nintegrations(self._bintable_index)
# @todo use gbtfits.velocity_convention(veldef,velframe)
# so quick with slicing!
self._sigcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigonrows]
self._sigcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._sigoffrows]
self._refcalon = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refonrows]
self._refcaloff = gbtfits.rawspectra(self._bintable_index, setmask=apply_flags)[self._refoffrows]
# Catch blank integrations.
goodrows = find_nonblank_ints(self._sigcaloff, self._refcaloff, self._sigcalon, self._refcalon)
self._refcalon = self._refcalon[goodrows]
self._refcaloff = self._refcaloff[goodrows]
self._refonrows = [self._refonrows[i] for i in goodrows]
self._refoffrows = [self._refoffrows[i] for i in goodrows]
self._sigcalon = self._sigcalon[goodrows]
self._sigcaloff = self._sigcaloff[goodrows]
self._sigonrows = [self._sigonrows[i] for i in goodrows]
self._sigoffrows = [self._sigoffrows[i] for i in goodrows]
# Update number of rows after removing blanks.
nsigrows = len(self._sigonrows) + len(self._sigoffrows)
self._nrows = nsigrows
self._nchan = len(self._sigcalon[0])
self._tsys = None
self._exposure = None
self._calibrated = None
self._calibrate = calibrate
self._make_meta(self._sigonrows)
if self._calibrate:
self.calibrate(fold=fold, shift_method=shift_method)
if self._debug:
logger.debug("---------------------------------------------------")
self._validate_defaults()
@property
def folded(self):
"""
Has the FSscan been folded?
Returns
-------
boolean
True if the signal and reference integrations have been folded. False if not.
"""
return self._folded
[docs]
def calibrated(self, i):
"""Return the calibrated Spectrum of this FSscan
Parameters
----------
i : int
The index into the calibrated array
Returns
-------
spectrum : `~spectra.spectrum.Spectrum`
"""
s = Spectrum.make_spectrum(
Masked(self._calibrated[i] * u.K, self._calibrated[i].mask),
meta=self.meta[i],
observer_location=self._observer_location,
)
s.merge_commentary(self)
return s
[docs]
def calibrate(self, **kwargs):
"""
Frequency switch calibration, following equations ...
fold=True or fold=False is required
"""
if self._debug:
logger.debug(f'FOLD={kwargs["fold"]}')
logger.debug(f'METHOD={kwargs["shift_method"]}')
# some helper functions, courtesy proto_getfs.py
def channel_to_frequency(crval1, crpix1, cdelt1, vframe, nchan, nint, ndim=1):
""" """
# Compute the correction factor.
beta = (vframe * u.m / u.s) / ac.c
vcorr = np.sqrt((1.0 + beta) / (1.0 - beta))
# The +1 is to start counting from 1.
indx = np.arange(nchan) + 1
if ndim == 1:
freq = crval1 + cdelt1 * (indx - crpix1)
freq *= vcorr
elif ndim == 2:
indx = np.tile(indx, (nint, 1))
freq = crval1[:, np.newaxis] + cdelt1[:, np.newaxis] * (indx - crpix1[:, np.newaxis])
freq *= vcorr[:, np.newaxis]
return freq
def index_frequency(df):
"""
Create a frequency axis from an index.
This assumes all entries in the index have the same number of channels.
"""
# Could you do this with gbtfits.getspec(row).spectral_axis?
ndim = len(df.shape)
nint = df.shape[0]
if ndim == 1:
nchan = np.array([int(df["TDIM7"][1:-1].split(",")[0])])
else:
nchan = np.array([int(df["TDIM7"].iloc[i][1:-1].split(",")[0]) for i in range(len(df))])
crval1 = df["CRVAL1"]
crpix1 = df["CRPIX1"]
cdelt1 = df["CDELT1"]
vframe = df["VFRAME"] # Use the velocity frame requested by the user.
if ndim == 2:
crval1 = crval1.to_numpy()
crpix1 = crpix1.to_numpy()
cdelt1 = cdelt1.to_numpy()
vframe = vframe.to_numpy()
freq = channel_to_frequency(crval1, crpix1, cdelt1, vframe, nchan[0], nint, ndim=ndim)
# Apply units.
try:
cunit1 = u.Unit(df["CUNIT1"])
if ndim == 2:
cunit1 = cunit[0] # @todo undefined cunit[]
except KeyError:
cunit1 = u.Hz
return freq * cunit1
def do_total_power(no_cal, cal, tcal):
""" """
def vec_mean_tsys(on, off, tcal):
"""
mean_tsys implements this, albeit only in 1D
"""
pass
def do_sig_ref(sig, ref, tsys, smooth=False):
"""
smooth=True would implement smoothing the reference (or something)
"""
return (sig - ref) / ref * tsys
def do_fold(sig, ref, sig_freq, ref_freq, remove_wrap=False, shift_method="fft"):
""" """
chan_shift = (sig_freq[0] - ref_freq[0]) / np.abs(np.diff(sig_freq)).mean()
logger.debug(f"do_fold: {sig_freq[0]}, {ref_freq[0]},{chan_shift}")
ref_shift = core.data_shift(ref, chan_shift, remove_wrap=remove_wrap, method=shift_method)
# @todo weights
avg = (sig + ref_shift) / 2
return avg
kwargs_opts = {"verbose": False}
kwargs_opts.update(kwargs)
_fold = kwargs.get("fold", False)
_mode = 1 # 1: keep the sig else: keep the ref (not externally supported)
nspect = self.nrows // 2
self._calibrated = np.ma.empty((nspect, self._nchan), dtype="d")
self._tsys = np.empty(nspect, dtype="d")
self._exposure = np.empty(nspect, dtype="d")
#
sig_freq = self._sigcalon[0]
df_sig = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]
df_ref = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]
logger.debug(f"df_sig {type(df_sig)} len(df_sig)")
sig_freq = index_frequency(df_sig)
ref_freq = index_frequency(df_ref)
chan_shift = abs(sig_freq[0, 0] - ref_freq[0, 0]) / np.abs(np.diff(sig_freq)).mean()
logger.debug(f"FS: shift={chan_shift:g} nchan={self._nchan:g}")
# tcal is the same for REF and SIG, and the same for all integrations actually.
tcal = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["TCAL"].to_numpy()
logger.debug(f"TCAL: {len(tcal)} {tcal[0]}")
if len(tcal) != nspect:
raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match")
# @todo the nspect loop could be replaced with clever numpy?
for i in range(nspect):
tsys_sig = mean_tsys(calon=self._sigcalon[i], caloff=self._sigcaloff[i], tcal=tcal[i])
tsys_ref = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=tcal[i])
if i == 0:
logger.debug(f"Tsys(sig/ref)[0]={tsys_sig} / {tsys_ref}")
tp_sig = 0.5 * (self._sigcalon[i] + self._sigcaloff[i])
tp_ref = 0.5 * (self._refcalon[i] + self._refcaloff[i])
#
cal_sig = do_sig_ref(tp_sig, tp_ref, tsys_ref)
cal_ref = do_sig_ref(tp_ref, tp_sig, tsys_sig)
#
if _fold:
cal_sig_fold = do_fold(cal_sig, cal_ref, sig_freq[i], ref_freq[i], shift_method=kwargs["shift_method"])
cal_ref_fold = do_fold(cal_ref, cal_sig, ref_freq[i], sig_freq[i], shift_method=kwargs["shift_method"])
self._folded = True
if self._use_sig:
self._calibrated[i] = cal_sig_fold
self._tsys[i] = tsys_ref
else:
self._calibrated[i] = cal_ref_fold
self._tsys[i] = tsys_sig
self._exposure[i] = 2 * self.exposure[i] # @todo
else:
if self._use_sig:
self._calibrated[i] = cal_sig
self._tsys[i] = tsys_ref
else:
self._calibrated[i] = cal_ref
self._tsys[i] = tsys_sig
self._exposure[i] = self.exposure[i]
self._add_calibration_meta()
logger.debug(f"Calibrated {nspect} spectra with fold={_fold} and use_sig={self._use_sig}")
# tip o' the hat to Pedro S. for exposure and delta_freq
@property
def exposure(self):
"""The array of exposure (integration) times for FSscan
exposure = [ 0.5*(exp_ref_on + exp_ref_off) + 0.5*(exp_sig_on + exp_sig_off) ] / 2
Returns
-------
exposure : ~numpy.ndarray
The exposure time in units of the EXPOSURE keyword in the SDFITS header
"""
exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy()
exp_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy()
exp_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["EXPOSURE"].to_numpy()
exp_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["EXPOSURE"].to_numpy()
exp_ref = exp_ref_on + exp_ref_off
exp_sig = exp_sig_on + exp_sig_off
if self._smoothref > 1:
nsmooth = self._smoothref
else:
nsmooth = 1.0
self._exposure = exp_sig * exp_ref * nsmooth / (exp_sig + exp_ref * nsmooth)
return self._exposure
@property
def delta_freq(self):
"""Get the array of channel frequency width
df = [ 0.5*(df_ref_on + df_ref_off) + 0.5*(df_sig_on + df_sig_off) ] / 2
Returns
-------
delta_freq: ~numpy.ndarray
The channel frequency width in units of the CDELT1 keyword in the SDFITS header
"""
df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy()
df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy()
df_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["CDELT1"].to_numpy()
df_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["CDELT1"].to_numpy()
df_ref = 0.5 * (df_ref_on + df_ref_off)
df_sig = 0.5 * (df_sig_on + df_sig_off)
self._delta_freq = 0.5 * (df_ref + df_sig)
return self._delta_freq
[docs]
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for this set of FSscans.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
if self._calibrated is None or len(self._calibrated) == 0:
raise Exception("You can't time average before calibration.")
if self._npol > 1:
raise Exception("Can't yet time average multiple polarizations %d" % self._npol)
self._timeaveraged = deepcopy(self.calibrated(0))
data = self._calibrated
if weights == "tsys":
w = self.tsys_weight
else:
w = np.ones_like(self.tsys_weight)
self._timeaveraged._data = np.ma.average(data, axis=0, weights=w)
self._timeaveraged._data.set_fill_value(np.nan)
non_blanks = find_non_blanks(data)
self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks])
self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks])
self._timeaveraged.meta["EXPOSURE"] = np.sum(self._exposure[non_blanks])
self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"]
return self._timeaveraged
[docs]
class SubBeamNodScan(ScanBase):
r"""
Parameters
----------
sigtp: list of ~spectra.scan.TPScan
Signal total power scans
reftp: list ~spectra.scan.TPScan
Reference total power scans
calibrate: bool
Whether or not to calibrate the data.
smoothref: int
the number of channels in the reference to boxcar smooth prior to calibration
apply_flags : boolean, optional. If True, apply flags before calibration.
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of
observation DATE-OBS or MJD-OBS in
the SDFITS header. The default is the location of the GBT.
weights: str
Weighting scheme to use when averaging the signal and reference scans
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
"""
def __init__(
self,
sigtp,
reftp,
calibrate=True,
smoothref=1,
apply_flags=False,
observer_location=Observatory["GBT"],
**kwargs,
):
ScanBase.__init__(self, sigtp[0]._sdfits)
kwargs_opts = {
"timeaverage": False,
"weights": "tsys", # or None or ndarray
"debug": False,
}
kwargs_opts.update(kwargs)
w = kwargs_opts["weights"]
if len(reftp) != len(sigtp):
raise ValueError(
f"Reference and signal total power arrays are different lengths: {len(reftp)} != {len(sigtp)}"
)
self._scan = sigtp[0]._scan
self._sigtp = sigtp
self._reftp = reftp
self._ifnum = self._sigtp[0].ifnum
self._fdnum = self._sigtp[0].fdnum
self._npol = self._sigtp[0].npol
self._pols = self._sigtp[0]._pols
self._nchan = len(reftp[0]._data[0])
self._nrows = np.sum([stp.nrows for stp in self._sigtp])
self._nint = 0
self._smoothref = smoothref
if self._smoothref > 1:
print(f"SubBeamNodScan smoothref={self._smoothref} not implemented yet")
self._apply_flags = apply_flags
self._observer_location = observer_location
self._calibrated = None
if calibrate:
self.calibrate(weights=w)
self._validate_defaults()
[docs]
def calibrate(self, **kwargs):
"""Calibrate the SubBeamNodScan data"""
nspect = len(self._reftp)
self._tsys = np.empty(nspect, dtype=float)
self._exposure = np.empty(nspect, dtype=float)
self._delta_freq = np.empty(nspect, dtype=float)
self._calibrated = np.ma.empty((nspect, self._nchan), dtype=float)
for i in range(nspect):
sig = self._sigtp[i].timeaverage(weights=kwargs["weights"])
ref = self._reftp[i].timeaverage(weights=kwargs["weights"])
# Combine sig and ref.
ta = ((sig - ref) / ref).flux.value * ref.meta["WTTSYS"]
self._tsys[i] = ref.meta["WTTSYS"]
self._exposure[i] = sig.meta["EXPOSURE"]
self._delta_freq[i] = sig.meta["CDELT1"]
self._calibrated[i] = ta
[docs]
def calibrated(self, i):
meta = deepcopy(self._sigtp[i].timeaverage().meta) # use self._sigtp.meta? instead?
meta["TSYS"] = self._tsys[i]
meta["EXPOSURE"] = self._exposure[i]
naxis1 = len(self._calibrated[i])
meta["NAXIS1"] = naxis1
if "CUNIT1" not in meta:
meta["CUNIT1"] = "Hz" # @todo this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11?
meta["CUNIT2"] = "deg" # is this always true?
meta["CUNIT3"] = "deg" # is this always true?
restfrq = meta["RESTFREQ"]
rfq = restfrq * u.Unit(meta["CUNIT1"])
restfreq = rfq.to("Hz").value
meta["RESTFRQ"] = restfreq # WCS wants no E
s = Spectrum.make_spectrum(
Masked(self._calibrated[i] * u.K, self._calibrated[i].mask),
meta=meta,
observer_location=self._observer_location,
)
s.merge_commentary(self)
return s
@property
def exposure(self):
return self._exposure
@property
def delta_freq(self):
return self._delta_freq
[docs]
def timeaverage(self, weights="tsys"):
r"""Compute the time-averaged spectrum for this scan.
Parameters
----------
weights: str
'tsys' or None. If 'tsys' the weight will be calculated as:
:math:`w = t_{exp} \times \delta\nu/T_{sys}^2`
Default: 'tsys'
Returns
-------
spectrum : :class:`~spectra.spectrum.Spectrum`
The time-averaged spectrum
.. note::
Data that are masked will have values set to zero. This is a feature of `numpy.ma.average`. Data mask fill value is NaN (np.nan)
"""
if self._calibrated is None or len(self._calibrated) == 0:
raise Exception("You can't time average before calibration.")
if self._npol > 1:
raise Exception(f"Can't yet time average multiple polarizations {self._npol}")
self._timeaveraged = deepcopy(self.calibrated(0))
data = self._calibrated
if weights == "tsys":
w = self.tsys_weight
else:
w = None
self._timeaveraged._data = np.ma.average(data, axis=0, weights=w)
self._timeaveraged._data.set_fill_value(np.nan)
self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys)
self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys, axis=0, weights=w)
self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"]
self._timeaveraged.meta["EXPOSURE"] = np.sum(self.exposure)
return self._timeaveraged