import datetime
import os
import warnings
from typing import Any, Callable, Literal, ParamSpec, TypeVar
import astropy.wcs
import numpy as np
import photutils.aperture
import PIL.Image
import scipy.ndimage
from astropy.io import fits
from astropy.utils.exceptions import AstropyWarning
from . import common, utils
from .base import _cache_clearable_result, _cache_stable_result
from .body_xy import BodyXY, MapKwargs, Unpack
from .progress import SaveMapProgressHookCLI, SaveNavProgressHookCLI, progress_decorator
T = TypeVar('T')
S = TypeVar('S')
P = ParamSpec('P')
[docs]class Observation(BodyXY):
"""
Class representing an actual observation of an astronomical body at a specific time.
This is a subclass of :class:`BodyXY`, with additional methods to interact with the
observed data, such as by saving a FITS file containing calculated backplane data.
All methods described in :class:`BodyXY`, :class:`Body` and :class:`SpiceBase` are
therefore available in instances of this class.
This class can be created by either providing a `path` to a data file to be loaded,
or by directly providing the `data` itself (and optionally a FITS `header`). The
`nx` and `ny` values for :class:`BodyXY` will automatically be calculated from the
input data.
If the input data is a FITS file (or a `header` is specified), then this class will
attempt to generate appropriate parameters (e.g. `target`, `utc`) by using the
values in the header. This allows an instance of this class to be created with a
single argument specifying the `path` to the FITS file e.g.
`Observation('path/to/file.fits')`. Manually specified parameters will take
precedence, so `Observation('path/to/file.fits', target='JUPITER')` will have
Jupiter as a target, regardless of any values saying otherwise in the FITS header.
If a FITS header is not provided (e.g. if the input path corresponds to an image
file), then at least the `target` and `utc` parameters need to be specified.
When an `Observation` object is created, the disc parameters
`(x0, y0, r0, rotation)` initialised to the most useful values possible:
1. If the input file has previously been fit by PlanetMapper, the previous parameter
values saved in the FITS header are loaded using :func:`disc_from_header`.
2. Otherwise, if there is WCS information in the FITS header, this is loaded with
:func:`disc_from_wcs`.
3. Finally, if there is no useful information in the FITS header (or no header is
provided), the disc parameters are initialised using :func:`centre_disc`.
Args:
path: Path to data file to load. If this is `None` then `data` must be specified
instead. Any user (`~`) and shell variables (e.g. `$var`) in the path are
automatically expanded if possible.
data: Array containing observation data to use instead of loading the data from
`path`. This should only be provided if `path` is None.
header: FITS header which corresponds to the provided `data`. This is optional
and should only be provided if `path` is None.
target: Name of target body, passed to :class:`Body`. If this is unspecified,
then the target will be derived from the values in the FITS header.
utc: Time of observation, passed to :class:`Body`. If this is unspecified, then
the time will be derived from the values in the FITS header.
**kwargs: Additional parameters are passed to :class:`BodyXY`. These can be used
to specify additional parameters such as`observer`. The image size is
automatically determined from the data, so passing `nx`, `ny` or `sz` as
arguments when creating an `Observation` object will raise a `TypeError`.
"""
FITS_FILE_EXTENSIONS = ('.fits', '.fits.gz')
"""
File extensions which will be read as FITS files. All other file extensions will be
assumed to be images.
"""
FITS_KEYWORD = 'PLANMAP'
"""FITS keyword used in metadata added to header of output FITS files."""
def __init__(
self,
path: str | None = None,
*,
data: np.ndarray | None = None,
header: fits.Header | None = None,
**kwargs,
) -> None:
for k in ('nx', 'ny', 'sz'):
if k in kwargs:
raise TypeError(f'Cannot set {k} for Observation objects')
self._path_arg = path
self._data_arg = data
self._header_arg = header
# Add docstrings
self.path: str | None
"""Path of input data file, or `None` if no file was provided."""
self.data: np.ndarray
"""Observed data."""
self.header: fits.Header
"""
FITS header containing data about the observation. If this is not provided, then
a basic header will be produced containing data derived from the `target` and
`utc` parameters.
"""
if path is not None:
path = os.path.expandvars(os.path.expanduser(path))
self.path = path
self.header = None # type: ignore
# TODO add warning about header being modified in place? Or copy header?
if self.path is None:
if data is None:
raise ValueError('Either `path` or `data` must be provided')
self.data = data
if header is not None:
self.header = header
else:
# TODO should we have a way to provide both path and header for e.g. JPEGS?
if data is not None:
raise ValueError('`path` and `data` are mutually exclusive')
if header is not None:
raise ValueError('`path` and `header` are mutually exclusive')
self._load_data_from_path()
# TODO validate/standardise shape of data here (cube etc.)
self.data = np.asarray(self.data)
if len(self.data.shape) == 2:
# Turn data into cube for consistency
self.data = self.data[np.newaxis, ...]
if self.header is not None:
# use values from header to fill in arguments (e.g. target) which aren't
# specified by the user
self._add_kw_from_header(kwargs, self.header)
super().__init__(nx=self.data.shape[-1], ny=self.data.shape[-2], **kwargs)
if self.header is None:
self.header = fits.Header(
{
'OBJECT': self.target,
'DATE-OBS': self.utc,
}
)
try:
self.disc_from_header()
except ValueError:
try:
self.disc_from_wcs(suppress_warnings=True)
except ValueError:
self.centre_disc()
def __repr__(self) -> str:
return f'Observation({self.path!r})'
[docs] def to_body_xy(self) -> BodyXY:
"""
Create a :class:`BodyXY` object with the same parameters and data as this
observation.
Returns:
:class:`BodyXY` object with the same disc parameters as this
:class:`Observation` instance.
"""
new = BodyXY(**BodyXY._get_kwargs(self))
BodyXY._copy_options_to_other(self, new)
return new
def _get_equality_tuple(self) -> tuple:
# Use nan_to_num to convert NaNs to zeros, so that NaNs in the data don't
# cause the equality check to fail. Then use isnan to compare the NaN masks
# to ensure e.g. [1, NaN] != [1, 0]. Compare .data to get booleans rather
# than numpy's array of booleans.
return (
self.path,
np.nan_to_num(self.data).data,
np.isnan(self.data).data,
self.header,
super()._get_equality_tuple(),
)
def _get_kwargs(self) -> dict[str, Any]:
kw = super()._get_kwargs() | dict(
path=self._path_arg,
data=self._data_arg,
header=self._header_arg,
)
kw.pop('nx')
kw.pop('ny')
return kw
def _load_data_from_path(self):
assert self.path is not None
if any(self.path.endswith(ext) for ext in self.FITS_FILE_EXTENSIONS):
self._load_fits_data()
else:
self._load_image_data()
def _load_fits_data(self):
assert self.path is not None
with fits.open(self.path, memmap=False) as hdul:
for idx, hdu in enumerate(hdul):
if hdu.data is not None:
data = hdu.data
if idx:
# pylint: disable-next=no-member
header = hdul[0].header.copy() # type: ignore
header.update(hdu.header.copy())
else:
header = hdu.header.copy()
break
else:
raise ValueError('No data found in provided FITS file')
if len(data.shape) == 2:
# If greyscale image, add another dimension so that it is an image cube with
# a single frame. This will ensure that data will always be a cube.
data = np.array([data])
self.data = data
self.header = header
def _load_image_data(self):
assert self.path is not None
image = np.flipud(np.array(PIL.Image.open(self.path)))
if len(image.shape) == 2:
# If greyscale image, add another dimension so that it is an image cube with
# a single frame. This will ensure that data will always be a cube.
image = np.array([image])
else:
# If RGB image, change the axis order so wavelength is the first axis (i.e.
# consistent with FITS)
image = np.moveaxis(image, 2, 0)
self.data = image
@classmethod
def _add_kw_from_header(cls, kw: dict, header: fits.Header):
# fill in kwargs with values from header (if they aren't specified by the user)
_try_get_header_value(
kw,
header,
'target',
[cls._make_fits_kw('TARGET'), 'OBJECT', 'TARGET', 'TARGNAME'],
)
_try_get_header_value(
kw,
header,
'observer',
[cls._make_fits_kw('OBSERVER'), 'TELESCOP'],
value_fn=lambda v: 'EARTH' if str(v).startswith('ESO-') else v,
)
_try_get_header_value(
kw,
header,
'utc',
[cls._make_fits_kw('UTC-OBS'), 'MJD-AVG', 'EXPMID', 'DATE-AVG'],
)
if 'utc' not in kw:
try:
# If the header has a MJD value for the start and end of the
# observation, average them and use astropy to convert this
# mid-observation MJD into a fits format time string
beg = float(header['MJD-BEG']) # type: ignore
end = float(header['MJD-END']) # type: ignore
mjd = (beg + end) / 2
kw['utc'] = mjd
except (KeyError, TypeError, ValueError):
pass
if 'utc' not in kw:
try:
kw['utc'] = header['DATE-OBS'] + ' ' + header['TIME-OBS'] # type: ignore
except KeyError:
pass
_try_get_header_value(
kw,
header,
'utc',
['DATE-OBS', 'DATE-BEG', 'DATE-END', 'MJD-BEG', 'MJD-END'],
)
_try_get_header_value(
kw, header, 'observer_frame', [cls._make_fits_kw('OBSERVER-FRAME')]
)
_try_get_header_value(
kw, header, 'illumination_source', [cls._make_fits_kw('ILLUMINATION')]
)
_try_get_header_value(
kw, header, 'aberration_correction', [cls._make_fits_kw('ABCORR')]
)
_try_get_header_value(
kw, header, 'subpoint_method', [cls._make_fits_kw('SUBPOINT-METHOD')]
)
_try_get_header_value(
kw, header, 'surface_method', [cls._make_fits_kw('SURFACE-METHOD')]
)
# API overrides
def set_img_size(self, nx: int | None = None, ny: int | None = None) -> None:
""":meta private:"""
raise TypeError('Cannot set image size for Observation objects')
# Auto disc id
def _get_wcs_from_header(self, suppress_warnings: bool = False) -> astropy.wcs.WCS:
with warnings.catch_warnings():
if suppress_warnings:
warnings.simplefilter('ignore', category=AstropyWarning)
return astropy.wcs.WCS(self.header).celestial
@_cache_stable_result
def _get_disc_params_from_wcs(
self,
suppress_warnings: bool = False,
validate: bool = True,
use_header_offsets: bool = True,
) -> tuple[float, float, float, float]:
wcs = self._get_wcs_from_header(suppress_warnings=suppress_warnings)
if wcs.naxis == 0:
raise ValueError('No WCS information found in FITS header')
if validate:
if not all(u == 'deg' for u in wcs.world_axis_units):
raise ValueError(
'WCS coordinates are not in degrees'
) # pragma: no cover
if not wcs.world_axis_physical_types == ['pos.eq.ra', 'pos.eq.dec']:
raise ValueError(
'WCS axes are not RA/Dec coordinates'
) # pragma: no cover
if wcs.has_distortion:
raise ValueError('WCS conversion contains distortion terms')
if use_header_offsets:
dra_arcsec = float(self.header.get('HIERARCH NAV RA_OFFSET', 0.0)) # type: ignore
ddec_arcsec = float(self.header.get('HIERARCH NAV DEC_OFFSET', 0.0)) # type: ignore
dra = dra_arcsec / 3600
ddec = ddec_arcsec / 3600
else:
dra = 0.0
ddec = 0.0
x0, y0 = wcs.world_to_pixel_values(self.target_ra + dra, self.target_dec + ddec)
b1, b2 = wcs.pixel_to_world_values(x0, y0 + 1)
c1, c2 = wcs.pixel_to_world_values(x0, y0)
rotation = np.rad2deg(np.arctan2(b1 - c1, b2 - c2))
s = self.angular_dist(b1, b2, c1, c2)
arcsec_per_px = s * 60 * 60 # s = degrees/px
r0 = self.target_diameter_arcsec / (2 * arcsec_per_px)
return x0, y0, r0, rotation
[docs] def disc_from_wcs(
self,
suppress_warnings: bool = False,
validate: bool = True,
use_header_offsets: bool = True,
) -> None:
"""
Set disc parameters using WCS information in the observation's FITS header.
See also :func:`rotation_from_wcs` and :func:`plate_scale_from_wcs`.
.. note::
There may be very slight differences between the coordinates converted
directly from the WCS information and the coordinates converted by
PlanetMapper.
Args:
suppress_warnings: Hide warnings produced by astropy when calculating WCS
conversions.
validate: Run checks to ensure the WCS conversion has appropriate RA/Dec
coordinate dimensions.
use_header_offsets: If present, use the `HIERARCH NAV RA_OFFSET` and
`HIERARCH NAV DEC_OFFSET` values from the FITS headerr to adjust the
target's disc location by the specified arcsecond offsets. If these
keywords are not present or `use_header_offsets` is `False`, no
adjustment is made.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails.
"""
x0, y0, r0, rotation = self._get_disc_params_from_wcs(
suppress_warnings, validate, use_header_offsets
)
self.set_x0(x0)
self.set_y0(y0)
self.set_r0(r0)
self.set_rotation(rotation)
self.set_disc_method('wcs')
[docs] def position_from_wcs(self, *args, **kwargs) -> None:
"""
Set disc position `(x0, y0)` using WCS information in the observation's FITS
header.
See also :func:`disc_from_wcs`.
Args:
*args, **kwargs: See :func:`disc_from_wcs` for additional arguments.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails.
"""
x0, y0, r0, rotation = self._get_disc_params_from_wcs(*args, **kwargs)
self.set_x0(x0)
self.set_y0(y0)
self.set_disc_method('wcs_position')
[docs] def rotation_from_wcs(self, *args, **kwargs) -> None:
"""
Set disc rotation using WCS information in the observation's FITS header.
See also :func:`disc_from_wcs`.
Args:
*args, **kwargs: See :func:`disc_from_wcs` for additional arguments.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails.
"""
x0, y0, r0, rotation = self._get_disc_params_from_wcs(*args, **kwargs)
self.set_rotation(rotation)
self.set_disc_method('wcs_rotation')
[docs] def plate_scale_from_wcs(self, *args, **kwargs) -> None:
"""
Set plate scale (i.e. `r0`) using WCS information in the observation's FITS
header.
See also :func:`disc_from_wcs`.
Args:
*args, **kwargs: See :func:`disc_from_wcs` for additional arguments.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails.
"""
x0, y0, r0, rotation = self._get_disc_params_from_wcs(*args, **kwargs)
self.set_r0(r0)
self.set_disc_method('wcs_plate_scale')
[docs] def get_wcs_offset(self, *args, **kwargs) -> tuple[float, float, float, float]:
"""
.. warning ::
This is a beta feature and the API may change in future.
Get the difference between the current disc parameters and the disc parameters
calculated from the WCS information in the observation's FITS header.
For example, this function can be used to retrieve the cumulative offset after
adjusting the disc position: ::
# Initialise disc with parameters from WCS
observation.disc_from_wcs()
# Adjust the disc position
observation.adjust_disc_params(1, 2, 3, 4)
observation.adjust_disc_params(dx=0.1)
# Retrieve the cumulative offset
print(observation.get_wcs_offset()) # (1.1, 2.0, 3.0, 4.0)
Similarly, this function can be used to retrieve the offset after running the
GUI to fit the disc: ::
observation.run_gui()
print(observation.get_wcs_offset())
See also :func:`get_wcs_arcsec_offset`.
Args:
*args, **kwargs: See :func:`disc_from_wcs` for additional arguments.
Returns:
`(dx, dy, dr, drotation)` tuple containing the differences in disc
parameters between the current disc parameters (i.e. those returned by
:func:`BodyXY.get_disc_params`) and the disc parameters calculated from
the WCS information in the observation's FITS header. `dx` and `dy` give the
difference in the disc centre position in pixels, `dr` gives the difference
in the disc radius in pixels, and `drotation` gives the difference in the
rotation angle in degrees.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails.
"""
x0_wcs, y0_wcs, r0_wcs, rotation_wcs = self._get_disc_params_from_wcs(
*args, **kwargs
)
dx = self.get_x0() - x0_wcs
dy = self.get_y0() - y0_wcs
dr = self.get_r0() - r0_wcs
drotation = (self.get_rotation() - rotation_wcs) % 360
return dx, dy, dr, drotation
[docs] def get_wcs_arcsec_offset(
self, *args, check_is_position_offset_only: bool = True, **kwargs
) -> tuple[float, float]:
"""
.. warning ::
This is a beta feature and the API may change in future.
Get the offset in RA/Dec celestial coordinates between the current disc location
and the disc location calculated from the WCS information in the observation's
FITS header.
For example, this function can be used to retrieve the cumulative offset after
adjusting the disc position: ::
# Initialise disc with parameters from WCS
observation.disc_from_wcs()
# Adjust the disc position
observation.add_arcsec_offset(10, 10)
observation.add_arcsec_offset(dra_arcsec=1.23)
# Retrieve the cumulative offset
print(observation.get_wcs_arcsec_offset()) # (11.23, 10.0)
Similarly, this function can be used to retrieve the offset after running the
GUI to fit the disc: ::
observation.run_gui()
print(observation.get_wcs_arcsec_offset())
The RA/Dec offsets returned by this function are generally only meaningful if
the disc location `(x0, y0)` is the only difference between the current disc
parameters and those derived from the WCS. Therefore, by default this function
checks that the `dr` and `drotation` values returned by :func:`get_wcs_offset`
are sufficiently small to be considered a position offset only, and raises a
`ValueError` if this is not the case. This check can be disabled by setting
`check_is_position_offset_only` to `False`.
See also :func:`get_wcs_offset`.
Args:
*args, **kwargs: See :func:`disc_from_wcs` for additional arguments.
check_is_position_offset_only: If `True` (the default), check that the
`dr` and `drotation` values returned by :func:`get_wcs_offset` are
sufficiently small to be considered a position offset only. If this is
`False`, then the `dr` and `drotation` values are not checked.
Returns:
`(dra_arcsec, ddec_arcsec)` tuple containing the offsets in arcseconds in
the RA and Dec celestial coordinates between the current disc location (i.e.
those returned by :func:`BodyXY.get_disc_params`) and the disc location
calculated from the WCS information in the observation's FITS header.
Raises:
ValueError: if no WCS information is found in the FITS header, or validation
fails. A ValueError is also raised if `check_is_position_offset_only`
is `True` and the `dr` or `drotation` values returned by
:func:`get_wcs_offset` are not sufficiently small.
"""
dx, dy, dr, drotation = self.get_wcs_offset(*args, **kwargs)
if check_is_position_offset_only:
if abs(dr) > 1e-3:
raise ValueError(
f'r0 is different between WCS and observation (dr={dr})'
)
if abs((drotation + 180) % 360 - 180) > 1e-3:
# ^ modulo operation makes 359 -> -1 so -ve drotation works properly
raise ValueError(
f'rotation is different between WCS and observation (drotation={drotation})'
)
ra0, dec0 = self.xy2radec(0, 0)
ra1, dec1 = self.xy2radec(dx, dy)
dra_arcsec = (ra1 - ra0) * 3600
ddec_arcsec = (dec1 - dec0) * 3600
return dra_arcsec, ddec_arcsec
def _get_img_for_fitting(self) -> np.ndarray:
img = np.nansum(self.data, axis=0)
mask_img = np.isnan(img)
img[mask_img] = np.nanmin(img) # Mask nan values for com calculation etc.
return img
[docs] def fit_disc_position(self) -> None:
"""
Automatically find and set `x0` and `y0` so that the planet's disc is fit to the
brightest part of the data.
"""
threshold_img = self._get_img_for_fitting()
threshold = 0.5 * sum(
[ # type: ignore
np.percentile(threshold_img, 5),
np.percentile(threshold_img, 95),
]
)
threshold_img[np.where(threshold_img <= threshold)] = 0
threshold_img[np.where(threshold_img > threshold)] = 1
x0, y0 = np.array(scipy.ndimage.center_of_mass(threshold_img))[::-1]
self.set_x0(x0)
self.set_y0(y0)
self.set_disc_method('fit_position')
[docs] def fit_disc_radius(self) -> None:
"""
Automatically find and set `r0` using aperture photometry.
This routine calculates the brightness in concentric annular apertures around
`(x0, y0)` and sets `r0` as the radius where the brightness decreases the
fastest. Note that this uses circular apertures, so will be less reliable for
targets with greater flattening and may not work well for targets which are not
entirely in the image frame.
Raises:
ValueError: if `x0` or `y0` are not within the image frame.
"""
if not self._xy_in_image_frame(self.get_x0(), self.get_y0()):
raise ValueError(
'x0 and y0 must be within the image frame to fit the radius'
)
img = self._get_img_for_fitting()
centroid = np.array([self.get_x0(), self.get_y0()])
r_ceil = max(int(min(*centroid, *(img.shape - centroid))), 2)
if r_ceil > 100:
r_list = np.linspace(1, r_ceil + 1, 100)
else:
r_list = np.array(range(1, r_ceil + 1))
apertures = [photutils.aperture.CircularAperture(centroid, r) for r in r_list]
val_list = []
for aperture in apertures:
table = photutils.aperture.aperture_photometry(img, aperture)
aperture_sum = float(table['aperture_sum']) # type: ignore
val_list.append(aperture_sum / aperture.area)
val_list = np.array(val_list)
r_list = r_list[1:] - 0.5 * (
r_list[1] - r_list[0]
) # Get radii corresponding to dv
dv_list = np.diff(val_list)
r0 = r_list[dv_list.argmin()]
self.set_r0(r0)
self.set_disc_method('fit_r0')
# Mapping
[docs] def get_mapped_data(
self,
interpolation: (
Literal['nearest', 'linear', 'quadratic', 'cubic'] | int | tuple[int, int]
) = 'linear',
*,
spline_smoothing: float = 0,
**map_kwargs: Unpack[MapKwargs],
) -> np.ndarray:
"""
Projects the observed :attr:`data` onto a map. See
:func:`BodyXY.generate_map_coordinates` for details about customising the
projection used.
For larger datasets, it can take some time to map every wavelength. Therefore,
the mapped data is automatically cached (in a similar way to backplanes - see
:class:`BodyXY`) so that subsequent calls to this function do not have to
recompute the mapped data. As with cached backplanes, the cached mapped data is
automatically cleared if any disc parameters are changed (i.e. you shouldn't
need to worry about the cache, it all happens 'magically' behind the scenes).
Args:
interpolation: Interpolation used when mapping. This can either any of
`'nearest'`, `'linear'`, `'quadratic'` or `'cubic'`. Passed to
:func:`BodyXY.map_img`.
spline_smoothing: Passed to :func:`BodyXY.map_img`.
**map_kwargs: Additional arguments are passed to
:func:`BodyXY.generate_map_coordinates` to specify and customise the map
projection.
Returns:
Array containing cube of maped of the values in `img` at each location on
the surface of the target body. Locations which are not visible or outside
the projection domain have a value of NaN.
"""
# Return a copy so that the cached value isn't tainted by any modifications
return self._get_mapped_data(
interpolation=interpolation, spline_smoothing=spline_smoothing, **map_kwargs
).copy()
@_cache_clearable_result
@progress_decorator
def _get_mapped_data(
self,
*,
interpolation: (
Literal['nearest', 'linear', 'quadratic', 'cubic'] | int | tuple[int, int]
),
spline_smoothing: float,
**map_kwargs: Unpack[MapKwargs],
):
projected = []
if interpolation == 'linear' and np.any(np.isnan(self.data)):
data = np.nan_to_num(self.data)
print(
'Warning, data contains NaN values which will be set to 0 before interpolating'
)
else:
data = self.data
for idx, img in enumerate(data):
self._update_progress_hook(idx / len(data))
projected.append(
self.map_img(
img,
spline_smoothing=spline_smoothing,
interpolation=interpolation,
**map_kwargs,
)
)
return np.array(projected)
# Output
@classmethod
def _make_fits_kw(cls, keyword: str) -> str:
return f'HIERARCH {cls.FITS_KEYWORD} {keyword}'
[docs] def make_filename(
self, extension: str = '.fits', prefix: str = '', suffix: str = ''
) -> str:
"""
Automatically generates a useful filename from the target name and date of the
observation, e.g. `'JUPITER_2000-01-01T123456.fits'`.
Args:
extension: Optionally specify the file extension to add to the filename.
prefix: Optionally specify filename prefix.
suffix: Optionally specify filename suffix.
Returns:
Filename built from the target name and observation date.
"""
return '{prefix}{target}_{date}{suffix}{extension}'.format(
prefix=prefix,
target=self.target,
date=self.dtm.strftime('%Y-%m-%dT%H%M%S'),
extension=extension,
suffix=suffix,
)
[docs] @progress_decorator
def save_observation(
self,
path: str,
*,
include_wireframe: bool = True,
wireframe_kwargs: dict[str, Any] | None = None,
show_progress: bool = False,
print_info: bool = True,
) -> None:
"""
Save a FITS file containing the observed data and generated backplanes.
The primary HDU in the FITS file will be the :attr:`data` and :attr:`header`
of the observed data, with appropriate metadata automatically added to the
header by :func:`add_header_metadata`. The backplanes are generated from all the
registered backplanes in :attr:`BodyXY.backplanes` and are saved as additional
HDUs in the FITS file.
For larger image sizes, the backplane generation can be slow, so this function
may take some time to complete.
Args:
path: Filepath of output file.
include_wireframe: Toggle generating and saving wireframe overlay image as
an additional backplane of the output FITS file. The wireframe is
generated by :func:`BodyXY.get_wireframe_overlay_img`.
wireframe_kwargs: Dictionary of keyword arguments passed to
:func:`BodyXY.get_wireframe_overlay_img` to customise the wireframe
overlay.
show_progress: Display a progress bar rather than printing progress info.
This does not have an effect if `show_progress=True` was set when
creating this `Observation`.
print_info: Toggle printing of progress information (defaults to `True`).
"""
if show_progress and self._get_progress_hook() is None:
print_info = False
self._set_progress_hook(SaveNavProgressHookCLI())
else:
show_progress = False
if print_info:
print('Saving observation to', path)
progress_max = 10 + len(self.backplanes)
with utils.filter_fits_comment_warning():
data = self.data
header = self.header.copy()
self._update_progress_hook(1 / progress_max)
self.add_header_metadata(header)
hdul = fits.HDUList([fits.PrimaryHDU(data=data, header=header)])
for bp_idx, (name, backplane) in enumerate(self.backplanes.items()):
self._update_progress_hook((bp_idx + 1) / progress_max)
if print_info:
print(' Creating backplane:', name)
img = backplane.get_img()
header = fits.Header([('ABOUT', backplane.description)])
header.add_comment('Backplane generated by PlanetMapper software.')
hdu = fits.ImageHDU(data=img, header=header, name=name)
hdul.append(hdu)
if include_wireframe:
if print_info:
print(' Creating wireframe...')
wireframe = self.get_wireframe_overlay_img(**wireframe_kwargs or {})
header = fits.Header([('ABOUT', 'Wireframe image overlay')])
header.add_comment(
'Wireframe overlay generated by PlanetMapper software.'
)
hdu = fits.ImageHDU(data=wireframe, header=header, name='WIREFRAME')
hdul.append(hdu)
if print_info:
print(' Saving file...')
utils.check_path(path)
hdul.writeto(path, overwrite=True, output_verify='warn')
if print_info:
print('File saved')
if show_progress:
self._update_progress_hook(1)
self._remove_progress_hook()
[docs] @progress_decorator
def save_mapped_observation(
self,
path: str,
*,
interpolation: (
Literal['nearest', 'linear', 'quadratic', 'cubic'] | int | tuple[int, int]
) = 'linear',
spline_smoothing: float = 0,
include_backplanes: bool = True,
include_wireframe: bool = True,
wireframe_kwargs: dict[str, Any] | None = None,
show_progress: bool = False,
print_info: bool = True,
**map_kwargs: Unpack[MapKwargs],
) -> None:
"""
Save a FITS file containing the mapped observation in a cylindrical projection.
The mapped data is generated using :func:`get_mapped_data`, and mapped backplane
data is saved by default.
For larger image sizes, the map projection and backplane generation can be slow,
so this function may take some time to complete.
Args:
path: Filepath of output file.
interpolation: Interpolation used when mapping. This can either any of
`'nearest'`, `'linear'`, `'quadratic'` or `'cubic'`. Passed to
:func:`BodyXY.map_img`.
spline_smoothing: Passed to :func:`BodyXY.map_img`.
include_backplanes: Toggle generating and saving backplanes to output FITS
file.
include_wireframe: Toggle generating and saving wireframe overlay map as an
additional backplane of the output FITS file. The wireframe is generated
by :func:`BodyXY.get_wireframe_overlay_map`.
wireframe_kwargs: Dictionary of keyword arguments passed to
:func:`BodyXY.get_wireframe_overlay_map` to customise the wireframe
overlay.
show_progress: Display a progress bar rather than printing progress info.
This does not have an effect if `show_progress=True` was set when
creating this `Observation`.
print_info: Toggle printing of progress information (defaults to `True`).
**map_kwargs: Additional arguments are passed to
:func:`BodyXY.generate_map_coordinates` to specify and customise the map
projection.
"""
if show_progress and self._get_progress_hook() is None:
print_info = False
self._set_progress_hook(SaveMapProgressHookCLI(len(self.data)))
else:
show_progress = False
if print_info:
print('Saving map to', path)
progress_max = 15 + (len(self.backplanes) if include_backplanes else 0)
with utils.filter_fits_comment_warning():
if print_info:
print(' Projecting mapped data...')
data = self.get_mapped_data(
interpolation=interpolation,
spline_smoothing=spline_smoothing,
**map_kwargs,
)
header = self.header.copy()
self._update_progress_hook(1 / progress_max)
self.add_header_metadata(header)
self._add_map_header_metadata(
header,
interpolation=interpolation,
spline_smoothing=spline_smoothing,
**map_kwargs,
)
self._add_map_wcs_to_header(header, **map_kwargs)
hdul = fits.HDUList([fits.PrimaryHDU(data=data, header=header)])
if include_backplanes:
for bp_idx, (name, backplane) in enumerate(self.backplanes.items()):
self._update_progress_hook((bp_idx + 1) / progress_max)
if print_info:
print(' Creating backplane:', name)
img = backplane.get_map(**map_kwargs)
header = fits.Header([('ABOUT', backplane.description)])
header.add_comment('Backplane generated by PlanetMapper software.')
self._add_map_wcs_to_header(header, **map_kwargs)
hdu = fits.ImageHDU(data=img, header=header, name=name)
hdul.append(hdu)
if include_wireframe:
if print_info:
print(' Creating wireframe...')
wireframe = self.get_wireframe_overlay_map(
**wireframe_kwargs or {}, # type: ignore
**map_kwargs,
)
header = fits.Header([('ABOUT', 'Wireframe map overlay')])
header.add_comment(
'Wireframe overlay generated by PlanetMapper software.'
)
hdu = fits.ImageHDU(data=wireframe, header=header, name='WIREFRAME')
hdul.append(hdu)
if print_info:
print(' Saving file...')
utils.check_path(path)
hdul.writeto(path, overwrite=True, output_verify='warn')
if print_info:
print('File saved')
if show_progress:
self._update_progress_hook(1)
self._remove_progress_hook()
def _add_map_header_metadata(
self,
header: fits.Header,
*,
interpolation: str | int | tuple[int, int],
spline_smoothing: float,
**map_kwargs: Unpack[MapKwargs],
):
lons, lats, xx, yy, transformer, info = self.generate_map_coordinates(
**map_kwargs
)
if isinstance(interpolation, tuple):
interpolation = str(interpolation)
self.append_to_header(
'MAP INTERPOLATION',
interpolation,
'Interpolation method used in mapping.',
header=header,
)
if interpolation != 'nearest':
self.append_to_header(
'MAP SPLINE-SMOOTHING',
spline_smoothing,
'Interpolation spline smoothing factor used in mapping.',
header=header,
)
self.append_to_header(
'MAP PROJECTION',
info['projection'],
'Projection used for mapping.',
header=header,
)
try:
self.append_to_header(
'MAP DEGREE-INTERVAL',
info['degree_interval'],
'[deg] Degree interval in output map.',
header=header,
)
except KeyError:
pass
try:
self.append_to_header(
'MAP LON',
info['lon'],
'Central longitude of map projection.',
header=header,
)
except KeyError:
pass
try:
self.append_to_header(
'MAP LAT',
info['lat'],
'Central latitude of map projection.',
header=header,
)
except KeyError:
pass
try:
self.append_to_header(
'MAP SIZE',
info['size'],
'Size of output map.',
header=header,
)
except KeyError:
pass
def _add_map_wcs_to_header(
self,
header: fits.Header,
**map_kwargs: Unpack[MapKwargs],
) -> None:
lons, lats, xx, yy, transformer, info = self.generate_map_coordinates(
**map_kwargs
)
if info['projection'] == 'rectangular':
# Add new values
header['CTYPE1'] = 'Planetographic longitude, positive {}'.format(
self.positive_longitude_direction
)
header['CUNIT1'] = 'deg'
header['CRPIX1'] = 1
header['CRVAL1'] = lons[0][0]
header['CDELT1'] = lons[0][1] - lons[0][0]
header['CTYPE2'] = 'Planetographic latitude'
header['CUNIT2'] = 'deg'
header['CRPIX2'] = 1
header['CRVAL2'] = lats[0][0]
header['CDELT2'] = lats[1][0] - lats[0][0]
else:
# Remove values which correspond to previous projection
for n in ['1', '2']:
for key in [
f'CTYPE{n}',
f'CUNIT{n}',
f'CRPIX{n}',
f'CRVAL{n}',
f'CDELT{n}',
]:
header.remove(key, ignore_missing=True, remove_all=True)
# Remove values which correspond to previous projection
for a in ['1', '2']:
for b in ['1', '2', '3']:
for key in [f'PC{a}_{b}', f'PC{b}_{a}', f'CD{a}_{b}', f'CD{b}_{a}']:
header.remove(key, ignore_missing=True, remove_all=True)
[docs] def run_gui(self) -> list[tuple[float, float]]:
"""
Run an interactive GUI to display and adjust the fitted observation.
This modifies the :class:`Observation` object in-place, so can be used within a
script to e.g. interactively fit the planet's disc. Simply run the GUI, adjust
the parameters until the disc is fit, then close the GUI and the
:class:`Observation` object will have your new values: ::
# Load in some data
observation = planetmapper.Observation('exciting_data.fits')
# Use the GUI to manually fit the disc and set the x0,y0,r0,rotation values
observation.run_gui()
# At this point, you can use the manually fitted observation
observation.plot_wireframe_xy()
.. hint ::
Once you have manually fitted the disc, you can simply close the user
interface window and the disc parameters will be updated to the new values.
This means that you don't need to click the :guilabel:`Save...` button
unless you specifically want to save a navigated file to disk.
The return value can also be used to interactively select a locations:::
observation = planetmapper.Observation('exciting_data.fits')
clicks = observation.run_gui()
ax = observation.plot_wireframe_radec()
for x, y in clicks:
ra, dec = observation.xy2radec()
ax.scatter(ra, dec)
See the :ref:`graphical user interface tutorial <gui examples>` for more details
about the GUI.
.. note ::
The :guilabel:`Open...` button is hidden for user interfaces created by
this method to ensure that only one :class:`Observation` object is modified
by the user interface.
If you want the full user interface functionality instead, then call
`planetmapper` from the command line or create and run a user interface
manually using :func:`planetmapper.gui.GUI.run`.
Returns:
List of `(x, y)` pixel coordinate tuples corresponding to where the user
clicked on the plot window to mark a location.
"""
# pylint: disable=cyclic-import
from .gui import GUI # Prevent circular imports
gui = GUI(allow_open=False)
gui.set_observation(self)
gui.run()
return gui.click_locations
def _try_get_header_value(
kw: dict,
header: fits.Header,
kw_key: str,
header_keys: list[str],
value_fn: Callable[[Any], Any] | None = None,
) -> bool:
if value_fn is None:
value_fn = lambda x: x
if kw_key not in kw:
for hk in header_keys:
try:
kw[kw_key] = value_fn(header[hk])
return True
except KeyError:
pass
return False