"""Load generic SDFITS files
- Not typically used directly. Sub-class for specific telescope SDFITS flavors.
"""
import warnings
from collections.abc import Sequence
from enum import Enum
from typing import TYPE_CHECKING
import astropy.units as u
import numpy as np
import pandas as pd
from astropy.io import fits
from astropy.io.fits import BinTableHDU, Column
from astropy.table import Table
from astropy.utils.masked import Masked
if TYPE_CHECKING:
from numpy.typing import ArrayLike
from ..log import logger
from ..spectra.spectrum import Spectrum
from ..util import select_from, uniq
from ..util.timers import Benchmark
# Apply monkey patch for fitsio Unicode handling (must be before fitsio usage)
from . import fitsio_unicode_patch, index_file # noqa: F401
from .lazyflag import LazyFlagArray, LazyFlagContainer
try:
import fitsio
HAS_FITSIO = True
except ImportError:
HAS_FITSIO = False
# Memory logging utility
def _mem_gb():
"""Return current process memory usage in GB"""
import os
import psutil
return psutil.Process(os.getpid()).memory_info().rss / (1024**3)
def _log_mem(msg):
"""Log memory usage with a message"""
pass
# logger.info(f"[MEM {_mem_gb():.2f} GB] {msg}")
[docs]
class FITSBackend(Enum):
"""Backend options for reading FITS data."""
ASTROPY = "astropy"
FITSIO = "fitsio"
[docs]
class SDFITSLoad:
"""
Generic Container for a bintable(s) from selected HDU(s) for a single SDFITS file.
For multiple SDFITS files, see :class:`~gbtfitsload.GBTFITSLoad`.
Parameters
----------
filename : str
input file name
source : str
target source to select from input file. Default: all sources
hdu : int or list
Header Data Unit to select from input file. Default: all HDUs
"""
def __init__(self, filename: str, source: str | None = None, hdu: int | list[int] | None = None, **kwargs):
kwargs_opts = {
"fix": False, # fix non-standard header elements
"verbose": False,
"index_file_threshold": 0,
"fitsbackend": None, # choose a default FITSBackend
}
kwargs_opts.update(kwargs)
fb = kwargs.get("fitsbackend")
if fb is None:
self._fits_backend = None
else:
self._fits_backend = FITSBackend(kwargs.get("fitsbackend"))
if kwargs_opts["verbose"]:
print(f"==SDFITSLoad {filename}")
# We cannot use this to get mmHg as it will disable all default astropy units!
# https://docs.astropy.org/en/stable/api/astropy.units.cds.enable.html#astropy.units.cds.enable
# u.cds.enable() # to get mmHg
self._filename = filename
self._bintable = []
self._index = None
self._index_source = None # Track whether index was loaded from "index_file" or "fits"
self._index_file_threshold = kwargs_opts.get("index_file_threshold", 100 * 1024 * 1024)
self._binheader = []
_log_mem(f"SDFITSLoad before fits.open({filename})")
self._hdu = fits.open(filename)
_log_mem(f"SDFITSLoad after fits.open({filename})")
self._header = self._hdu[0].header
self.load(hdu, **kwargs_opts)
_log_mem("SDFITSLoad after load()")
doindex = kwargs_opts.get("index", True)
doflag = kwargs_opts.get("flags", True) # for testing
if doindex:
self.create_index()
_log_mem("SDFITSLoad after create_index()")
# add default channel masks
# These are numpy masks where False is not flagged, True is flagged.
# There is one 2-D flag mask arraywith shape NROWSxNCHANNELS per bintable
self._flagmask = None
if doflag:
self._init_flags()
_log_mem("SDFITSLoad after _init_flags()")
def __del__(self):
# We need to ensure that any open HDUs are properly
# closed in order to avoid the ResourceWarning about
# unclosed file(s)
try:
self._hdu.close()
except Exception:
pass
def _init_flags(self):
"""Initialize the channel masks using lazy (sparse) flag arrays.
Uses :class:`~lazyflag.LazyFlagArray` to avoid allocating dense
``(nrows, nchan)`` boolean arrays upfront. For large files this
reduces memory from tens of GB to a few MB.
"""
self._flagmask = LazyFlagContainer(len(self._bintable))
self._additional_channel_mask = LazyFlagContainer(len(self._bintable))
# Check original file on disk for FLAGS column (not the in-memory
# bintable, which may have been mutated by _update_column).
if not hasattr(self, "_file_has_flags"):
self._file_has_flags = {}
if HAS_FITSIO:
with fitsio.FITS(self._filename) as f:
for i in range(len(self._bintable)):
hdu_index = i + 1
if hdu_index < len(f):
col_names = [c.upper() for c in f[hdu_index].get_colnames()]
self._file_has_flags[i] = "FLAGS" in col_names
else:
with fits.open(self._filename, memmap=True) as hdul:
for i in range(len(self._bintable)):
hdu_index = i + 1
if hdu_index < len(hdul):
col_names = [c.upper() for c in hdul[hdu_index].columns.names]
self._file_has_flags[i] = "FLAGS" in col_names
for i in range(len(self._flagmask)):
nc = self.nchan(i)
nr = self.nrows(i)
has_flags = self._file_has_flags.get(i, False)
logger.debug(f"_init_flags bintable {i}: {nr=} {nc=} has_flags={has_flags}")
self._flagmask[i] = LazyFlagArray(
nrows=nr,
nchan=nc,
filename=self._filename,
hdu_index=i + 1,
has_flags_column=has_flags,
)
self._additional_channel_mask[i] = LazyFlagArray(nrows=nr, nchan=nc)
[docs]
def info(self):
"""Return the `~astropy.HDUList` info()"""
return self._hdu.info()
@property
def columns(self):
"""The column names in the binary table, minus the DATA column
Returns
-------
~pandas.Index
The column names as a DataFrame Index
"""
# return a list instead?
return self._index.columns
@property
def bintable(self):
"""The list of bintables"""
return self._bintable
@property
def binheader(self):
"""The list of bintable headers"""
return self._binheader
@property
def filename(self):
"""The input SDFITS filename"""
return self._filename
@property
def total_rows(self):
"""Returns the total number of rows summed over all binary table HDUs"""
return sum(self._nrows)
[docs]
def index(self, hdu=None, bintable=None):
"""
Return The index table.
Parameters
----------
hdu : int or list
Header Data Unit to select from the index. Default: all HDUs
bintable : int
The index of the `bintable` attribute, None means all bintables
Returns
-------
index : ~pandas.DataFrame
The index of this SDFITS file
"""
df = self._index
if hdu is None and bintable is None:
return df
if hdu is not None:
df = df[df["HDU"] == hdu]
if bintable is not None:
df = df[df["BINTABLE"] == bintable]
return df
[docs]
def create_index(self, hdu: int | list[int] | None = None, skipindex=("DATA", "FLAGS"), force_fits: bool = False):
"""
Create the index of the SDFITS file.
This method will first attempt to load from a .index file (fast path).
If no .index file is found, it falls back to reading from the FITS file.
Parameters
----------
hdu : int or list
Header Data Unit to select from input file. Default: all HDUs
skipindex : list
List of str column names to not put in the index. The default are
the multidimensional columns DATA and FLAGS
force_fits : bool
If True, skip .index file and always load from FITS. Default: False.
Used internally for lazy loading when .index columns are insufficient.
"""
# Check FITS file size to decide whether to use .index file
import os
fits_size = os.path.getsize(self._filename)
fits_size_mb = fits_size / (1024 * 1024)
threshold_mb = self._index_file_threshold / (1024 * 1024)
if not HAS_FITSIO:
force_fits = True
# Determine whether to use .index file based on size threshold
use_index_file = fits_size >= self._index_file_threshold and not force_fits
if use_index_file:
logger.debug(
f"FITS file size: {fits_size_mb:.1f} MB >= {threshold_mb:.1f} MB threshold. "
f"Will attempt to use .index file for faster loading."
)
elif force_fits:
logger.debug(f"force_fits=True: Loading directly from FITS file ({fits_size_mb:.1f} MB)")
else:
logger.debug(
f"FITS file size: {fits_size_mb:.1f} MB < {threshold_mb:.1f} MB threshold. "
f"Loading directly from FITS file (no .index file needed for small files)."
)
# Try to load from .index file first (fast path for large files)
index_path = index_file.get_index_path(self._filename)
if index_path.exists() and use_index_file:
try:
logger.debug(f"Loading index from .index file: {index_path}")
with Benchmark(" parse .index file", logger=logger.debug):
self._index = index_file.parse_sdfits_index_file(index_path)
with Benchmark(" reconstruct BINTABLE column", logger=logger.debug):
# Reconstruct BINTABLE column (= HDU - 1)
if "HDU" in self._index.columns:
self._index["BINTABLE"] = self._index["HDU"] - 1
else:
logger.warning(".index file missing HDU column - cannot reconstruct BINTABLE")
with Benchmark(" adding primary HDU", logger=logger.debug):
self._add_primary_hdu()
# Log info about lazy loading
logger.debug(
"Index loaded from .index file (44/93 columns). "
"Missing columns (TCAL, WCS, calibration metadata, etc.) will be automatically loaded "
"from FITS file when first accessed."
)
logger.debug(f" Loaded {len(self._index)} rows, {len(self._index.columns)} columns from .index file")
self._index_source = "index_file"
return
except Exception as e:
logger.warning(f"Failed to load .index file: {e}. Falling back to FITS file.")
# Fall through to FITS reading
# Fall back to reading from FITS file
if hdu is not None:
ldu = list([hdu])
else:
ldu = range(1, len(self._hdu))
self._index = None
for i in ldu:
# Create a DataFrame without the data column.
# Use fitsio to read only the columns we need, avoiding loading DATA/FLAGS into memory
all_columns = self._hdu[i].columns.names
columns_to_read = [col for col in all_columns if col not in skipindex]
logger.debug(f" Reading {len(columns_to_read)} columns from HDU {i} via fitsio...")
if HAS_FITSIO:
with Benchmark(f" fitsio.read ({len(columns_to_read)} cols)", logger=logger.debug):
with fitsio.FITS(self._filename) as fits_file:
data = fits_file[i].read(columns=columns_to_read)
logger.debug(f" fitsio returned {len(data)} rows")
with Benchmark(" pd.DataFrame", logger=logger.debug):
df = pd.DataFrame(data)
else:
df = pd.DataFrame(np.lib.recfunctions.drop_fields(self._hdu[i].data, skipindex))
# Select columns that are strings, decode them and remove white spaces.
# The fitsio_unicode_patch handles latin-1 decoding automatically
# Include "string" to avoid pandas deprecation warning.
# See https://pandas.pydata.org/docs/user_guide/migration-3-strings.html#string-migration-select-dtypes
df_obj = df.select_dtypes(["object", "string"])
with Benchmark(f" string cleanup ({len(df_obj.columns)} cols)", logger=logger.debug):
# df[df_obj.columns] = df_obj.apply(lambda x: x.str.decode("utf-8").str.strip())
for col in df_obj.columns:
# FITS strings are NULL-padded, so truncate at NULL byte first
# Then remove any remaining control characters and strip whitespace
# If fitsio was imported, then the fitsio_unicode_patch was applied.
# If not we have to work around.
if HAS_FITSIO:
df[col] = df[col].str.split("\x00").str[0].str.strip()
else:
df[col] = df[col].str.decode("latin-1", errors="replace")
df[col] = df[col].str.split("\x00").str[0]
df[col] = df[col].str.strip()
with Benchmark(" add columns + concat", logger=logger.debug):
ones = np.ones(len(df.index), dtype=int)
# create columns to track HDU and BINTABLE numbers and original row index
df["HDU"] = i * ones
df["BINTABLE"] = (i - 1) * ones
df["ROW"] = np.arange(len(df))
if self._index is None:
self._index = df
else:
self._index = pd.concat([self._index, df], axis=0, ignore_index=True)
self._add_primary_hdu()
self._index_source = "fits"
def _add_primary_hdu(self):
"""
Add the columns to the index for header keywords that are not in primary header or not in the DATA column.
This will get handy things like SITELONG, SITELAT, TELESCOP, etc.
Returns
-------
None.
"""
# T* are in the binary table header
# NAXIS* have a different meaning in the primary hdu, we want the bintable values
# BITPIX, GCOUNT,PCOUNT,XTENSION are FITS reserved keywords
ignore = [
"TUNIT",
"TTYPE",
"TFORM",
"TFIELDS",
"NAXIS",
"COMMENT",
"GCOUNT",
"PCOUNT",
"XTENSION",
"BITPIX",
]
cols = {}
for h in self._hdu:
c = dict(filter(lambda item: not any(sub in item[0] for sub in ignore), h.header.items()))
cols.update(c)
logger.debug(f"_add_primary_hdu: ADDING COLUMNS {cols}")
for k, v in cols.items():
if k not in self._index.columns:
self._index[k] = v
elif self._index[k].iloc[0] != v:
warnings.warn(
f"Column {k} is defined in the primary header and in the binary table index, but their values do"
" not match. Will not update this column in the index.",
UserWarning,
stacklevel=2,
)
[docs]
def load(self, hdu=None, **kwargs):
"""
Load the bintable for given hdu.
Note mmHg and UTC are unrecognized units. mmHg is in astropy.units.cds but UTC is just wrong.
Parameters
----------
hdu : int or list
Header Data Unit to select from input file. Default: all HDUs
"""
self._bintable = []
self._binheader = []
self._nrows = []
self._ncols = []
if hdu is not None:
ldu = list([hdu])
else:
ldu = range(1, len(self._hdu))
for i in ldu:
j = i - 1
self._bintable.append(self._hdu[i])
self._binheader.append(self._hdu[i].header)
self._nrows.append(self._binheader[j]["NAXIS2"])
self._ncols.append(self._binheader[j]["TFIELDS"])
logger.debug(f"Loading HDU {i} from {self._filename}")
[docs]
def velocity_convention(self, veldef, velframe):
"""
Compute the velocity convention string use for velocity conversions,
given the VELDEF and VELFRAME values.
Return value must be a recognized string of `~specutils.Spectrum1D`, one of
{"doppler_relativistic", "doppler_optical", "doppler_radio"}
Sub-classes should implement, because different observatories use VELDEF and
VELFRAME inconsistently. This base class method hard-coded to return "doppler_radio."
Parameters
----------
veldef : str
The velocity definition string (`VELDEF` FITS keyword)
velframe : str
The velocity frame string (`VELFRAME` FITS keyword)
"""
return "doppler_radio"
[docs]
def udata(self, key, bintable=None):
"""
The unique list of values of a given header keyword
Parameters
----------
key : str
The keyword to retrieve
bintable : int
The index of the `bintable` attribute, None means all bintables
Returns
-------
udata : list
The unique set of values for the input keyword.
"""
if self._index is None:
raise ValueError("Can't retrieve keyword {key} because no index is present.")
if bintable is not None:
df = self._index[self._index["BINTABLE"] == bintable]
else:
df = self._index
return uniq(df[key])
[docs]
def ushow(self, key, bintable=None):
"""
Print the unique list of values of a given header keyword
Parameters
----------
key : str
The keyword to retrieve
bintable : int
The index of the `bintable` attribute, None means all bintables
"""
print(f"BINTABLE: {bintable} {key}: {self.udata(key, bintable)}")
[docs]
def naxis(self, naxis, bintable):
"""
The NAXISn value of the input bintable.
Parameters
----------
naxis : int
The NAXIS whose length is requested
bintable : int
The index of the `bintable` attribute
Returns
-------
naxis : the length of the NAXIS
"""
nax = f"NAXIS{naxis}"
return self._binheader[bintable][nax]
[docs]
def nintegrations(self, bintable, source=None):
"""
The number of integrations on a given source divided by the number of polarizations
Parameters
----------
bintable : int
The index of the `bintable` attribute
source: str
The source name (OBJECT keyword) or None for all sources. Default: None
Returns
-------
nintegrations : the number of integrations
"""
if source is not None:
df = select_from("OBJECT", source, self._index[bintable])
# nfeed = df["FEED"].nunique()
numsources = len(df)
# nint = numsources//(self.npol(bintable)*nfeed)
nint = numsources // self.npol(bintable)
else:
nint = self.nrows(bintable) // self.npol(bintable)
return nint
def _find_bintable_and_row(self, row):
"""Given a row number from a multi-bintable spanning index, return
the bintable and original row number
Parameters
----------
row : int
The record (row) index to retrieve
Returns:
tuple of ints (bintable, row)
"""
return (self._index.iloc[row]["BINTABLE"], self._index.iloc[row]["ROW"])
[docs]
def rawspectra(
self,
bintable: int,
setmask: bool = False,
rows: "ArrayLike | None" = None,
fits_backend: FITSBackend | None = None,
) -> np.ma.MaskedArray:
"""
Get the raw (unprocessed) spectra from the input bintable.
Parameters
----------
bintable : int
The index of the `bintable` attribute
setmask : bool
If True, set the data mask according to the current flags in the `_flagmask` attribute.
If False, set the data mask to False.
rows : array-like or None
If provided, load only these rows. If None, load all rows.
fits_backend : FITSBackend or None
Backend to use for reading data. Options:
- None (default): auto-select (fitsio when rows specified, astropy otherwise)
- FITSBackend.ASTROPY: force astropy (memory-mapped, efficient for full loads)
- FITSBackend.FITSIO: force fitsio (efficient for selective row loading)
Returns
-------
rawspectra : ~numpy.ma.MaskedArray
The DATA column of the input bintable, masked according to `setmask`
"""
if self._fits_backend is not None: # performance testing only!
fits_backend = self._fits_backend
# Auto-select backend if not specified
if fits_backend is None:
# Check if data is already loaded in memory via astropy
data_is_cached = getattr(self._bintable[bintable], "_data_loaded", False)
if data_is_cached:
# Data is in memory - use astropy to slice cached data (no disk I/O)
fits_backend = FITSBackend.ASTROPY
elif rows is not None and HAS_FITSIO:
# Data not cached and specific rows requested - use fitsio for efficient selective loading
fits_backend = FITSBackend.FITSIO
else:
# Full load without cache - use astropy (memory-mapped)
fits_backend = FITSBackend.ASTROPY
if fits_backend == FITSBackend.FITSIO and HAS_FITSIO:
return self._rawspectra_fitsio(bintable, rows=rows, setmask=setmask)
else:
return self._rawspectra_astropy(bintable, rows=rows, setmask=setmask)
def _rawspectra_astropy(
self, bintable: int, rows: "ArrayLike | None" = None, setmask: bool = False
) -> np.ma.MaskedArray:
"""
Load DATA column via astropy (memory-mapped, efficient for full array access).
Parameters
----------
bintable : int
The index of the `bintable` attribute
rows : array-like or None
If provided, slice these rows after loading. If None, load all rows.
setmask : bool
If True, set the data mask according to flags.
Returns
-------
np.ma.MaskedArray
The raw spectra data
"""
nrows_req = len(rows) if rows is not None else self.nrows(bintable)
_log_mem(f"_rawspectra_astropy: loading DATA via astropy, {nrows_req} rows requested")
data = self._bintable[bintable].data[:]["DATA"]
data_size_mb = data.nbytes / (1024**2)
_log_mem(f"_rawspectra_astropy: loaded {data.shape}, {data_size_mb:.1f} MB")
if rows is not None:
rows_array = np.asarray(rows)
if len(rows_array) == 0:
nchan = self.nchan(bintable)
return np.ma.MaskedArray(np.empty((0, nchan)), mask=False)
data = data[rows_array]
if setmask and self._flagmask is not None:
mask = self._flagmask[bintable][rows_array]
else:
mask = False
elif setmask and self._flagmask is not None:
mask = self._flagmask[bintable]
else:
mask = False
return np.ma.MaskedArray(data, mask=mask)
def _rawspectra_fitsio(
self, bintable: int, rows: "ArrayLike | None" = None, setmask: bool = False
) -> np.ma.MaskedArray:
"""
Load DATA column via fitsio (efficient for selective row loading).
Parameters
----------
bintable : int
The index of the `bintable` attribute
rows : array-like or None
If provided, load only these rows. If None, load all rows.
setmask : bool
If True, set the data mask according to flags.
Returns
-------
np.ma.MaskedArray
The raw spectra data
"""
# Get HDU index - bintables start at HDU 1 (HDU 0 is primary)
hdu_index = bintable + 1
nrows_req = len(rows) if rows is not None else self.nrows(bintable)
_log_mem(f"_rawspectra_fitsio: loading DATA via fitsio, {nrows_req} rows requested")
with fitsio.FITS(self._filename) as fits_file:
if rows is not None:
rows_array = np.asarray(rows)
if len(rows_array) == 0:
nchan = self.nchan(bintable)
return np.ma.MaskedArray(np.empty((0, nchan)), mask=False)
# fitsio can read specific rows efficiently
data = fits_file[hdu_index].read(columns=["DATA"], rows=rows_array)["DATA"]
_log_mem(f"_rawspectra_fitsio: loaded {data.shape}, {data.nbytes / (1024**2):.1f} MB")
if setmask and self._flagmask is not None:
mask = self._flagmask[bintable][rows_array]
else:
mask = False
else:
data = fits_file[hdu_index].read(columns=["DATA"])["DATA"]
_log_mem(f"_rawspectra_fitsio: loaded ALL rows {data.shape}, {data.nbytes / (1024**2):.1f} MB")
if setmask and self._flagmask is not None:
mask = self._flagmask[bintable]
else:
mask = False
return np.ma.MaskedArray(data, mask=mask)
[docs]
def load_full_rows(self, rows: np.ndarray, bintable: int = 0, exclude_data: bool = True) -> pd.DataFrame:
"""
Load full rows (all columns) for specific row indices from FITS file.
This enables lazy loading when row data is needed - loads the entire row
once rather than column by column.
Parameters
----------
rows : np.ndarray
Row indices to load
bintable : int
The bintable index (default 0)
exclude_data : bool
If True (default), exclude the DATA column (it's huge and loaded separately)
Returns
-------
pd.DataFrame
DataFrame with all columns for the requested rows
"""
if len(rows) == 0:
return pd.DataFrame()
hdu_index = bintable + 1
rows_array = np.asarray(rows)
_log_mem(f"load_full_rows: loading {len(rows_array)} rows from bintable {bintable}")
with fitsio.FITS(self._filename) as fits_file:
available_cols = fits_file[hdu_index].get_colnames()
# Exclude DATA column (huge, loaded separately) and other problematic columns
cols_to_load = [c for c in available_cols if c.upper() not in ("DATA", "FLAGS")]
if not exclude_data:
cols_to_load = available_cols
data = fits_file[hdu_index].read(columns=cols_to_load, rows=rows_array)
# Convert to DataFrame
result = {}
for col in data.dtype.names:
val = data[col]
# Handle string columns - decode bytes and strip whitespace
if val.dtype.kind in ("S", "U"):
if val.dtype.kind == "S":
val = np.char.decode(val, "utf-8")
val = np.char.strip(val)
result[col] = val
df = pd.DataFrame(result)
# Add DATE column from DATE-OBS if not present
# The Scan code expects DATE for aperture efficiency calculation,
# but DATE-OBS is the actual observation date per row
if "DATE" not in df.columns and "DATE-OBS" in df.columns:
# Extract just the date part (YYYY-MM-DD) from DATE-OBS
df["DATE"] = df["DATE-OBS"].str[:10]
return df
[docs]
def rawspectrum(self, i, bintable=0, setmask=False):
"""
Get a single raw (unprocessed) spectrum from the input bintable.
Parameters
----------
i : int
The row index to retrieve.
bintable : int or None
The index of the `bintable` attribute. If None, the underlying bintable is computed from i
setmask : bool
If True, set the data mask according to the current flags in the `_flagmask` attribute.
Returns
-------
rawspectrum : ~numpy.ma.MaskedArray
The i-th row of DATA column of the input bintable, masked according to `setmask`
"""
if bintable is None:
(bt, row) = self._find_bintable_and_row(i)
else:
bt = bintable
row = i
# Use rawspectra with a single row to avoid triggering astropy's data cache
# This ensures fitsio is used for partial row loading
return self.rawspectra(bt, setmask=setmask, rows=[row])[0]
[docs]
def getrow(self, i, bintable=0):
"""
Get a :class:`~astropy.io.fits.fitsrec.FITS_record` from the input bintable
Parameters
----------
i : int
The record (row) index to retrieve
bintable : int
The index of the `bintable` attribute
Returns
-------
row : :class:`~astropy.io.fits.fitsrec.FITS_record`
The i-th record of the input bintable
"""
return self._bintable[bintable].data[i]
[docs]
def getspec(self, i, bintable=0, observer_location=None, setmask=False):
"""
Get a row (record) as a Spectrum
Parameters
----------
i : int
The record (row) index to retrieve
bintable : int, optional
The index of the `bintable` attribute. default is 0.
observer_location : `~astropy.coordinates.EarthLocation`
Location of the observatory. See `~dysh.coordinates.Observatory`.
This will be transformed to `~astropy.coordinates.ITRS` using the time of observation DATE-OBS or MJD-OBS in
the SDFITS header. The default is None.
setmask : bool
If True, set the data mask according to the current flags in the `_flagmask` attribute.
Returns
-------
s : `~dysh.spectra.spectrum.Spectrum`
The Spectrum object representing the data row.
"""
df = self.index(bintable=bintable)
meta = df.iloc[i].dropna().to_dict()
data = self.rawspectrum(i, bintable, setmask=setmask)
meta["NAXIS1"] = len(data)
if "CUNIT1" not in meta:
# @todo this is in gbtfits.hdu[0].header['TUNIT11'] or whereever CRVAL1 is stored
# for good measure CTYPE1 and CDELT1 also store this unit, and better be the same
meta["CUNIT1"] = "Hz"
logger.debug("Fixing CUNIT1 to Hz")
meta["CUNIT2"] = "deg" # is this always true?
meta["CUNIT3"] = "deg" # is this always true?
restfrq = meta["RESTFREQ"]
rfq = restfrq * u.Unit(meta["CUNIT1"])
restfreq = rfq.to("Hz").value
meta["RESTFRQ"] = restfreq # WCS wants no E
# @todo could we safely store it in meta['BUNIT']
# for now, loop over the binheader keywords to find the matching TUNITxx that belongs to TTYPExx='DATA'
# if BUNIT was also found, a comparison is made, but TUNIT wins
# NOTE: sdfits file from sdf.write() currently have
if "BUNIT" in meta:
bunit = meta["BUNIT"]
logger.debug(f"BUNIT {bunit} already set")
else:
bunit = None
h = self.binheader[0]
for k, v, _c in h.cards:
if k == "BUNIT":
bunit = v
ukey = None
for k, v, _c in h.cards: # loop over the (key,val,comment) for all cards in the header
if v == "DATA":
ukey = "TUNIT" + k[5:]
break
if ukey is not None: # ukey should almost never be "None" in standard SDFITS
for k, v, _c in h.cards:
if k == ukey:
if bunit != v:
logger.debug(f"Found BUNIT={bunit}, now finding {ukey}={v}, using the latter")
if len(v) == 0:
logger.debug("Blank BUNIT; overriding unit as 'ct'")
bunit = u.ct
else:
bunit = v
break
if bunit is None:
logger.debug("no bunit yet")
if bunit is not None:
bunit = u.Unit(bunit)
else:
bunit = u.ct
logger.debug(f"BUNIT = {bunit}")
# use from_unmasked so we don't get the astropy INFO level message about replacing a mask
# (doesn't work -- the INFO message comes from the Spectrum1D constructor)
masked_data = Masked.from_unmasked(data.data, data.mask) * bunit
s = Spectrum.make_spectrum(masked_data, meta, observer_location=observer_location)
return s
[docs]
def nrows(self, bintable: int = 0) -> int:
"""
The number of rows of the input bintable
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
nrows : int
Number of rows, i.e., the length of the input bintable
"""
return self._nrows[bintable]
[docs]
def ncols(self, bintable: int = 0) -> int:
"""
The number of columns of the input bintable
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
ncols : int
Number of columns, i.e., the width of the input bintable
"""
return self._ncols[bintable]
[docs]
def nchan(self, bintable: int = 0) -> int:
"""
The number of channels per row of the input bintable. Assumes all rows have same length.
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
nchan : int
Number channels in the first spectrum of the input bintable
"""
# Get nchan from FITS metadata without loading data
# This avoids triggering astropy's data cache which would cause
# subsequent rawspectra() calls to load the entire DATA column
if HAS_FITSIO:
try:
with fitsio.FITS(self._filename) as fits_file:
hdu_index = bintable + 1
dtype_info = fits_file[hdu_index].get_rec_dtype()[0]
# DATA column dtype is like ('DATA', '>f4', (16384,))
data_shape = dtype_info.fields["DATA"][0].shape
if data_shape:
return data_shape[0]
except (KeyError, IndexError, AttributeError):
pass
# Fallback: load single row (this triggers astropy cache, but at least it works)
return np.shape(self.rawspectrum(0, bintable))[0]
[docs]
def npol(self, bintable: int = 0) -> int:
"""
The number of polarizations present in the input bintable.
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
npol: int
Number of polarizations as given by `CRVAL4` FITS header keyword.
"""
return len(self.udata(key="CRVAL4", bintable=bintable))
[docs]
def nsources(self, bintable: int = 0) -> int:
"""
The number of sources present in the input bintable.
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
nsources: int
Number of sources as given by `OBJECT` FITS header keyword.
"""
return self.udata(bintable=bintable, key="OBJECT")
[docs]
def nscans(self, bintable: int = 0) -> int:
"""
The number of scans present in the input bintable.
Parameters
----------
bintable : int
The index of the `bintable` attribute
Returns
-------
nscans: int
Number of scans as given by `SCAN` FITS header keyword.
"""
return self.udata(key="SCAN", bintable=bintable)
def _summary(self, bintable: int = 0):
j = bintable
nrows = self.naxis(bintable=j, naxis=2)
nflds = self._binheader[j]["TFIELDS"]
restfreq = np.unique(self.index(bintable=j)["RESTFREQ"]) / 1.0e9
print(f"HDU {j + 1}")
print(f"BINTABLE: {self._nrows[j]} rows x {nflds} cols with {self.nchan(j)} chans")
print(f"Selected {self._nrows[j]}/{nrows} rows")
print("Sources: ", self.nsources(j))
print("RESTFREQ:", restfreq, "GHz")
print("Scans: ", self.nscans(j))
print("Npol: ", self.npol(j))
print("Nint: ", self.nintegrations(j))
[docs]
def summary(self):
"""Print a summary of each record of the data"""
print(f"File: {self._filename}")
for i in range(len(self._bintable)):
print("i=", i)
self._summary(i)
def _column_mask(self, column_dict):
"""
Generate a boolean array given a dictionary.
The keys in the dictionary are the column names and the values the column values.
It will combine multiple dictionary items using `numpy.logical_and`.
Parameters
----------
column_dict : dict
Dictionary with column names and column values as keys and values.
Returns
-------
_mask : `numpy.array`
Array of booleans with True where the column equals column value.
"""
_mask = np.zeros((len(column_dict), len(self[next(iter(column_dict.keys()))])), dtype=bool)
for i, (k, v) in enumerate(column_dict.items()):
_mask[i, :] = self[k] == v
if _mask.shape[0] > 1:
_mask = np.logical_and.reduce(_mask)
return _mask.ravel()
def __repr__(self):
return str(self._filename)
def __str__(self):
return str(self._filename)
def _bintable_from_rows(self, rows, bintable):
"""
Extract a bintable from an existing
bintable in this SDFITSLoad object
Parameters
----------
rows: int or list-like
Range of rows in the bintable(s) to write out. e.g. 0, [14,25,32].
Rows will be sorted again here, just in case this was not done.
bintable : int
The index of the `bintable` attribute
Returns
-------
outbintable : ~astropy.iofits.BinTableHDU
The binary table HDU created containing the selected rows
"""
# ensure it is a list if int was given
if type(rows) == int: # noqa: E721
rows = [rows]
# ensure rows are sorted
rows.sort()
header = self._bintable[bintable].header
data = self._bintable[bintable].data[rows]
outbintable = BinTableHDU(data, header)
return outbintable
[docs]
def write(
self,
fileobj,
rows=None,
bintable=None,
flags=True,
output_verify="exception",
overwrite=False,
checksum=False,
):
"""
Write the `SDFITSLoad` to a new file, potentially sub-selecting rows or bintables.
Parameters
----------
fileobj : str, file-like or `pathlib.Path`
File to write to. If a file object, must be opened in a
writeable mode.
rows: int or list-like
Range of rows in the bintable(s) to write out. e.g. 0, [14,25,32]. Default: None, meaning all rows
Note: Currently `rows`, if given, must be contained in a single bintable and bintable must be given
bintable : int
The index of the `bintable` attribute or None for all bintables. Default: None
flags: bool, optional
If True, write the applied flags to a `FLAGS` column in the binary table
output_verify : str
Output verification option. Must be one of ``"fix"``,
``"silentfix"``, ``"ignore"``, ``"warn"``, or
``"exception"``. May also be any combination of ``"fix"`` or
``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception"
(e.g. ``"fix+warn"``). See https://docs.astropy.org/en/latest/io/fits/api/verification.html for more info
overwrite : bool, optional
If ``True``, overwrite the output file if it exists. Raises an
``OSError`` if ``False`` and the output file exists. Default is
``False``.
checksum : bool
When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards
to the headers of all HDU's written to the file.
"""
if bintable is None:
if rows is None:
# write out everything
self._hdu.writeto(fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
else:
raise ValueError("You must specify bintable if you specify rows")
elif rows is None:
# bin table index counts from 0 and starts at the 2nd HDU (hdu index 1), so add 2
self._hdu[0 : bintable + 2]._hdu.writeto(
fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum
)
else:
hdu0 = self._hdu[0].copy()
# need to get imports correct first
# hdu0.header["DYSHVER"] = ('dysh '+version(), "This file was created by dysh")
outhdu = fits.HDUList(hdu0)
outbintable = self._bintable_from_rows(rows, bintable)
outhdu.append(outbintable)
outhdu.writeto(fileobj, output_verify=output_verify, overwrite=overwrite, checksum=checksum)
outhdu.close()
[docs]
def rename_column(self, oldname, newname):
"""
Rename a column in both index table and any binary tables in this SDFITSLoad
Parameters
----------
oldname : str
The SDFITS binary table column to rename, e.g. 'SITELAT', case insensitive
newname : str
The new name for the SDFITS binary table column, case insensitive
All names will be uppercased before rename.
Returns
-------
None.
"""
ou = oldname.upper()
nu = newname.upper()
self._index.rename(columns={ou: nu}, inplace=True) # noqa: PD002
self._rename_binary_table_column(ou, nu)
[docs]
def delete_column(self, column):
"""
Delete one or more columns from both the index table and any binary tables in this SDFITSLoad
*Warning: cannot be undone*
Parameters
----------
column : str or list-like
The column name(s) to delete.
Returns
-------
None.
"""
warnings.warn(f"Deleting column {column}. Cannot be undone!") # noqa: B028
if isinstance(column, str):
cu = column.upper()
if cu == "DATA":
raise Exception("You can't delete the DATA column. It's for your own good.")
self._index.drop(columns=cu, inplace=True) # noqa: PD002
self._delete_binary_table_column(column)
elif isinstance(column, (Sequence, np.ndarray)):
cu = [c.upper() for c in column]
self._index.drop(columns=cu, inplace=True) # noqa: PD002
for c in column:
self._delete_binary_table_column(c)
def _delete_binary_table_column(self, column, bintable=None):
"""
Delete a column in one or more binary tables of this SDFITSLoad
*Warning: cannot be undone*
Parameters
----------
oldname : str
The SDFITS binary table column to delete, e.g. 'SITELAT', case insensitive
bintable : int, optional
Index of the binary table on which to operate, or None for all binary tables. The default is None.
All names will be uppercased before rename.
Returns
-------
None.
"""
cu = column.upper()
if cu == "DATA":
raise Exception("You can't delete the DATA column. It's a bad idea.")
if bintable is None:
for b in self._bintable:
b.columns.del_col(cu)
else:
self._bintable[bintable].columns.del_col(cu)
def _rename_binary_table_column(self, oldname, newname, bintable=None):
"""
Rename a column in one or more binary tables of this SDFITSLoad
Parameters
----------
oldname : str
The SDFITS binary table column to rename, e.g. 'SITELAT', case insensitive
newname : str
The new name for the SDFITS binary table column, case insensitive
bintable : int, optional
Index of the binary table on which to operate, or None for all binary tables. The default is None.
All names will be uppercased before rename.
Returns
-------
None.
"""
ou = oldname.upper()
nu = newname.upper()
if bintable is None:
for b in self._bintable:
b.columns.change_attrib(ou, "name", nu)
else:
b = self._bintable[bintable]
b.columns.change_attrib(ou, "name", nu)
def _add_binary_table_column(self, name, value, bintable=None):
"""
Add a new column to the SDFITS binary table(s). Length of value must match
number of rows in binary table `bintable`, or sum of all rows if `bintable=None`.
Parameters
----------
name : str
column name to add
value : list-like or ~`astropy.io.fits.Column`
The column values
bintable : int, optional
Index of the binary table on which to operate, or None for all binary tables. The default is None.
Returns
-------
None.
"""
# If we pass the data through a astropy Table first, then the conversion of
# numpy array dtype to FITS format string (e.g, '12A') gets done automatically and correctly.
is_col = isinstance(value, Column)
if is_col:
lenv = len(value.array)
else:
lenv = len(value)
if bintable is not None:
if lenv != self.nrows(bintable):
raise ValueError(
f"Length of values array ({len(value)}) for column {name} and total number of rows"
f" ({self.nrows(bintable)}) aren't equal."
)
if is_col:
self._bintable[bintable].columns.add_col(value)
else:
t1 = Table(names=[name], data=[value])
t = BinTableHDU(t1)
self._bintable[bintable].columns.add_col(t.columns[name])
else:
if lenv != self.total_rows:
raise ValueError(
f"Length of values array ({len(value)}) for column {name} and total number of rows"
f" ({self.total_rows}) aren't equal."
)
# Split values up by length of the individual binary tables
start = 0
for i in range(len(self._bintable)):
n = self._nrows[i]
if isinstance(value, Column):
cut = Column(
name=value.name,
format=value.format,
dim=value.dim,
array=value.array[start : start + n],
)
self._bintable[i].columns.add_col(cut)
else:
t = BinTableHDU(Table(names=[name], data=[value[start : start + n]]))
self._bintable[i].columns.add_col(t.columns[name])
start = start + n
def _update_column_added(self, bintable, coldefs):
self.bintable[bintable].data = fits.fitsrec.FITS_rec.from_columns(
columns=coldefs,
nrows=self.bintable[bintable]._nrows,
fill=False,
character_as_bytes=self.bintable[bintable]._character_as_bytes,
)
def _update_binary_table_column(self, column_dict, bintable=0):
"""
Change or add one or more columns to a SDFITS binary table.
Parameters
----------
column_dict : dict
Dictionary with column names as keys and column values. Values can be numpy arrays or `~astropy.io.fits.Column`
bintable : int
Index of the binary table to be updated.
"""
b = bintable
for k, v in column_dict.items():
is_col = isinstance(v, Column)
if is_col:
value = v.array
else:
value = v
# self._bintable[i].data is an astropy.io.fits.fitsrec.FITS_rec, with length equal
# to number of rows
is_str = isinstance(value, str)
if is_str or not isinstance(value, np.ndarray):
value = np.full(self._bintable[b]._nrows, value)
# NOTE: if k is from the primary header and not a data column
# then this test fails, and we will ADD a new binary data column.
# So the primary header and data column could be inconsistent.
# It actually happens in
# test_gbtfitsload.py:test_set_item for SITELONG=[-42.21]*g.total_rows
# Indeed there is a warning in _add_primary_hdu about this.
# The behavior is intended,the column takes precedence over
# the primary hdu, but we should document this publicly.
if k in self._bintable[b].data.names:
# have to assigned directly to array because somewhere
# deep in astropy a ref is kept to the original coldefs data which
# gets recopied if a column is added.
self._bintable[b].data.columns[k].array = value
self._bintable[b].data[k] = value
# otherwise we need to add rather than replace/update
else:
self._add_binary_table_column(k, value, b)
self._bintable[b].update_header()
def _update_column(self, column_dict, bintable=None):
"""Change or add one or more columns to the SDFITS binary table(s)
Parameters
----------
column_dict : dict
Dictionary with column names as keys and column values. Values can be numpy arrays or `~astropy.io.fits.Column`
bintable : int
Index of the binary table to be updated.
If None, it will try to update all binary tables.
"""
# if there is only one bintable, it is straightforward.
# BinTableHDU interface will take care of the types and data lengths matching.
# It will even allow the case where len(values) == 1 to replace all column values with
# a single value.
if bintable is None:
if len(self._bintable) == 1:
self._update_binary_table_column(column_dict, bintable=0)
else:
start = 0
for k, v in column_dict.items():
is_col = isinstance(v, Column)
if is_col:
value = v.array
else:
value = v
is_str = isinstance(value, str)
if not is_str and isinstance(value, (Sequence, np.ndarray)) and len(value) != self.total_rows:
raise ValueError(
f"Length of values array ({len(value)}) for column {k} and total number of rows ({self.total_rows})"
" aren't equal."
)
# Split values up by length of the individual binary tables
for j in range(len(self._bintable)):
b = self._bintable[j]
if k in b.data.names:
n = len(b.data)
if is_str or not isinstance(value, (Sequence, np.ndarray)):
b.data.columns[k].array = value
b.data[k] = value
else:
b.data.columns[k].array = value[start : start + n]
b.data[k] = value[start : start + n]
start = start + n
else:
if not is_str and isinstance(value, Sequence):
v1 = np.array(value)
else:
v1 = value
n = len(b.data)
if is_str or not isinstance(value, (Sequence, np.ndarray)):
# we have to make an array from value if the user
# did a single assignment for a column, e.g. sdf["TCAL"] = 3.
# Need a new variable here or multiple loops keep expanding value
v1 = np.full(n, value)
else:
v1 = np.array(value[start : start + n])
start = start + n
self._add_binary_table_column(k, v1, j)
self._bintable[j].update_header()
else:
self._update_binary_table_column(column_dict, bintable=bintable)
def __getitem__(self, items):
# items can be a single string or a list of strings.
# Want case insensitivity
if isinstance(items, str):
items = items.upper()
elif isinstance(items, (Sequence, np.ndarray)):
items = [i.upper() for i in items]
else:
raise KeyError(f"Invalid key {items}. Keys must be str or list of str")
# Deal with columns that have multiple dimensions
for multikey in ["DATA", "FLAGS"]:
if multikey in items:
if not np.all(
[b.data[multikey].shape == self._bintable[0].data[multikey].shape for b in self._bintable]
):
raise ValueError(
f"{multikey} column has different shapes (number of channels) in the binary tables of this SDFITSLoad."
"So they cannot be returned as a single array."
f"They can be accessed via the _bintable.data['{multikey}'] attribute."
)
if len(self._bintable) == 1:
return self._bintable[0].data[multikey]
else:
return np.vstack([b.data[multikey] for b in self._bintable])
# Lazy loading: if column(s) missing and loaded from .index file, trigger full load
if isinstance(items, str):
missing_keys = [items] if items not in self._index.columns else []
else:
missing_keys = [k for k in items if k not in self._index.columns]
if missing_keys and self._index_source == "index_file":
logger.debug(
f"Column(s) {missing_keys} not available in .index file. "
f"Triggering full FITS index load to access all columns..."
)
# Force full reload from FITS (skip .index file)
self._index_source = None # Reset to prevent infinite loop
self.create_index(force_fits=True)
logger.debug(f" ✓ Full index loaded: {len(self._index.columns)} columns now available")
return self._index[items]
def __setitem__(self, items, values):
if isinstance(items, str):
items = items.upper()
d = {items: values}
# we won't support multiple keys for setting right now.
# ultimately it could be done with recursive call to __setitem__
# for each key/val pair
# elif isinstance(items, (Sequence, np.ndarray)):
# items = [i.upper() for i in items]
# d = {i: values[i] for i in items}
else:
raise KeyError(f"Invalid key {items}. Keys must be str")
if "DATA" in items:
warnings.warn("Beware: you are changing the DATA column.") # noqa: B028
# warn if changing an existing column
if isinstance(items, str):
iset = set([items])
else:
iset = set(items)
col_exists = len(set(self.columns).intersection(iset)) > 0
if col_exists and "DATA" not in items:
warnings.warn(f"Changing an existing SDFITS column {items}") # noqa: B028
try:
self._update_column(d)
except Exception as e:
raise Exception(f"Could not update SDFITS binary table for {items} because {e}") # noqa: B904
# only update the index if the binary table could not be updated.
# DATA is not in the index.
if "DATA" not in items:
self._index[items] = values