Source code for patato.io.hdf.hdf5_interface

#  Copyright (c) Thomas Else 2023-25.
#  License: MIT

from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Sequence

import h5py
import numpy as np
from ...core.image_structures.image_structure_types import IMAGE_DATA_TYPES
from ...core.image_structures.pa_time_data import PATimeSeries
from ...io.attribute_tags import (
    HDF5Tags,
    ROITags,
    UnmixingAttributeTags,
    ReconAttributeTags,
)
from ...io.hdf.fileimporter import WriterInterface, ReaderInterface
from ...utils.rois.roi_type import ROI
import json

if TYPE_CHECKING:
    import numpy as np
import dask.array as da


[docs] def renumber_group(dataset: h5py.Group) -> None: """ Renumber an old roi group. Parameters ---------- dataset : h5py.Group """ # Clean up old group: original_names = sorted(list(dataset.keys()), key=int) new_names = [str(x) for x in range(len(original_names))] for old, new in zip(original_names, new_names): dataset.move(old, new)
# Might be useful to set enviro variable: HDF5_USE_FILE_LOCKING=FALSE
[docs] def load_image_from_hdf5(cls, dataset, file): """ Parameters ---------- cls dataset file. Returns ------- """ ax_1_meaning = cls.get_ax1_label_meaning() dataset_da = da.from_array(dataset, chunks=(1,) + dataset.shape[1:]) # This is very hacky, could do with improving this. if ax_1_meaning == HDF5Tags.WAVELENGTH: ax_1_labels = dataset.attrs.get(HDF5Tags.WAVELENGTH, file["wavelengths"]) elif ax_1_meaning == "SPECTRA": ax_1_labels = dataset.attrs["SPECTRA"] elif ax_1_meaning == "": # For ultrasound ax_1_labels = np.arange(dataset_da.shape[1]) elif ax_1_meaning is None: # For dso2 etc. ax_1_labels = np.array([dataset.name.split("/")[-3]]) elif dataset_da.shape[1] == 1: ax_1_labels = np.array([dataset.name.split("/")[-3]]) else: ax_1_labels = np.arange(dataset_da.shape[1]) fov = [ dataset.attrs.get(fx, None) for fx in [ ReconAttributeTags.X_FIELD_OF_VIEW, ReconAttributeTags.Y_FIELD_OF_VIEW, ReconAttributeTags.Z_FIELD_OF_VIEW, ] ] if all([x is None for x in fov]): fov_x = dataset.attrs.get(ReconAttributeTags.OLD_FIELD_OF_VIEW, 1.0) fov = [fov_x, fov_x, fov_x] new_cls = cls( dataset_da, ax_1_labels, algorithm_id=dataset.attrs.get(UnmixingAttributeTags.SUFFIX, ""), attributes=dict(dataset.attrs), field_of_view=fov, ) if not cls.is_single_instance(): new_cls.hdf5_sub_name = dataset.name.split("/")[-2] return new_cls
[docs] class HDF5Writer(WriterInterface): def delete_images(self, image): pass
[docs] def __init__(self, file): WriterInterface.__init__(self) if type(file) == h5py.File: self.file = file else: self.file = h5py.File(file, "a") self.overwrite = False # TODO: implement this.
def set_scan_datetime(self, datetime): self.file.attrs[HDF5Tags.DATE] = str(datetime) def set_pa_data(self, raw_data): if type(raw_data) == PATimeSeries: raw_data = raw_data.raw_data if raw_data is None: return self.file.create_dataset(HDF5Tags.RAW_DATA, data=raw_data) def set_scan_name(self, scan_name: str): self.file.attrs[HDF5Tags.SCAN_NAME] = scan_name self.file[HDF5Tags.RAW_DATA].attrs[HDF5Tags.SCAN_NAME] = scan_name def set_temperature(self, temperature: "np.ndarray"): self.file.create_dataset( HDF5Tags.TEMPERATURE, data=temperature, compression="gzip" ) def set_correction_factor(self, correction_factor): self.file.create_dataset( HDF5Tags.OVERALL_CORR, data=correction_factor, compression="gzip" ) def set_scanner_z_position(self, z_position): self.file.create_dataset( HDF5Tags.Z_POSITION, data=z_position, compression="gzip" ) def set_run_numbers(self, run_numbers): self.file.create_dataset(HDF5Tags.RUN, data=run_numbers, compression="gzip") def set_repetition_numbers(self, repetition_numbers): self.file.create_dataset( HDF5Tags.REPETITION, data=repetition_numbers, compression="gzip" ) def set_scan_times(self, scan_times): self.file.create_dataset( HDF5Tags.TIMESTAMP, data=scan_times, compression="gzip" ) def set_sensor_geometry(self, sensor_geometry): self.file.create_dataset( HDF5Tags.SCAN_GEOMETRY, data=sensor_geometry, compression="gzip" ) def set_impulse_response(self, impulse_response): self.file.create_dataset( HDF5Tags.IMPULSE_RESPONSE, data=impulse_response, compression="gzip" ) def set_wavelengths(self, wavelengths): self.file.create_dataset(HDF5Tags.WAVELENGTH, data=wavelengths) def set_water_absorption(self, water_absorption, pathlength): # TODO: Maybe refactor this? water = self.file.create_dataset( HDF5Tags.WATER_ABSORPTION_COEFF, data=water_absorption, compression="gzip" ) water.attrs[HDF5Tags.WATER_PATHLENGTH] = pathlength def add_image(self, image_data): # Image data can be reconstructions, unmixed data etc. grp = self.file.require_group( image_data.get_hdf5_group_name() ) # E.g. unmixed or recons subgroup = grp.require_group( image_data.get_hdf5_sub_name() ) # E.g. reconstruction method or something name = str(len(subgroup.keys())) + image_data.algorithm_id # E.g. "0" + "E" dataset = subgroup.create_dataset(name, data=image_data.raw_data) # Copy attributes for a in image_data.attributes: if type(image_data.attributes[a]) == dict: for b in image_data.attributes[a]: attr = image_data.attributes[a][b] if type(attr) == str: attr = bytes(attr, "utf-8") try: dataset.attrs[b] = attr except TypeError: dataset.attrs[b] = json.dumps(attr) else: attr = image_data.attributes[a] if type(attr) == str: attr = bytes(attr, "utf-8") from numpy import nan, ndarray if attr is None: attr = nan if type(attr) is ndarray: if attr.dtype == "<U4": attr = [bytes(a, "utf-8") for a in attr] dataset.attrs[a] = attr def set_scan_comment(self, comment: str): self.file.attrs[HDF5Tags.SCAN_COMMENT] = comment def set_sampling_frequency(self, frequency: float): self.file[HDF5Tags.RAW_DATA].attrs[HDF5Tags.SAMPLING_FREQ] = frequency self.file.attrs[HDF5Tags.SAMPLING_FREQ] = frequency def set_segmentation(self, seg): self.file.create_dataset(HDF5Tags.SEGMENTATION, data=seg, compression="gzip") def set_speed_of_sound(self, c: float): self.file[HDF5Tags.RAW_DATA].attrs[HDF5Tags.SPEED_OF_SOUND] = c self.file.attrs[HDF5Tags.SPEED_OF_SOUND] = c
[docs] def add_roi(self, roi_data: ROI, generated: bool = False): """ Add a region of interest to the hdf5 file. Parameters ---------- roi_data : ROI generated : bool, default False """ roi_group = self.file.require_group(HDF5Tags.REGIONS_OF_INTEREST) region_group = roi_group.require_group( roi_data.attributes[ROITags.ROI_NAME] + "_" + roi_data.attributes[ROITags.ROI_POSITION] ) n = str(len(region_group)) if type(roi_data.points) in [np.ndarray, list]: dataset = region_group.create_dataset(n, data=roi_data.points) else: raise NotImplementedError("Cannot save specialist roi type (yet...).") # Add attributes for attr in roi_data.attributes: dataset.attrs[attr] = roi_data.attributes[attr] dataset.attrs[ROITags.GENERATED_ROI] = generated
[docs] def rename_roi(self, old_name, new_name, new_position): if type(old_name) == str: old_name = tuple(old_name.split("/")) roi_dataset = self.file[HDF5Tags.REGIONS_OF_INTEREST] roi_dataset["/".join(old_name)].attrs[ROITags.ROI_NAME] = new_name roi_dataset["/".join(old_name)].attrs[ROITags.ROI_POSITION] = new_position new_dataset_name = new_name + "_" + new_position new_number = "0" if new_dataset_name in roi_dataset: new_number = str(len(roi_dataset[new_dataset_name])) roi_dataset.move("/".join(old_name), new_dataset_name + "/" + new_number) renumber_group(roi_dataset[old_name[0]])
def delete_rois(self, name_position=None, number=None): if "/" in name_position and number is None: name, number = name_position.split("/") name_position = name if HDF5Tags.REGIONS_OF_INTEREST in self.file: if name_position is None and number is None: del self.file[HDF5Tags.REGIONS_OF_INTEREST] elif ( number is None and name_position in self.file[HDF5Tags.REGIONS_OF_INTEREST] ): del self.file[HDF5Tags.REGIONS_OF_INTEREST][name_position] elif name_position in self.file[HDF5Tags.REGIONS_OF_INTEREST]: if number in self.file[HDF5Tags.REGIONS_OF_INTEREST][name_position]: del self.file[HDF5Tags.REGIONS_OF_INTEREST][name_position][number] # Renumber rois. renumber_group( self.file[HDF5Tags.REGIONS_OF_INTEREST][name_position] )
[docs] def delete_recons(self, name=None, recon_groups: Optional[Sequence[str]] = None): """ Delete some reconstructions. Parameters ---------- name : str or None recon_groups : (iterable of str) or None """ if recon_groups is None: recon_groups = [ HDF5Tags.RECONSTRUCTION, HDF5Tags.UNMIXED, HDF5Tags.SO2, HDF5Tags.THB, HDF5Tags.DELTA_SO2, HDF5Tags.DELTA_ICG, HDF5Tags.BASELINE_ICG, HDF5Tags.BASELINE_SO2, HDF5Tags.BASELINE_SO2_STANDARD_DEVIATION, HDF5Tags.BASELINE_ICG_SIGMA, ] if name is None: for group in recon_groups: if group in self.file: del self.file[group] else: for group in recon_groups: if group in self.file: for recon_name in self.file[group]: if name == recon_name: del self.file[group][name]
[docs] def delete_dso2s(self): """ Delete the old dso2 datasets. Might want to remove this or refactor it in some way? """ for group in [ HDF5Tags.DELTA_SO2, HDF5Tags.BASELINE_SO2_STANDARD_DEVIATION, HDF5Tags.BASELINE_SO2, HDF5Tags.DELTA_ICG, HDF5Tags.BASELINE_ICG, HDF5Tags.BASELINE_ICG_SIGMA, ]: if group in self.file: del self.file[group]
[docs] class HDF5Reader(ReaderInterface): def _get_rois(self): output = {} if HDF5Tags.REGIONS_OF_INTEREST in self.file: for roi_name in self.file[HDF5Tags.REGIONS_OF_INTEREST]: roi_group = self.file[HDF5Tags.REGIONS_OF_INTEREST][roi_name] for roi_number in roi_group: dataset = roi_group[roi_number] clinical = self.is_clinical() frame_type = ( ROITags.Z_POSITION if not clinical else ROITags.REPETITION ) match_frames = ( self.get_scanner_z_position() if not clinical else self.get_repetition_numbers() ) ax0_indices = np.where( np.isclose( match_frames[:, 0], dataset.attrs.get(frame_type, 1.0) ) )[0] output[(roi_name, roi_number)] = ROI( dataset[:], dataset.attrs[ROITags.Z_POSITION], dataset.attrs[ROITags.RUN], dataset.attrs.get(ROITags.REPETITION, np.nan), dataset.attrs[ROITags.ROI_NAME], dataset.attrs[ROITags.ROI_POSITION], dataset.attrs.get(ROITags.GENERATED_ROI, False), ax0_indices, ) return output
[docs] def __init__(self, file): ReaderInterface.__init__(self) if type(file) == h5py.File: self.file = file else: self.file = h5py.File(file, "r")
def get_scan_datetime(self): import dateutil.parser try: if type(self.file.attrs[HDF5Tags.DATE]) == str: return dateutil.parser.isoparse(self.file.attrs[HDF5Tags.DATE]).replace( tzinfo=None ) else: return self.file.attrs[HDF5Tags.DATE] except KeyError: return np.nan def _get_pa_data(self): return self.file[HDF5Tags.RAW_DATA], dict(self.file[HDF5Tags.RAW_DATA].attrs) def get_scan_name(self): return self.file[HDF5Tags.RAW_DATA].attrs[HDF5Tags.SCAN_NAME] def _get_temperature(self): return self.file[HDF5Tags.TEMPERATURE] def _get_correction_factor(self): if np.any(np.isnan(self.file[HDF5Tags.OVERALL_CORR][:])): # Old version return self.file["POWER"][:] return self.file[HDF5Tags.OVERALL_CORR][:] def _get_scanner_z_position(self): return self.file[HDF5Tags.Z_POSITION] def _get_run_numbers(self): return self.file[HDF5Tags.RUN] def _get_repetition_numbers(self): return self.file[HDF5Tags.REPETITION] def _get_scan_times(self): return self.file[HDF5Tags.TIMESTAMP][:, :] * 1e-7 def _get_sensor_geometry(self): return self.file[HDF5Tags.SCAN_GEOMETRY][:] def get_impulse_response(self): return self.file[HDF5Tags.IMPULSE_RESPONSE] def _get_wavelengths(self): return self.file[HDF5Tags.WAVELENGTH][:] def _get_water_absorption(self): return ( self.file[HDF5Tags.WATER_ABSORPTION_COEFF], self.file[HDF5Tags.WATER_ABSORPTION_COEFF].attrs[HDF5Tags.WATER_PATHLENGTH], ) def _get_datasets(self): output = {} for image_type in IMAGE_DATA_TYPES: output[image_type] = {} dtype = IMAGE_DATA_TYPES[image_type] images = [] if image_type in self.file: # Loop through all types of this image and add them to the list for recon in self.file[image_type]: for recon_num in self.file[image_type][recon]: image = load_image_from_hdf5( dtype, self.file[image_type][recon][recon_num], self.file ) images.append(((recon, recon_num), image)) for (recon, num), data in images: output[image_type][(recon, num)] = data return output def get_scan_comment(self): return self.file.attrs[HDF5Tags.SCAN_COMMENT] def _get_sampling_frequency(self): return self.file[HDF5Tags.RAW_DATA].attrs[HDF5Tags.SAMPLING_FREQ] def get_speed_of_sound(self): return self.file[HDF5Tags.RAW_DATA].attrs.get(HDF5Tags.SPEED_OF_SOUND, None) def _get_segmentation(self): return self.file.get(HDF5Tags.SEGMENTATION, None)
[docs] class PACFISHInterface(ReaderInterface): pass