"""
@author: raphael.bacher@gipsa-lab.fr
"""
import logging
import numpy as np
from scipy import ndimage
from scipy.interpolate import interp1d
from scipy.signal import fftconvolve
from skimage.measure import regionprops
import astropy.units as u
from astropy.table import Table, vstack
from photutils.psf import TopHatWindow, create_matching_kernel
__all__ = ('generatePSF_HST', 'generateMoffatIm', 'extractHST')
def get_fig_ax(ax=None):
if ax is None:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
return ax
def cmap(max_label, background_color='#000000', random_state=None):
"""
A matplotlib colormap consisting of random (muted) colors, taken from
photutils SegmentationImage.cmap.
"""
from matplotlib import colors
try:
from photutils.utils.colormaps import make_random_cmap
except ImportError:
from photutils.utils.colormaps import random_cmap as make_random_cmap
cmap = make_random_cmap(max_label + 1, random_state=random_state)
if background_color is not None:
cmap.colors[0] = colors.hex2color(background_color)
return cmap
def block_sum(ar, fact):
"""
Subsample a matrix *ar* by a integer factor *fact* using sums.
"""
sx, sy = ar.shape
X, Y = np.ogrid[0:sx, 0:sy]
regions = sy // fact[1] * (X // fact[0]) + Y // fact[1]
res = ndimage.sum(ar, labels=regions, index=np.arange(regions.max() + 1))
res.shape = (sx // fact[0], sy // fact[1])
return res
[docs]def generatePSF_HST(alphaHST, betaHST, shape=(375, 375), shapeMUSE=(25, 25)):
"""
Generate PSF HST at MUSE resolution with Moffat model.
To increase precision (as this PSF is sharp) the construction is made on
a larger array then subsampled.
"""
PSF_HST_HR = generateMoffatIm(shape=shape,
center=(shape[0] // 2, shape[1] // 2),
alpha=alphaHST, beta=betaHST, dim=None)
factor = (shape[0] // shapeMUSE[0], shape[1] // shapeMUSE[1])
PSF_HST = block_sum(PSF_HST_HR, (factor[0], factor[1]))
PSF_HST[PSF_HST < 0.001] = 0
PSF_HST /= np.sum(PSF_HST)
return PSF_HST
def getMainSupport(u, alpha=0.999):
"""Get mask containing a fraction alpha of total map intensity."""
mask = np.zeros_like(u, dtype=bool)
for i, row in enumerate(u):
s = np.cumsum(np.sort(row)[::-1])
keepNumber = np.sum(s < alpha * s[-1])
mask[i, np.argsort(row, axis=None)[-keepNumber:]] = True
return mask
def load_filter(filterfile, x):
"""Resample response of HST filter on the spectral grid of MUSE
Parameters
----------
filterfile: str
Response filter file containing a 2d array (wavelength,amplitude).
x : ndarray
Wavelengths.
"""
filt = np.loadtxt(filterfile)
f = interp1d(filt[:, 0], filt[:, 1], fill_value=0, bounds_error=False)
return np.array(f(x))
def Moffat(r, alpha, beta, normalize=False):
"""Compute Moffat values for array of distances *r* and Moffat
parameters *alpha* and *beta*.
"""
arr = (beta - 1) / (np.pi * alpha ** 2) * (1 + (r / alpha) ** 2) ** (-beta)
if normalize:
arr /= np.sum(arr)
return arr
[docs]def generateMoffatIm(center=(12, 12), shape=(25, 25), alpha=2, beta=2.5,
dx=0., dy=0., dim="MUSE"):
"""
Generate Moffat FSF image
By default alpha is supposed to be given in arsec, if not it is given in
MUSE pixel. a,b allow to decenter slightly the Moffat image.
"""
ind = np.indices(shape)
r = np.sqrt(((ind[0] - center[0] + dx) ** 2 +
(ind[1] - center[1] + dy) ** 2))
if dim == "MUSE":
r = r * 0.2
elif dim == "HST":
r = r * 0.03
return Moffat(r, alpha, beta, normalize=True)
def convertIntensityMap(intensityMap, muse, hst, fwhm, beta, imPSFMUSE):
"""
Parameters
----------
intensityMap : `ndarray`
The matrix of intensity maps (one row per object) at HST resolution.
muse : `mpdaf.obj.Image` of `mpdaf.obj.Cube`
The MUSE image or cube to use as the template for the HST image.
hst : `mpdaf.obj.Image`
The HST image to be resampled.
fwhm : `float`
FWHM of MUSE FSF.
beta : `float`
Moffat beta parameter of MUSE FSF.
imPSFMUSE : ndarray
Transfer kernel.
Returns
-------
intensityMapMuse : `ndarray`
The matrix of intensity maps (one row per object) at MUSE resolution
(and convolved by MUSE FSF or HST-MUSE transfert function)
"""
intensityMapMuse = np.zeros((intensityMap.shape[0], muse.data.size))
hst_ref = hst.copy()
imPSFMUSE = imPSFMUSE / np.sum(imPSFMUSE)
for i in range(intensityMap.shape[0]):
hst_ref.data = intensityMap[i].reshape(hst_ref.shape)
hst_ref.data = fftconvolve(hst_ref.data, imPSFMUSE, mode="same")
hst_ref_muse = hst_ref.align_with_image(
muse, cutoff=0.0, flux=True, inplace=False, antialias=False)
hst_ref_muse = rescale_hst_like_muse(hst_ref_muse, muse, inplace=True)
hst_ref_muse.mask[:] = False
intensityMapMuse[i] = hst_ref_muse.data.flatten()
return intensityMapMuse
def rescale_hst_like_muse(hst, muse, inplace=True):
"""Rescale an HST image to have the same flux units as a given MUSE image.
Parameters
----------
hst : `mpdaf.obj.Image`
The HST image to be resampled.
muse : `mpdaf.obj.Image` or `mpdaf.obj.Cube`
A MUSE image or cube with the target flux units.
inplace : bool
If True, replace the contents of the input HST image.
Returns
-------
out : `mpdaf.obj.Image`
The rescaled HST image.
"""
if not inplace:
hst = hst.copy()
# Calculate the calibration factor needed to convert from
# electrons/s in the HST image to MUSE flux-density units.
cal = (hst.primary_header['photflam'] *
u.Unit("erg cm-2 s-1 Angstrom-1")).to(muse.unit)
# Rescale the HST image to have the same units as the MUSE image.
hst.data *= cal.value
if hst.var is not None:
hst.var *= cal.value ** 2
hst.unit = muse.unit
return hst
def getBlurKernel(imHR, imLR, sizeKer, returnImBlurred=False, cut=0.00001):
"""
Compute convolution kernel between two images (typically one from HST
and one from MUSE).
Parameters:
-----------
imLR,imHR - blurred and sharp images respectively
szKer - 2 element vector specifying the size of the required kernel
Returns:
-------
kernel - the recovered kernel,
imLRsynth - the sharp image convolved with the recovered kernel
"""
window = TopHatWindow(0.35)
kernel = create_matching_kernel(imHR, imLR, window=window)
imLRsynth = fftconvolve(imHR, kernel, "same")
residuals = np.sum((imLR - imLRsynth) ** 2)
if residuals > 0.1 * np.sum(imLR ** 2):
print("Warning : residuals are strong, maybe the linear inversion "
"is not constrained enough.")
print(residuals, np.sum(imLR ** 2))
kernel[kernel < cut] = 0.
if returnImBlurred:
return kernel, imLRsynth
else:
return kernel
def calcMainKernelTransfert(params, imHST):
"""Build the transfert kernel between an HR kernel and LR kernel."""
# get parameters
fsf_beta_muse = params.fsf_beta_muse
fwhm_muse = params.fwhm_muse
fwhm_hst = params.fwhm_hst
beta_hst = params.beta_hst
dy, dx = imHST.get_step(unit=u.arcsec)
shape = np.array(imHST.shape)
# get odd shape
shape_1 = shape // 2 * 2 + 1
center = shape_1 // 2
# Build "distances to center" matrix.
ind = np.indices(shape_1)
rsq = ((ind[0] - center[0]) * dx)**2 + (((ind[1] - center[1])) * dy)**2
# Build HST FSF
asq_hst = fwhm_hst**2 / 4.0 / (2.0**(1.0 / beta_hst) - 1.0)
psf_hst = 1.0 / (1.0 + rsq / asq_hst)**beta_hst
psf_hst /= psf_hst.sum()
# Build MUSE FSF
asq = fwhm_muse**2 / 4.0 / (2.0**(1.0 / fsf_beta_muse) - 1.0)
im_muse = 1.0 / (1.0 + rsq / asq)**fsf_beta_muse
im_muse /= im_muse.sum()
return getBlurKernel(imHR=psf_hst, imLR=im_muse, sizeKer=(21, 21))
def createIntensityMap(imHR, segmap, imLR, kernel_transfert, params):
"""Create intensity maps from HST images and segmentation map.
Parameters
----------
imHR : `mpdaf.obj.Image`
the reference high resolution image
segmap : `mpdaf.obj.Image`
the segmentation map at high resolution
imLR : `mpdaf.obj.Image`
image at the targeted low resolution
kernel_transfert : ndarray
convolution kernel from imHR to imLR
params : `odhin.Params`
additional parameters
"""
intensityMapHR = np.zeros(imHR.shape)
# avoid negative abundances
sel = segmap.data > 0
intensityMapHR[sel] = np.maximum(imHR.data[sel], 10**(-9))
intensityMapLRConvol = convertIntensityMap(
intensityMapHR[None, :], imLR, imHR, params.fwhm_muse,
params.fsf_beta_muse, kernel_transfert
).reshape(imLR.shape)
intensityMapLRConvol[imLR.mask] = 0
return intensityMapLRConvol
def check_segmap_catalog(segmap, cat, idname='ID'):
"""Check that segmap and catalog are consistent and fix if needed.
Avoid discrepancy between the catalog and the segmentation map
(there are some segmap objects missing from the Rafelski 2015 catalog). If
some sources are found in the segmap but are missing in the catalog, then
additional rows with new IDs are added to the catalog.
"""
keys = np.unique(segmap.data.data)
keys = keys[keys > 0]
missing = keys[~np.in1d(keys, cat[idname])]
if missing.size > 0:
logger = logging.getLogger(__name__)
logger.warning('found %d sources in segmap that are missing in the '
'catalog (ID: %s), adding them to the catalog',
missing.size, missing)
regions = {reg.label: reg
for reg in regionprops(segmap._data, cache=False)}
coords = [regions[i].centroid for i in missing]
dec, ra = segmap.wcs.pix2sky(coords).T
cat2 = Table([missing, ra, dec], names=('ID', 'RA', 'DEC'))
cat2['MISSING_SOURCE'] = True
cat = vstack([cat, cat2])
cat.add_index(idname)
return cat