Source code for dysh.spectra.scan

from collections import UserList
from copy import deepcopy

import astropy.units as u
import numpy as np

from ..coordinates import Observatory, make_target, veldef_to_convention
from ..util import uniq
from . import average, find_non_blanks, mean_tsys, sq_weighted_avg, tsys_weight
from .spectrum import Spectrum


[docs] class ScanMixin(object): @property def status(self): """Status flag, will be used later for undo""" return self._status @property def nchan(self): return self._nchan @property def nrows(self): """The number of rows in this Scan""" return self._nrows @property def npol(self): """The number of polarizations in this Scan""" return self._npol
[docs] def nif(self): """The number of IFs in this Scan""" return self._nif
[docs] def nfeed(self): """The number of feeds in this Scan""" return self._nfeed
[docs] def calibrate(self, **kwargs): """Calibrate the Scan data""" pass
[docs] def timeaverage(self, weights=None): r"""Compute the time-averaged spectrum for this scan. Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- spectrum : :class:`~spectra.spectrum.Spectrum` The time-averaged spectrum """ pass
[docs] def polaverage(self, weights=None): """Average all polarizations in this Scan""" pass
[docs] def finalspectrum(self, weights=None): """Average all times and polarizations in this Scan""" pass
def __len__(self): return self._nrows
[docs] class ScanBlock(UserList, ScanMixin): def __init__(self, *args): super().__init__(*args) self._status = None self._nrows = 0 self._npol = 0 self._timeaveraged = [] self._polaveraged = [] self._finalspectrum = []
[docs] def calibrate(self, **kwargs): """Calibrate all scans in this ScanBlock""" for scan in self.data: scan.calibrate(**kwargs)
[docs] def timeaverage(self, weights="tsys"): r"""Compute the time-averaged spectrum for all scans in this ScanBlock. Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- timeaverage: list of `~spectra.spectrum.Spectrum` List of all the time-averaged spectra """ for scan in self.data: self._timeaveraged.append(scan.timeaverage(weights)) return self._timeaveraged
[docs] def polaverage(self, weights="tsys"): r"""Average all polarizations in all scans in this ScanBlock Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- polaverage: list of `~spectra.spectrum.Spectrum` List of all the polarization-averaged spectra """ for scan in self.data: self._polaveraged.append(scan.polaverage(weights)) return self._polaveraged
[docs] def finalspectrum(self, weights="tsys"): r"""Average all times and polarizations in all scans this ScanBlock Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- finalspectra: list of `~spectra.spectrum.Spectrum` List of all the time- and polarization-averaged spectra """ for scan in self.data: self._finalspectrum.append(scan.finalspectrum(weights)) return self._finalspectrum
[docs] class TPScan(ScanMixin): """GBT specific version of Total Power Scan Parameters ---------- gbtfits : `~fits.gbtfitsload.GBFITSLoad` input GBFITSLoad object scan: int scan number sigstate : str one of 'SIG' or 'REF' to indicate if this is the signal or reference scan or 'BOTH' if it contains both calstate : str one of 'ON' or 'OFF' to indicate the calibration state of this scan, or 'BOTH' if it contains both scanrows : list-like the list of rows in `sdfits` corresponding to sig_state integrations calrows : dict dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits` corresponding to cal=T (ON) and cal=F (OFF) integrations for `scan` bintable : int the index for BINTABLE in `sdfits` containing the scans calibrate: bool whether or not to calibrate the data. If `True`, the data will be (calon - caloff)*0.5, otherwise it will be SDFITS row data. Default:True """ # @TODO get rid of calrows and calc tsys in gettp and pass it in. def __init__( self, gbtfits, scan, sigstate, calstate, scanrows, calrows, bintable, calibrate=True, observer_location=Observatory["GBT"], ): self._sdfits = gbtfits # parent class self._scan = scan self._sigstate = sigstate # ignored? self._calstate = calstate # ignored? self._scanrows = scanrows # print("BINTABLE = ", bintable) # @TODO deal with data that crosses bintables if bintable is None: self._bintable_index = self._sdfits._find_bintable_and_row(self._scanrows[0])[0] else: self._bintable_index = bintable self._observer_location = observer_location self._data = self._sdfits.rawspectra(self._bintable_index)[scanrows] # all cal states df = self._sdfits._index df = df.iloc[scanrows] self._index = df self._feeds = uniq(df["FDNUM"]) self._pols = uniq(df["PLNUM"]) self._ifs = uniq(df["IFNUM"]) self._status = 0 # @TODO make these an enumeration, possibly dict self._nint = 0 self._npol = len(self._pols) self._nfeed = len(self._feeds) self._nif = len(self._ifs) self._timeaveraged = None self._polaveraged = None self._nrows = len(scanrows) self._tsys = None if False: self._npol = gbtfits.npol(self._bintable_index) # TODO deal with bintable self._nint = gbtfits.nintegrations(self._bintable_index) self._calrows = calrows self._refonrows = self._calrows["ON"] self._refoffrows = self._calrows["OFF"] self._refcalon = gbtfits.rawspectra(self._bintable_index)[self._refonrows] self._refcaloff = gbtfits.rawspectra(self._bintable_index)[self._refoffrows] self._nchan = len(self._refcalon[0]) self._calibrate = calibrate if self._calibrate: self._data = (0.5 * (self._refcalon + self._refcaloff)).astype(float) # print(f"# scanrows {len(self._scanrows)}, # calrows ON {len(self._calrows['ON'])} # calrows OFF {len(self._calrows['OFF'])}") self.calc_tsys() @property def sigstate(self): return self._sigstate @property def calstate(self): return self._calstate @property def tsys(self): """The system temperature array. Returns ------- tsys : `~numpy.ndarray` System temperature values in K """ return self._tsys
[docs] def calc_tsys(self, **kwargs): """ Calculate the system temperature array """ kwargs_opts = {"verbose": False} kwargs_opts.update(kwargs) self._status = 1 tcal = list(self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"]) nspect = len(tcal) self._tsys = np.empty(nspect, dtype=float) # should be same as len(calon) # allcal = self._refonrows.copy() # allcal.extend(self._refoffrows) # tcal = list(self._sdfits.index(self._bintable_index).iloc[sorted(allcal)]["TCAL"]) # @Todo this loop could be replaces with clever numpy if len(tcal) != nspect: raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match") for i in range(nspect): tsys = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=tcal[i]) self._tsys[i] = tsys
@property def exposure(self): """Get the array of exposure (integration) times exposure = 0.5*(exp_ref_on + exp_ref_off) Note we only have access to the refon and refoff row indices so can't use sig here. This is probably incorrect Returns ------- exposure : `~numpy.ndarray` The exposure time in units of the EXPOSURE keyword in the SDFITS header """ exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy() exp_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy() exposure = exp_ref_on + exp_ref_off return exposure @property def delta_freq(self): """Get the array of channel frequency width df = 0.5*(df_ref_on + df_ref_off) Note we only have access to the refon and refoff row indices so can't use sig here. This is probably incorrect Returns ------- delta_freq: `~numpy.ndarray` The channel frequency width in units of the CDELT1 keyword in the SDFITS header """ df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy() df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy() delta_freq = 0.5 * (df_ref_on + df_ref_off) return delta_freq @property def _tsys_weight(self): r"""The system temperature weighting array computed from current :math:`T_{sys}, t_{exp}`, and `\delta\nu`. See :meth:`tsys_weight` """ return tsys_weight(self.exposure, self.delta_freq, self.tsys)
[docs] def tpmeta(self, i): ser = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows[i]] # meta = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows[i]].dropna().to_dict() meta = ser.dropna().to_dict() meta["TSYS"] = self._tsys[i] meta["EXPOSURE"] = self.exposure[i] meta["NAXIS1"] = len(self._data[i]) if "CUNIT1" not in meta: meta["CUNIT1"] = "Hz" # @TODO this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11? meta["CUNIT2"] = "deg" # is this always true? meta["CUNIT3"] = "deg" # is this always true? restfrq = meta["RESTFREQ"] rfq = restfrq * u.Unit(meta["CUNIT1"]) restfreq = rfq.to("Hz").value meta["RESTFRQ"] = restfreq # WCS wants no E return meta
[docs] def total_power(self, i): """Return the total power spectrum Parameters ---------- i : int The index into the data array Returns ------- spectrum : `~spectra.spectrum.Spectrum` """ # print(len(self._scanrows), i) ser = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows[i]] # meta = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows[i]].dropna().to_dict() meta = ser.dropna().to_dict() meta["TSYS"] = self._tsys[i] meta["EXPOSURE"] = self.exposure[i] meta["NAXIS1"] = len(self._data[i]) if "CUNIT1" not in meta: meta["CUNIT1"] = "Hz" # @TODO this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11? meta["CUNIT2"] = "deg" # is this always true? meta["CUNIT3"] = "deg" # is this always true? restfrq = meta["RESTFREQ"] rfq = restfrq * u.Unit(meta["CUNIT1"]) restfreq = rfq.to("Hz").value meta["RESTFRQ"] = restfreq # WCS wants no E return Spectrum.make_spectrum(self._data[i] * u.ct, meta, observer_location=self._observer_location)
[docs] def timeaverage(self, weights="tsys"): r"""Compute the time-averaged spectrum for this set of scans. Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- spectrum : :class:`~spectra.spectrum.Spectrum` The time-averaged spectrum """ if self._npol > 1: raise Exception("Can't yet time average multiple polarizations") if self._nif > 1: raise Exception("Can't yet time average multiple IFs") if self._nfeed > 1: raise Exception("Can't yet time average multiple feeds") self._timeaveraged = deepcopy(self.total_power(0)) if weights == "tsys": w = self._tsys_weight else: w = np.ones_like(self._tsys_weight) non_blanks = find_non_blanks(self._data) self._timeaveraged._data = average(self._data, axis=0, weights=w) self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks]) self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks]) self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"] self._timeaveraged.meta["EXPOSURE"] = self.exposure[non_blanks].sum() return self._timeaveraged
[docs] class PSScan(ScanMixin): """GBT specific version of Position Switch Scan Parameters ---------- gbtfits : `~fit.gbtfitsload.GBFITSLoad` input GBFITSLoad object scans : dict dictionary with keys 'ON' and 'OFF' containing unique list of ON (signal) and OFF (reference) scan numbers scanrows : dict dictionary with keys 'ON' and 'OFF' containing the list of rows in `sdfits` corresponding to ON (signal) and OFF (reference) integrations calrows : dict dictionary containing with keys 'ON' and 'OFF' containing list of rows in `sdfits` corresponding to cal=T (ON) and cal=F (OFF) integrations. bintable : int the index for BINTABLE in `sdfits` containing the scans calibrate: bool whether or not to calibrate the data. If true, data will be calibrated as TSYS*(ON-OFF)/OFF. Default: True """ def __init__( self, gbtfits, scans, scanrows, calrows, bintable, calibrate=True, observer_location=Observatory["GBT"] ): # The rows of the original bintable corresponding to ON (sig) and OFF (reg) self._sdfits = gbtfits # parent class self._scans = scans self._scanrows = scanrows self._nrows = len(self._scanrows["ON"]) # print(f"len(scanrows ON) {len(self._scanrows['ON'])}") # print(f"len(scanrows OFF) {len(self._scanrows['OFF'])}") # calrows perhaps not needed as input since we can get it from gbtfits object? # calrows['ON'] are rows with noise diode was on, regardless of sig or ref # calrows['OFF'] are rows with noise diode was off, regardless of sig or ref self._calrows = calrows # print("BINTABLE = ", bintable) # @TODO deal with data that crosses bintables if bintable is None: self._bintable_index = gbtfits._find_bintable_and_row(self._scanrows["ON"][0])[0] else: self._bintable_index = bintable self._observer_location = observer_location df = self._sdfits._index df = df.iloc[scanrows["ON"]] self._feeds = uniq(df["FDNUM"]) self._pols = uniq(df["PLNUM"]) self._ifs = uniq(df["IFNUM"]) self._npol = len(self._pols) self._nfeed = len(self._feeds) self._nif = len(self._ifs) if False: self._nint = gbtfits.nintegrations(self._bintable_index) # todo use gbtfits.velocity_convention(veldef,velframe) # so quick with slicing! self._sigonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["ON"])))) self._sigoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["ON"])))) self._refonrows = sorted(list(set(self._calrows["ON"]).intersection(set(self._scanrows["OFF"])))) self._refoffrows = sorted(list(set(self._calrows["OFF"]).intersection(set(self._scanrows["OFF"])))) self._sigcalon = gbtfits.rawspectra(self._bintable_index)[self._sigonrows] self._nchan = len(self._sigcalon[0]) self._sigcaloff = gbtfits.rawspectra(self._bintable_index)[self._sigoffrows] self._refcalon = gbtfits.rawspectra(self._bintable_index)[self._refonrows] self._refcaloff = gbtfits.rawspectra(self._bintable_index)[self._refoffrows] self._tsys = None self._exposure = None self._calibrated = None self._calibrate = calibrate if self._calibrate: self.calibrate() @property def tsys(self): """The system temperature array. This will be `None` until calibration is done. Returns ------- tsys : `~numpy.ndarray` System temperature values in K """ return self._tsys # TODO something clever # self._calibrated_spectrum = Spectrum(self._calibrated,...) [assuming same spectral axis]
[docs] def calibrated(self, i): """Return the calibrated Spectrum. Parameters ---------- i : int The index into the calibrated array Returns ------- spectrum : `~spectra.spectrum.Spectrum` """ meta = self._sdfits.index(bintable=self._bintable_index).iloc[self._scanrows["ON"][i]].dropna().to_dict() meta["TSYS"] = self._tsys[i] meta["EXPOSURE"] = self._exposure[i] meta["NAXIS1"] = len(self._calibrated[i]) meta["TSYS"] = self._tsys[i] meta["EXPOSURE"] = self.exposure[i] if "CUNIT1" not in meta: meta["CUNIT1"] = "Hz" # @TODO this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11? meta["CUNIT2"] = "deg" # is this always true? meta["CUNIT3"] = "deg" # is this always true? restfrq = meta["RESTFREQ"] rfq = restfrq * u.Unit(meta["CUNIT1"]) restfreq = rfq.to("Hz").value meta["RESTFRQ"] = restfreq # WCS wants no E return Spectrum.make_spectrum(self._calibrated[i] * u.K, meta=meta, observer_location=self._observer_location)
[docs] def calibrate(self, **kwargs): """ Position switch calibration, following equations 1 and 2 in the GBTIDL calibration manual """ kwargs_opts = {"verbose": False} kwargs_opts.update(kwargs) self._status = 1 nspect = self.nrows // 2 self._calibrated = np.empty((nspect, self._nchan), dtype="d") self._tsys = np.empty(nspect, dtype="d") self._exposure = np.empty(nspect, dtype="d") tcal = list(self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["TCAL"]) # @Todo this loop could be replaced with clever numpy if len(tcal) != nspect: raise Exception(f"TCAL length {len(tcal)} and number of spectra {nspect} don't match") for i in range(nspect): tsys = mean_tsys(calon=self._refcalon[i], caloff=self._refcaloff[i], tcal=tcal[i]) sig = 0.5 * (self._sigcalon[i] + self._sigcaloff[i]) ref = 0.5 * (self._refcalon[i] + self._refcaloff[i]) self._calibrated[i] = tsys * (sig - ref) / ref self._tsys[i] = tsys self._exposure[i] = self.exposure[i]
# tip o' the hat to Pedro S. for exposure and delta_freq @property def exposure(self): """Get the array of exposure (integration) times exposure = [ 0.5*(exp_ref_on + exp_ref_off) + 0.5*(exp_sig_on + exp_sig_off) ] / 2 Returns ------- exposure : ~numpy.ndarray The exposure time in units of the EXPOSURE keyword in the SDFITS header """ exp_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["EXPOSURE"].to_numpy() exp_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["EXPOSURE"].to_numpy() exp_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["EXPOSURE"].to_numpy() exp_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["EXPOSURE"].to_numpy() exp_ref = exp_ref_on + exp_ref_off exp_sig = exp_sig_on + exp_sig_off # exposure = 0.5*(exp_ref + exp_sig) # exposure = exp_ref + exp_sig nsmooth = 1.0 # In case we start smoothing the reference spectra. exposure = exp_sig * exp_ref * nsmooth / (exp_sig + exp_ref * nsmooth) return exposure @property def delta_freq(self): """Get the array of channel frequency width df = [ 0.5*(df_ref_on + df_ref_off) + 0.5*(df_sig_on + df_sig_off) ] / 2 Returns ------- delta_freq: ~numpy.ndarray The channel frequency width in units of the CDELT1 keyword in the SDFITS header """ df_ref_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._refonrows]["CDELT1"].to_numpy() df_ref_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._refoffrows]["CDELT1"].to_numpy() df_sig_on = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigonrows]["CDELT1"].to_numpy() df_sig_off = self._sdfits.index(bintable=self._bintable_index).iloc[self._sigoffrows]["CDELT1"].to_numpy() df_ref = 0.5 * (df_ref_on + df_ref_off) df_sig = 0.5 * (df_sig_on + df_sig_off) delta_freq = 0.5 * (df_ref + df_sig) return delta_freq @property def _tsys_weight(self): r"""The system temperature weighting array computed from current :math`T_{sys}`, :math:`t_{int}`, and :math:`\delta\nu`. See :meth:`tsys_weight` """ return tsys_weight(self.exposure, self.delta_freq, self.tsys)
[docs] def timeaverage(self, weights="tsys"): r"""Compute the time-averaged spectrum for this set of scans. Parameters ---------- weights: str 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' Returns ------- spectrum : :class:`~spectra.spectrum.Spectrum` The time-averaged spectrum """ if self._calibrated is None or len(self._calibrated) == 0: raise Exception("You can't time average before calibration.") if self._npol > 1: raise Exception("Can't yet time average multiple polarizations") if self._nif > 1: raise Exception("Can't yet time average multiple IFs") if self._nfeed > 1: raise Exception("Can't yet time average multiple feeds") self._timeaveraged = deepcopy(self.calibrated(0)) data = self._calibrated if weights == "tsys": w = self._tsys_weight else: w = np.ones_like(self._tsys_weight) self._timeaveraged._data = average(data, axis=0, weights=w) non_blanks = find_non_blanks(data) self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys[non_blanks]) self._timeaveraged.meta["WTTSYS"] = sq_weighted_avg(self._tsys[non_blanks], axis=0, weights=w[non_blanks]) self._timeaveraged.meta["EXPOSURE"] = np.sum(self._exposure[non_blanks]) self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"] return self._timeaveraged
[docs] class SubBeamNodScan(ScanMixin): # SBNodScan? r""" Parameters ---------- sigtp: list of ~spectra.scan.TPScan Signal total power scans reftp: list ~spectra.scan.TPScan Reference total power scans fulltp: ~spectra.scan.TPScan A full (sig+ref) total power scans, used only for method='scan' method: str Method to use when processing. One of 'cycle' or 'scan'. 'cycle' is more accurate and averages data in each SUBREF_STATE cycle. 'scan' reproduces GBTIDL's snodka function which has been shown to be less accurate. Default:'cycle' calibrate: bool Whether or not to calibrate the data. weights: str Weighting scheme to use when averaging the signal and reference scans 'tsys' or None. If 'tsys' the weight will be calculated as: :math:`w = t_{exp} \times \delta\nu/T_{sys}^2` Default: 'tsys' """ def __init__( self, sigtp, reftp, fulltp=None, method="cycle", calibrate=True, observer_location=Observatory["GBT"], **kwargs ): kwargs_opts = { "timeaverage": False, "weights": "tsys", # or None or ndarray "debug": False, } kwargs_opts.update(kwargs) w = kwargs_opts["weights"] if len(reftp) != len(sigtp): raise ValueError( f"Reference and signal total power arrays are different lengths: {len(reftp)} != {len(sigtp)}" ) self._sigtp = sigtp self._reftp = reftp self._fulltp = fulltp self._nchan = len(reftp[0]._data[0]) self._npol = 1 self._nint = 0 self._nif = 1 self._nfeed = 1 self._method = method.lower() if self._method not in ["cycle", "scan"]: raise ValueError(f"Method {self._method} unrecognized. Must be one of 'cycle' or 'scan'") self._observer_location = observer_location self._calibrated = None if calibrate: self.calibrate(weights=w)
[docs] def calibrate(self, **kwargs): """Calibrate the Scan data""" nspect = len(self._reftp) self._tsys = np.empty(nspect, dtype=float) self._exposure = np.empty(nspect, dtype=float) self._delta_freq = np.empty(nspect, dtype=float) self._calibrated = np.empty((nspect, self._nchan), dtype=float) if self._method == "cycle": for i in range(nspect): ref_avg = self._reftp[i].timeaverage(weights=kwargs["weights"]) sig_avg = self._sigtp[i].timeaverage(weights=kwargs["weights"]) # Combine sig and ref. ta = ((sig_avg - ref_avg) / ref_avg).flux.value * ref_avg.meta["WTTSYS"] self._tsys[i] = ref_avg.meta["WTTSYS"] self._exposure[i] = sig_avg.meta["EXPOSURE"] self._delta_freq[i] = sig_avg.meta["CDELT1"] self._calibrated[i] = ta elif self._method == "scan": # Process the whole scan as a single block. # This is less accurate, but might be needed if # the scan was aborted and there are not enough # sig/ref cycles to do a per cycle calibration. for i in range(len(self._reftp)): on = self._sigtp[i].timeaverage(weights=kwargs["weights"]).data off = self._reftp[i].timeaverage(weights=kwargs["weights"]).data fulltpavg = self._fulltp[i].timeaverage(weights=kwargs["weights"]) tsys = fulltpavg.meta["TSYS"] # data is a Spectrum. not consistent with other Scan classes # where _calibrated is a numpy array. data = tsys * (on - off) / off # data.meta["MEANTSYS"] = 0.5 * np.mean((on.meta["TSYS"] + off.meta["TSYS"])) # data.meta["WTTSYS"] = tsys # data.meta["TSYS"] = data.meta["WTTSYS"] # self._tsys.append(ref_avg.meta["WTTSYS"]) self._calibrated[i] = data else: raise ValueError(f"Method {self._method} unrecognized. Must be one of 'cycle' or 'scan'")
[docs] def calibrated(self, i): meta = deepcopy(self._sigtp[i].timeaverage().meta) naxis1 = len(self._calibrated[i]) meta["TSYS"] = self._tsys[i] meta["EXPOSURE"] = self._exposure[i] meta["NAXIS1"] = len(self._calibrated[i]) if "CUNIT1" not in meta: meta["CUNIT1"] = "Hz" # @TODO this is in gbtfits.hdu[0].header['TUNIT11'] but is it always TUNIT11? meta["CUNIT2"] = "deg" # is this always true? meta["CUNIT3"] = "deg" # is this always true? restfrq = meta["RESTFREQ"] rfq = restfrq * u.Unit(meta["CUNIT1"]) restfreq = rfq.to("Hz").value meta["RESTFRQ"] = restfreq # WCS wants no E return Spectrum.make_spectrum(self._calibrated[i] * u.K, meta, observer_location=self._observer_location)
@property def exposure(self): return self._exposure @property def delta_freq(self): return self._delta_freq @property def tsys(self): return self._tsys @property def _tsys_weight(self): r"""The system temperature weighting array computed from current :math:`T_{sys}, t_{exp}`, and `\delta\nu`. See :meth:`tsys_weight` """ return tsys_weight(self.exposure, self.delta_freq, self.tsys)
[docs] def timeaverage(self, weights="tsys"): if self._calibrated is None or len(self._calibrated) == 0: raise Exception("You can't time average before calibration.") if self._npol > 1: raise Exception("Can't yet time average multiple polarizations") if self._nif > 1: raise Exception("Can't yet time average multiple IFs") if self._nfeed > 1: raise Exception("Can't yet time average multiple feeds") self._timeaveraged = deepcopy(self.calibrated(0)) data = self._calibrated nchan = len(data[0]) if weights == "tsys": w = self._tsys_weight else: w = None if self._method == "scan": w = None # don't double tsys weight(?) self._timeaveraged = deepcopy(self.calibrated(0)) self._timeaveraged._data = average(data, axis=0, weights=w) self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys) # data.meta["MEANTSYS"] = 0.5 * np.mean((on.meta["TSYS"] + off.meta["TSYS"])) self._timeaveraged.meta["WTTSYS"] = self._timeaveraged.meta["WTTSYS"] self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"] self._timeaveraged.meta["EXPOSURE"] = np.sum(self.exposure) if self._method == "cycle": # or weights = "gbtidl" # GBTIDL method of weighting subbeamnod data ta_avg = np.zeros(nchan, dtype="d") wt_avg = 0.0 # A single value for now, but it should be an array once we implement vector TSYS. tsys_wt = 0.0 tsys_avg = 0.0 for i in range(len(data)): wt_avg += self.tsys[i] ** -2.0 tsys_wt_ = tsys_weight(self.exposure[i], self.delta_freq[i], self.tsys[i]) tsys_wt += tsys_wt_ ta_avg[:] += data[i] * self.tsys[i] ** -2.0 wt1 = self.tsys**-2.0 wt2 = tsys_weight(self.exposure, self.delta_freq, self.tsys) ta_avg /= wt_avg tsys_avg /= tsys_wt self._timeaveraged._data = ta_avg self._timeaveraged.meta["MEANTSYS"] = np.mean(self._tsys) self._timeaveraged.meta["WTTSYS"] = tsys_avg # sq_weighted_avg(self._tsys, axis=0, weights=w) self._timeaveraged.meta["TSYS"] = self._timeaveraged.meta["WTTSYS"] self._timeaveraged.meta["EXPOSURE"] = np.sum(self.exposure) return self._timeaveraged