Source code for odhin.deblend

# -*- coding: utf-8 -*-
"""
@author: raphael.bacher@gipsa-lab.fr
"""

import logging

import numpy as np
import scipy.signal as ssl
import yaml
from scipy.ndimage import median_filter

import astropy.units as units
from astropy.table import Table
from mpdaf.obj import Cube, Image, Spectrum
from mpdaf.sdetect import Source

from .parameters import Params
from .regularization import regulDeblendFunc
from .utils import (
    convertIntensityMap,
    extractHST,
    generatePSF_HST,
    getBlurKernel,
    getMainSupport,
    load_filter,
)
from .version import version as __version__

__all__ = ('Deblending', 'deblendGroup')


[docs]def deblendGroup(group, outfile, conf, imLabel, timestamp): """Deblend a given group.""" logger = logging.getLogger(__name__) logger.debug('group %d, start, %d sources', group.ID, group.nbSources) debl = Deblending(group, conf, imLabel) logger.debug('group %d, createIntensityMap', group.ID) debl.createIntensityMap() logger.debug('group %d, findSources', group.ID) debl.findSources() logger.debug('group %d, write', group.ID) debl.write(outfile, conf, timestamp) logger.debug('group %d, done', group.ID)
[docs]class Deblending: """ Main class for deblending process Parameters ---------- cube : mpdaf Cube The Cube object to be deblended nbands : int The number of MUSE spectral bins to consider for computation. MUSE FSF is assumed constant within a bin. conf : dict The settings dict. Attributes ---------- estimatedCube : numpy.ndarray Estimated cube estimatedCubeCont : numpy.ndarray Estimated cube continuum residuals : numpy.ndarray The cube of residuals (datacube - estimated cube) sources: list list of estimated spectra varSources: list list of variances of estimated spectra listIntensityMap (HR, LRConvol) : list list of Abundance Map of each object detected, at high resolution, and after convolution and subsampling """ def __init__(self, group, conf, imLabel): self.params = Params(**conf.get('params', {})) self.group = group cube = Cube(conf['cube']) self.cube = cube = cube[:, group.region.sy, group.region.sx] im = cube[0] self.imLabel = Image(data=imLabel[group.region.sy, group.region.sx], wcs=im.wcs.copy(), copy=False) self.segmap = (extractHST(Image(conf['segmap']), im, integer_mode=True) .data.filled(0)) # List of all HST ids in the segmap listHST_ID = np.unique(self.segmap) listHST_ID = listHST_ID[listHST_ID > 0] self.listHST_ID = ['bg'] + list(listHST_ID) self.nbSources = len(self.listHST_ID) # include background # spatial shapes self.nlbda = cube.shape[0] self.shapeLR = cube.shape[1:] # load the HR images and filters self.listImagesHR = [] filtResp = [] lbda = cube.wave.coord() for band, d in conf['hr_bands'].items(): imhr = extractHST(Image(d['file']), im) # store the flux conversion factor in the header imhr.primary_header['photflam'] = d.get('photflam', 1) self.listImagesHR.append(imhr) if 'filter' in d: filt = load_filter(d['filter'], lbda) else: filt = np.ones(self.nlbda) filtResp.append(filt) self.filtResp = np.array(filtResp) self.nBands = self.params.nBands # compute bands limit indices idx = np.linspace(0, self.nlbda, self.nBands + 1, dtype=int) self.idxBands = np.array([idx[:-1], idx[1:]]).T # compute FWHM at the center of bands bands_center = self.cube.wave.coord(self.idxBands.mean(axis=1)) self.listFWHM = (self.params.fsf_a_muse + self.params.fsf_b_muse * bands_center) self.PSF_HST = generatePSF_HST(self.params.alpha_hst, self.params.beta_hst) # for each HST band list all MUSE bands inside it self.listBands = self._getListBands(self.nBands, self.filtResp) def _getListBands(self, nBands, filtResp): """For each HST band, get the list of all MUSE bands inside it (if one of the band limits has filter values > 0). """ listBands = [] nl = filtResp.shape[1] lind = list(np.linspace(0, nl - 1, nBands + 1, dtype=int)) for filt in filtResp: mask = filt[lind] > 0 # check if band limits have non-zero values bands_idx = np.where(mask[:-1] | mask[1:]) listBands.append(list(bands_idx[0])) return listBands
[docs] def createIntensityMap(self): """Create intensity maps from HST images and segmentation map. To be called before calling findSources(). """ # for each HST filter, create the high resolution intensity matrix # (nbSources x Nb pixels) self.listIntensityMapHR = [] # [bands, array(sources, im.shape)] for im in self.listImagesHR: # clip image data to avoid negative abundances data = np.maximum(im.data, 10**(-9)) intensityMapHR = np.zeros((self.nbSources, np.prod(data.shape))) # put intensityMap of background in first position (estimated # spectrum of background will also be first) intensityMapHR[0] = 1 for k, hst_id in enumerate(self.listHST_ID[1:], start=1): mask = self.segmap == hst_id arr = np.where(mask, data, 0) intensityMapHR[k] = arr.ravel() self.listIntensityMapHR.append(intensityMapHR)
[docs] def findSources(self, store=False): """Main function to estimate sources spectra. Parameters ---------- store : bool store intermediate results """ regul = self.params.regul filt_w = self.params.filt_w cubeLR = self.cube.data.filled(np.ma.median(self.cube.data)) cubeLRVar = self.cube.var.filled(np.ma.median(self.cube.var)) if regul: # precompute continuum cubeLR_c = median_filter(cubeLR, size=(filt_w, 1, 1), mode='reflect') # compute HST-MUSE transfer functions for all MUSE FSF fwhm considered self.listTransferKernel = self._generateHSTMUSE_transfer_PSF() shapeLR = (self.nbSources, self.nlbda) # Lists of [HR band, Spectral band] nbImagesHR = len(self.listImagesHR) def _create_result_list(): return [[None] * self.nBands for _ in range(nbImagesHR)] tmp_sources = [] tmp_var = [] self.listIntensityMapLRConvol = _create_result_list() self.listAlphas = _create_result_list() self.listRSS = _create_result_list() self.listCorrFlux = _create_result_list() if store: self.listMask = _create_result_list() self.listccoeff = _create_result_list() self.listlcoeff = _create_result_list() self.listY = _create_result_list() self.listYc = _create_result_list() self.listYl = _create_result_list() self.spatialMask = _create_result_list() # If there are several HR images the process is applied on each image # and then the estimated spectra are combined using a mean weighted by # the response filters for j in range(nbImagesHR): tmp_sources.append(np.zeros(shapeLR)) tmp_var.append(np.zeros(shapeLR)) for i, (imin, imax) in enumerate(self.idxBands): # Do the estimation only if MUSE band is in HST band if i in self.listBands[j]: # Create intensity maps at MUSE resolution intensityMapLRConvol = convertIntensityMap( self.listIntensityMapHR[j], self.cube[0], self.listImagesHR[j], self.listFWHM[i], self.params.fsf_beta_muse, self.listTransferKernel[i] ) # truncate intensity map support after convolution using # alpha_cut supp = getMainSupport( intensityMapLRConvol[1:], alpha=self.params.alpha_cut) intensityMapLRConvol[1:][~supp] = 0 # put ones everywhere for background intensity map intensityMapLRConvol[0] = 1. # U : n x k (n number of pixels, k number of objects, # lmbda number of wavelengths) # Y : n x lmbda # Yvar : n x lmbda delta = imax - imin U = intensityMapLRConvol.T Y = cubeLR[imin:imax].reshape(delta, -1).T if regul: Y_c = cubeLR_c[imin:imax].reshape(delta, -1).T Yvar = cubeLRVar[imin:imax].reshape(delta, -1).T # normalize intensity maps in flux to get flux-calibrated # estimated spectra U /= np.sum(U, axis=0) if regul: # apply regularization # remove background from intensity matrix as # intercept is used instead U_ = U[:, 1:] # generate support: U.shape = (image size, nsources) support = np.zeros(U.shape[0], dtype=bool) for u in range(U_.shape[1]): support[U_[:, u] > 0.1 * np.max(U_[:, u])] = True # Y_sig2 = np.var(Y[~support, :], axis=0) Y_sig2 = np.mean(Yvar, axis=0) res = regulDeblendFunc(U_, Y, Y_c=Y_c, support=support, Y_sig2=Y_sig2, filt_w=filt_w) # res -> (res, intercepts, listMask, c_coeff, l_coeff, # Y, Y_l, Y_c, c_alphas, listRSS, listA) # get spectra estimation tmp_sources[j][1:, imin:imax] = res[0] # for background spectrum get intercept (multiply by # number of pixels to get tot flux) tmp_sources[j][0, imin:imax] = res[1] * U.shape[0] # store all elements for checking purposes self.listAlphas[j][i] = res[8] self.listRSS[j][i] = res[9] self.listCorrFlux[j][i] = res[10] if store: self.spatialMask[j][i] = support self.listMask[j][i] = res[2] self.listccoeff[j][i] = res[3] self.listlcoeff[j][i] = res[4] self.listY[j][i] = res[5] self.listYl[j][i] = res[6] self.listYc[j][i] = res[7] else: # use classical least squares solution tmp_sources[j][:, imin:imax] = np.linalg.lstsq(U, Y)[0] # get spectra variance : as spectra is obtained by # (U^T.U)^(-1).U^T.Y # variance of estimated spectra is obtained by # (U^T.U)^(-1).Yvar Uinv = np.linalg.pinv(U) tmp_var[j][:, imin:imax] = np.dot(Uinv**2, Yvar) self.listIntensityMapLRConvol[j][i] = intensityMapLRConvol else: self.listIntensityMapLRConvol[j][i] = np.zeros( (self.nbSources, self.shapeLR[0] * self.shapeLR[1])) self._combineSpectra(tmp_sources, tmp_var) self._rebuildCube(tmp_sources) self._getContinuumCube(tmp_sources) self._getResiduals()
def _generateHSTMUSE_transfer_PSF(self): """Generate HST to MUSE transfer PSF, for each spectral band.""" hst = self.listImagesHR[0] dy, dx = hst.get_step(unit=units.arcsec) # get odd shape shape_1 = np.array(hst.shape) // 2 * 2 + 1 center = shape_1 // 2 # Build "distances to center" matrix. ind = np.indices(shape_1) rsq = ((ind[0] - center[0]) * dx)**2 + (((ind[1] - center[1])) * dy)**2 # Build HST FSF asq_hst = self.params.fwhm_hst**2 / 4.0 / \ (2.0**(1.0 / self.params.beta_hst) - 1.0) psf_hst = 1.0 / (1.0 + rsq / asq_hst)**self.params.beta_hst psf_hst /= psf_hst.sum() # FIXME: use Moffat(rsq, asq_hst, self.params.beta_hst) ? listTransferKernel = [] for fwhm in self.listFWHM: # Build MUSE FSF asq = fwhm ** 2 / 4.0 / ( 2.0 ** (1.0 / self.params.fsf_beta_muse) - 1.0) im_muse = 1.0 / (1.0 + rsq / asq) ** self.params.fsf_beta_muse im_muse /= im_muse.sum() listTransferKernel.append(getBlurKernel( imHR=psf_hst, imLR=im_muse, sizeKer=(21, 21))) return listTransferKernel def _combineSpectra(self, tmp_sources, tmp_var): """Combine spectra estimated on each HST image.""" weigthTot = np.ma.masked_values(self.filtResp.sum(axis=0), 0) self.sources = np.sum(self.filtResp[:, None, :] * tmp_sources, axis=0) / weigthTot self.varSources = np.sum(self.filtResp[:, None, :] * tmp_var, axis=0) / weigthTot # for background, get voxel mean instead of sum self.sources[0] /= self.cube.data.size self.varSources[0] /= self.cube.data.size def _rebuildCube(self, tmp_sources): """Create the estimated cube. We have to work on each MUSE spectral bin as the spatial distribution is different on each bin. """ estimatedCube = np.zeros((self.nlbda, np.prod(self.shapeLR))) weigthTot = np.ma.masked_values(self.filtResp.sum(axis=0), 0) filtResp = self.filtResp / weigthTot for i, (imin, imax) in enumerate(self.idxBands): estim = [] for j, resp in enumerate(filtResp): arr = np.dot(tmp_sources[j][:, imin:imax].T, self.listIntensityMapLRConvol[j][i]) arr *= resp[imin:imax][:, np.newaxis] estim.append(arr) estimatedCube[imin:imax, :] = np.sum(estim, axis=0) self.estimatedCube = self.cube.clone() self.estimatedCube.data = estimatedCube.reshape(self.cube.shape) def _getResiduals(self): self.residuals = self.cube.data - self.estimatedCube.data def _getContinuumCube(self, tmp_sources, w=101): """ Build continuum cube by median filtering (much faster here as it is done on objects spectra instead of all pixel spectra) """ self.sourcesCont = ssl.medfilt(self.sources, kernel_size=(1, w)) self.tmp_sourcesCont = [ssl.medfilt(tmp_source, kernel_size=(1, w)) for tmp_source in tmp_sources] self.estimatedCubeCont = self._rebuildCube(self.tmp_sourcesCont) @property def Xi2_tot(self): return (1 / (self.residuals.size - 3) * np.sum(self.residuals**2 / self.cube.var))
[docs] def calcXi2_source(self, k): mask = self.listIntensityMapLRConvol[0][0][k].reshape(self.shapeLR) > 0 return (1 / (self.residuals[:, mask].size - 3) * np.sum(self.residuals[:, mask]**2 / self.cube.var[:, mask]))
[docs] def calcCondNumber(self, listobj=None): """Compute condition number.""" if listobj is None: mat = np.array(self.listIntensityMapLRConvol[0][0][1:]) else: mat = np.array(self.listIntensityMapLRConvol[0][0][listobj][1:]) mat /= mat.sum(axis=1)[:, None] return np.linalg.cond(mat)
[docs] def write(self, outfile, conf, timestamp): group = self.group origin = ('Odhin', __version__, self.cube.filename, self.cube.primary_header.get('CUBE_V', '')) src = Source.from_data(group.ID, group.region.ra, group.region.dec, origin=origin) idxSources = [k for k, iden in enumerate(self.listHST_ID) if iden in group.listSources] cond_number = self.calcCondNumber(idxSources) src.header['GRP_ID'] = group.ID src.header['GRP_AREA'] = group.region.area src.header['GRP_NSRC'] = group.nbSources src.header['COND_NB'] = cond_number src.header['XI2_TOT'] = self.Xi2_tot # we add a timestamp which allows to control that the sources are # consistent with a given catalog src.header['ODH_TS'] = timestamp # add spectra from objects in the blob for k, iden in enumerate(self.listHST_ID): if iden in group.listSources: sp = Spectrum(data=self.sources[k], var=self.varSources[k], wave=self.cube.wave, copy=False) src.spectra[iden] = sp # build sources table ids = [f'bg_{group.ID}' if id_ == 'bg' else id_ for id_ in self.listHST_ID] rows = [(ids[k], group.ID, self.calcXi2_source(k)) for k in idxSources] t = Table(rows=rows, names=('id', 'group_id', 'xi2')) t['group_area'] = group.region.area t['nb_sources'] = group.nbSources t['condition_number'] = cond_number t['xi2_group'] = self.Xi2_tot src.tables['sources'] = t # save cubes and images src.cubes['MUSE'] = self.cube src.cubes['FITTED'] = self.estimatedCube src.images['MUSE_WHITE'] = self.cube.mean(axis=0) src.images['FITTED'] = self.estimatedCube.mean(axis=0) src.images['LABEL'] = self.imLabel # save params src.header.add_comment('') src.header.add_comment('ODHIN PARAMETERS:') src.header.add_comment('') for line in yaml.dump(conf).splitlines(): src.header.add_comment(line) src.write(outfile)