# SPDX-FileCopyrightText: 2021 Division of Intelligent Medical Systems, DKFZ
# SPDX-FileCopyrightText: 2021 Janek Groehl
# SPDX-License-Identifier: MIT
from simpa.utils import Tags
from simpa.io_handling import save_data_field
from simpa.core.processing_components.multispectral import MultispectralProcessingAlgorithm
from simpa.utils.libraries.spectrum_library import Spectrum
import numpy as np
import scipy.linalg as linalg
from scipy.optimize import nnls
[docs]class LinearUnmixing(MultispectralProcessingAlgorithm):
"""
Performs linear spectral unmixing (LU) using Fast Linear Unmixing for PhotoAcoustic Imaging (FLUPAI)
on the defined data field for each chromophore specified in the component settings.
If tag LINEAR_UNMIXING_NON_NEGATIVE is set to True non-negative linear unmixing is performed, which solves the
KKT (Karush-Kuhn-Tucker) conditions for the non-negative least squares problem.
This component saves a dictionary containing the chromophore concentrations and corresponding wavelengths for
each chromophore. If the tag LINEAR_UNMIXING_COMPUTE_SO2 is set True the blood oxygen saturation
is saved as well, however, this is only possible if the chromophores oxy- and deoxyhemoglobin are specified.
IMPORTANT:
Linear unmixing should only be performed with at least two wavelengths:
e.g. Tags.WAVELENGTHS: [750, 800]
Parameters:
Tags.DATA_FIELD (required)
Tags.LINEAR_UNMIXING_SPECTRA (required)
Tags.WAVELENGTHS (default: None, if None, then settings[Tags.WAVELENGTHS] will be used.)
Tags.LINEAR_UNMIXING_COMPUTE_SO2 (default: False)
Tags.LINEAR_UNMIXING_NON_NEGATIVE (default: False)
global_settings (required)
component_settings_key (required)
"""
def __init__(self, global_settings, component_settings_key: str):
super(LinearUnmixing, self).__init__(global_settings=global_settings,
component_settings_key=component_settings_key)
self.chromophore_spectra_dict = {} # dictionary containing the spectrum for each chromophore and wavelength
self.absorption_matrix = [] # endmember matrix needed in LU
self.pseudo_inverse_absorption_matrix = []
self.chromophore_concentrations = [] # list of LU results
self.chromophore_concentrations_dict = {} # dictionary of LU results
self.wavelengths = [] # list of wavelengths
[docs] def run(self):
self.logger.info("Performing linear spectral unmixing...")
if Tags.WAVELENGTHS in self.component_settings:
self.wavelengths = self.component_settings[Tags.WAVELENGTHS]
else:
if Tags.WAVELENGTHS in self.global_settings:
self.wavelengths = self.global_settings[Tags.WAVELENGTHS]
else:
msg = "Was not able to get wavelengths from component_settings or global_settings."
self.logger.critical(msg)
raise AssertionError(msg)
if len(self.wavelengths) < 2:
msg = "Linear unmixing should be performed with at least two wavelengths!"
self.logger.critical(msg)
raise AssertionError(msg)
# Build internal list of spectra based on Tags.LINEAR_UNMIXING_SPECTRA
self.build_chromophore_spectra_dict()
# check if absorption dictionary contains any spectra
if self.chromophore_spectra_dict == {}:
raise KeyError("Linear unmixing must be performed for at least one chromophore. "
"Please specify at least one chromophore in the component settings by setting "
"the corresponding tag.")
# check if non-negative contraint should be used for linear unmixing
non_negative = False
if Tags.LINEAR_UNMIXING_NON_NEGATIVE in self.component_settings:
non_negative = Tags.LINEAR_UNMIXING_NON_NEGATIVE
# create the absorption matrix needed by FLUPAI
# the matrix should have the shape [#global wavelengths, #chromophores]
self.absorption_matrix = self.create_absorption_matrix()
self.logger.debug(f"The absorption matrix has shape {np.shape(self.absorption_matrix)}.")
# perform fast linear unmixing FLUPAI
# the result saved in self.chromophore_concentrations is a list with the unmixed images
# containing the chromophore concentration
self.chromophore_concentrations = self.flupai(non_negative=non_negative)
self.logger.debug(f"The unmixing result has shape {np.shape(self.chromophore_concentrations)}.")
# split results to create dictionary which contains linear unmixing result for each chromophore
for index, chromophore in enumerate(self.chromophore_spectra_dict.keys()):
self.chromophore_concentrations_dict[chromophore] = self.chromophore_concentrations[index]
self.logger.info(f"The chromophore concentration was computed for chromophores: "
f"{self.chromophore_concentrations_dict.keys()}")
# compute blood oxygen saturation if selected
save_dict = {
"chromophore_concentrations": self.chromophore_concentrations_dict,
"wavelengths": self.wavelengths
}
if Tags.LINEAR_UNMIXING_COMPUTE_SO2 in self.component_settings:
if self.component_settings[Tags.LINEAR_UNMIXING_COMPUTE_SO2]:
self.logger.info("Blood oxygen saturation is calculated and saved.")
save_dict["sO2"] = self.calculate_sO2()
# save linear unmixing result in hdf5
save_data_field(save_dict, self.global_settings[Tags.SIMPA_OUTPUT_FILE_PATH],
Tags.LINEAR_UNMIXING_RESULT, wavelength=None)
self.logger.info("Performing linear spectral unmixing......[Done]")
[docs] def build_chromophore_spectra_dict(self):
"""
This function builds the absorption spectra dictionary for each chromophore using SIMPAs spectral library
and saves the result in self.chromophore_spectra_dict.
This function might have to change drastically if the design of the spectral library changes in the future!
"""
if Tags.LINEAR_UNMIXING_SPECTRA in self.component_settings:
spectra = self.component_settings[Tags.LINEAR_UNMIXING_SPECTRA]
if len(spectra) < 2:
raise AssertionError(f"Need at least two endmembers for unmixing! You provided {len(spectra)}.")
for spectrum in spectra:
self.create_chromophore_spectra_entry(spectrum)
else:
raise AssertionError("Tried to unmix without spectra definitions. Make sure that the"
" Tags.LINEAR_UNMIXING_SPECTRA tag is set in the linear unmixing settings.")
[docs] def create_chromophore_spectra_entry(self, spectrum: Spectrum):
"""
This function builds the spectra for a chromophore specified by tag and name and saves it in
self.chromophore_spectra_dict and creates a dictionary containing the corresponding wavelengths.
The name must match the ones used in the spectral library of SIMPA.
"""
self.chromophore_spectra_dict[spectrum.spectrum_name] = [spectrum.get_value_for_wavelength(wavelength)
for wavelength in self.wavelengths]
[docs] def create_absorption_matrix(self) -> np.ndarray:
"""
Method that returns the absorption (endmember) matrix needed for linear unmixing.
:return: absorption matrix
"""
numberWavelengths = len(self.wavelengths)
numberChromophores = len(self.chromophore_spectra_dict.keys())
# prepare matrix
endmemberMatrix = np.zeros((numberWavelengths, numberChromophores))
# write absorption data for each chromophore and the corresponding wavelength into an array (matrix)
for index, key in enumerate(self.chromophore_spectra_dict.keys()):
for wave in range(numberWavelengths):
endmemberMatrix[wave][index] = self.chromophore_spectra_dict[key][wave]
return endmemberMatrix
[docs] def flupai(self, non_negative=False) -> list:
"""
Fast Linear Unmixing for PhotoAcoustic Imaging (FLUPAI) is based on
SVD decomposition with a pseudo inverse, which is equivalent to a least squares
ansatz for linear spectral unmixing of multi-spectral photoacoustic images.
:return: list with unmixed images containing the chromophore concentration.
:raise: SystemExit.
"""
# reshape image data to [number of wavelength, number of pixel]
dims_raw = np.shape(self.data)
try:
reshapedData = np.reshape(self.data, (dims_raw[0], -1))
except Exception:
self.logger.critical(f"FLUPAI failed probably caused by wrong input dimensions of {dims_raw}!")
raise ValueError("Reshaping of input data failed. FLUPAI expects a 4 dimensional numpy array, "
"where the first dimension represents the wavelengths and the second, third and fourth "
"dimension are representing a single wavelength PA image.")
# if non_negative is False, matmul of x = PI * b with x chromophore information,
# PI pseudo inverse with absorber information and b containing the measured pixel,
# else non-negative least squares is performed.
try:
if non_negative:
output = []
for i in range(np.shape(reshapedData)[1]):
foo, ris = nnls(np.array(self.absorption_matrix), reshapedData[:, i])
output.append(foo)
output = np.swapaxes(output, axis1=0, axis2=1)
else:
self.pseudo_inverse_absorption_matrix = linalg.pinv(self.absorption_matrix)
output = np.matmul(self.pseudo_inverse_absorption_matrix, reshapedData)
except Exception as e:
self.logger.critical(f"Matrix multiplication failed probably caused by mismatching dimensions of absorption"
f"matrix ({len(self.absorption_matrix[1])}) and "
f"input data ({dims_raw[0]})!")
print(e)
raise ValueError("Absorption matrix and input data must have matching sizes...")
# write output into list of images containing the chromophore information
numberChromophores = np.shape(output)[0]
chromophores_concentrations = []
for chromophore in range(numberChromophores):
chromophores_concentrations.append(np.reshape(output[chromophore, :], (dims_raw[1:])))
return chromophores_concentrations
[docs] def calculate_sO2(self) -> np.ndarray:
"""
Function calculates sO2 (blood oxygen saturation) values for given concentrations
of oxyhemoglobin and deoxyhemoglobin. Of course this is only possible if the concentrations of both
chromophores were calculated by this component/were specified in settings.
"""
try:
concentration_oxy = self.chromophore_concentrations_dict["Oxyhemoglobin"]
concentration_deoxy = self.chromophore_concentrations_dict["Deoxyhemoglobin"]
sO2 = concentration_oxy / (concentration_oxy + concentration_deoxy)
# if total hemoglobin is zero handle NaN by setting sO2 to zero
where_are_NaNs = np.isnan(sO2)
sO2[where_are_NaNs] = 0
return sO2
except Exception:
raise KeyError("Chromophores oxy- and/or deoxyhemoglobin were not specified in component settings, "
"so so2 cannot be calculated!")