Source code for toupy.io.dataio

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Standard library imports
import functools
import fnmatch
import glob
import os
from pathlib import Path
import re
import time

# third party packages
import h5py
import matplotlib.pyplot as plt
import numpy as np

# local packages
from .filesrw import *
from .h5chunk_shape_3D import chunk_shape_3D
from ..utils import (
    checkhostname,
    convert_to_delta,
    convert_to_beta,
    padarray_bothsides,
    progbar,
    ShowProjections,
    plot_checkangles,
    replace_bad,
)

__all__ = [
    "remove_extraprojs",
    "PathName",
    "LoadProjections",
    "SaveData",
    "LoadData",
    "SaveTomogram",
    "LoadTomogram",
]


[docs]def remove_extraprojs(stack_projs, theta): """ Remove extra projections of tomographic scans with projections at 180, 90 and 0 degrees at the end Parameters ---------- stack_projs : array_like Stack of projections with the first index correspoding to the projection number theta : array_like Array of theta values Returns ------- stack_projs : array_like Stack of projections after the removal theta : array_like Array of theta values after the removal """ print(theta[-5:]) a = str(input("Do you want to remove extra thetas?([y]/n)")).lower() if a == "" or a == "y": a1 = eval(input("How many to remove?")) # the 3 last angles are 180, 90 and 0 degrees stack_projs = stack_projs[:-a1] theta = theta[:-a1] # the 3 last angles are 180, 90 and 0 degrees print(theta[-5:]) return stack_projs, theta
class SliceMaker(object): """ Class to make array slices more efficient """ def __getitem__(self, item): return item
[docs]class PathName: """ Class to manage file location and paths """ def __init__(self, **params): """ Parameters ---------- **params Dictionary of parameters params["pathfilename"] : str Path to first file and filename params["account"] : str User account number params["samplename"] : str Samplename params["regime"] : str Regime of imaging """ self.pathfilename = os.path.abspath(params["pathfilename"]) self.useraccount = params["account"] self.samplename = params["samplename"] self.scanprefix = params["scanprefix"] self.regime = params["regime"] self.filename = os.path.basename(self.pathfilename) # data filename self.dirname = os.path.dirname(self.pathfilename) # data filename self.fileprefix, self.fileext = os.path.splitext( self.filename ) # filename and extension # metadata filename if self.fileext == ".edf": # edf self.rootpath = Path(self.pathfilename).parents[2] else: self.rootpath = Path(self.pathfilename).parents[4] self.icath5file = "{}-id16a.h5".format(self.useraccount) self.icath5path = os.path.join( self.rootpath, self.icath5file ) # metadata filename path try: self.pycuda = params["pycudaprojs"] except KeyError: self.pycuda = True try: self.legacy = params["legacy"] except KeyError: self.legacy = True try: self.thetameta = params["thetameta"] except KeyError: self.thetameta = True
[docs] def datafilewcard(self): """ Create file wildcard to search for files """ file_wcard = re.sub( self.samplename + r"\w*", self.samplename + "*", self.fileprefix ) # file_wcard # ~ file_wcard = re.sub(self.samplename+'\w*',self.samplename+'*_ML',fileprefix) return file_wcard
[docs] def metadatafilewcard(self): """ Create file wildcard to search for metafiles """ if not os.path.isfile(self.icath5path): raise IOError("File {} not found".format(self.icath5file)) if self.fileext == ".ptyr": # Ptypy metafile_wcard = re.sub( r"_subtomo\d{3}_\d{4}_\w+", "_*subtomo*", os.path.splitext(self.filename)[0], ) elif self.fileext == ".cxi": # PyNX metafile_wcard = re.sub( r"_*subtomo\d{3}_\d{4}", "_*subtomo*", os.path.splitext(self.filename)[0], ) elif self.fileext == ".edf": # edf metafile_wcard = re.sub(r"_\d{4}$", "*", os.path.splitext(self.filename)[0]) else: raise IOError( "File {} is not a .ptyr, nor a .cxi, nor a .edf file. Please, load a compatible file.".format( self.filename ) ) return metafile_wcard
[docs] def search_projections(self): """ Search for projection given the filenames """ print("Path: {}".format(self.dirname)) print("First projection file: {}".format(self.filename)) if self.fileext == ".edf": scan_wcard = re.sub(r"_\d{4}.edf", "*.edf", self.pathfilename) elif self.fileext == ".ptyr": # Ptypy if self.pycuda: reconsuffix = "_ML_pycuda" else: reconsuffix = "_ML" # TODO: correct the path name if the calculations are done in cuda or not # ~ scan_wcard = os.path.join( # ~ re.sub(self.samplename + r"_\w*", self.samplename + "_*", self.dirname), # ~ self.metadatafilewcard() + "_ML" + self.fileext, # ~ ) scan_wcard = os.path.join( re.sub(self.scanprefix + r"_\w*", self.scanprefix + "_*", self.dirname), self.metadatafilewcard() + reconsuffix + "*" + self.fileext, ) elif self.fileext == ".cxi": # PyNX scan_wcard = os.path.join( re.sub(self.samplename + r"_\w*", self.samplename + "_*", self.dirname), self.metadatafilewcard() + self.fileext, ) else: raise IOError( "File {} is not a .ptyr, nor a .cxi, nor a .edf file. Please, load a compatible file.".format( self.filename ) ) return scan_wcard
[docs] def results_folder(self): """ create path for the result folder """ aux_wcard = re.sub(r"\*", "", self.datafilewcard()) if self.regime == "nearfield": foldername = aux_wcard + "_nfpxct" elif self.regime == "farfield": foldername = aux_wcard + "_pxct" elif self.regime == "holoct": foldername = aux_wcard + "_hxct" else: raise ValueError("Unrecognized regime") results_path = os.path.join(os.path.dirname(self.dirname), foldername) if not os.path.isdir(results_path): print("Directory does not exist. Creating the directory...") os.makedirs(results_path) return results_path
[docs] def results_datapath(self, h5name): """ create path for the h5file in result folder """ aux_path = self.results_folder() h5file = os.path.join(aux_path, h5name) return h5file
class Variables(object): """ Auxiliary class to initialize some variables """ showrecons = False phaseonly = False amponly = False autosave = False checkextraprojs = True cxientry = None missingprojs = False missingnum = None border_crop_x = None border_crop_y = None shiftmeth = "fourier" derivatives = True calc_derivatives = False correct_bad = False bad_projs = [] opencl = True load_previous_shiftstack = False algorithm = "FBP"
[docs]class LoadProjections(PathName, Variables): """ Load the reconstructed projections from the ptyr files """ def __init__(self, **params): super().__init__(**params) self.params = params try: self.showrecons = params["showrecons"] except: pass try: self.border_crop_x = params["border_crop_x"] except: pass try: self.border_crop_y = params["border_crop_y"] except: pass try: self.checkextraprojs = params["checkextraprojs"] except: pass try: self.missingprojs = params["missingprojs"] except: pass try: self.missingnum = params["missingnum"] except: pass try: self.cxientry = params["cxientry"] except: pass try: self.detector = params["detector"] except: self.detector = "Frelon" if self.showrecons: self.SP = ShowProjections() if self.fileext == ".ptyr": # Ptypy self.read_reconfile = read_ptyr elif self.fileext == ".cxi": # PyNX self.read_reconfile = read_cxi elif self.fileext == ".edf": # edf projections self.read_reconfile = read_edf else: raise IOError( "File {} is not a .ptyr, nor a .cxi, nor a .edf file. Please, load a compatible file.".format( self.filename ) ) # create_paramsh5(**params) # get the list of files to load self.proj_files = sorted(glob.glob(self.search_projections())) def __call__(self): return self._load_projections()
[docs] @classmethod def load(cls, **params): """ Load the reconstructed projections from phase-retrieved files. Parameters ---------- **params Container with parameters to load the files. params["account"] : str User experiment number at ESRF. params["samplename"] : str Sample name params["pathfilename"] : str Path to the first projection file. params["regime"] : str Imaging regime. The options are: `nearfield`, `farfield`, `holoct`. params["showrecons"] : bool To show or not the projections once loaded params["autosave"] : bool Save the projections once load without asking params["phaseonly"] : bool Load only phase projections. Used when the projections are complex-valued. params["amponly"] : bool Load only amplitude projections. Used when the projections are complex-valued. params["border_crop_x"] : int, None Amount of pixels to crop at each border in x. params["border_crop_y"] : int, None Amount of pixels to crop at each border in y. params["checkextraprojs"] : bool Check for the projections acquired at and over 180 degrees. params["missingprojs"] : bool Allow to interpolate for missing projections. The numbers of the projections need to be provided in params["missingnum"]. params["missingnum"] : list of ints Numbers of the missing projections to be interpolated. Returns ------- stack_objs : array_like Array containing the projections stack_angles : array_like Array containing the thetas pxsize : list of floats List containing the pixel size in the vertical and horizontal directions. Typically, the resolution is isotropic and the two values are the same paramsload : dict Parameters of the loading """ return cls(**params)._load_projections()
[docs] @classmethod def loadedf(cls, **params): """ Load the reconstructed projections from the edf files This is adapted for the phase-contrast imaging generating projections as edf files Parameters ---------- **params Container with parameters to load the files. params["account"] : str User experiment number at ESRF. params["samplename"] : str Sample name params["pathfilename"] : str Path to the first projection file. params["regime"] : str Imaging regime. The options are: `nearfield`, `farfield`, `holoct`. params["showrecons"] : bool To show or not the projections once loaded params["autosave"] : bool Save the projections once load without asking Returns ------- stack_objs : array_like Array containing the projections stack_angles : array_like Array containing the thetas pxsize : list of floats List containing the pixel size in the vertical and horizontal directions. Typically, the resolution is isotropic and the two values are the same paramsload : dict Parameters of the loading """ return cls(**params)._load_edfprojections()
[docs] def check_angles(self): """ Find the angles of the projections and plot them to be checked Specific to ID16A beamline (ESRF) """ thetas = {} with h5py.File(self.icath5path, "r") as fid: sorted_keys = sorted(list(fid.keys())) for keys in sorted_keys: if fnmatch.fnmatch(keys, "*" + self.metadatafilewcard()): try: # old style at ID16A beamline positioners = fid[keys + "/sample/positioner/value"][()] except KeyError: # new style at ID16A beamline positioners = fid[keys + "/sample/positioners/value"][()] thetas[keys] = np.float(positioners.split()[0]) # remove additional angles if self.checkextraprojs: theta_keys = sorted(list(thetas.keys())) thetas_array = np.array([ii for ii in thetas.values()]) thetas_array -= thetas_array.min() idxend = int(np.where(thetas_array == 180)[0]) print(theta_keys[idxend:]) if theta_keys[idxend:] != []: print( "Removing projections at the end of the scan (180,90, and 0 degrees)" ) [thetas.pop(keyrm) for keyrm in theta_keys[idxend:]] rmkeys = [ii.split()[-1] for ii in theta_keys[idxend:]] for ii in rmkeys: [self.proj_files.remove(s) for s in self.proj_files if ii in s] # checking the angles print("Checking the angles") angles = [] deltaidx = 0 # in case of repeated values sorted_thetakeys = sorted(thetas.keys()) for idx, keys in enumerate(sorted_thetakeys): th = np.float(thetas[keys]) if th == np.float(thetas[sorted_thetakeys[idx - 1]]): print("Found repeated value of theta. Discarding it") deltaidx += 1 continue print("Projection {}: {} degrees".format(idx + 1 - deltaidx, thetas[keys])) angles.append(th) # plot the angles for verification plot_checkangles(angles) a = input("Are the angles ok?([Y]/n)").lower() if a == "" or a == "y": print("Continuing...") else: raise SystemExit("Exiting") return angles, thetas
[docs] def check_angles_new(self): """ Find the angles of the projections and plot them to be checked Specific to ID16A beamline (ESRF) """ thetas = {} if self.legacy: suffixfile = "_0000.h5" else: suffixfile = ".h5" # reading the angles from raw files for idxp, proj in enumerate(self.proj_files): keys = os.path.basename(os.path.dirname(proj)) # equal to scanname??? scanname = os.path.basename(Path(proj).parents[0]) rawfile = os.path.join(scanname, scanname + suffixfile) if not self.legacy: rawscanprefix = re.sub("_subtomo\w+", "", scanname) rawfile = os.path.join(rawscanprefix, scanname + suffixfile) rawfilepath = os.path.join(Path(proj).parents[3], rawfile) thetas[keys] = read_theta_raw(rawfilepath,detector=self.detector) # reading theta # checking the angles print("Checking the angles") angles = [] deltaidx = 0 # in case of repeated values sorted_thetakeys = sorted(thetas.keys()) sorted_thetakeys.sort(key=lambda x: re.findall(r"[0-9]+", x)[-2:]) for idx, keys in enumerate(sorted_thetakeys): th = np.float(thetas[keys]) if th == np.float(thetas[sorted_thetakeys[idx - 1]]): print("Found repeated value of theta. Discarding it") deltaidx += 1 continue print("Projection {}: {} degrees".format(idx + 1 - deltaidx, thetas[keys])) angles.append(th) # plot the angles for verification plot_checkangles(angles) a = input("Are the angles ok?([Y]/n)").lower() if a == "" or a == "y": print("Continuing...") else: raise SystemExit("Exiting") return angles, thetas
def _remove_extraprojs(self, thetas, proj_files): """ Remove extra projections of tomographic scans with projections at 180, 90 and 0 degrees at the end Parameters ---------- theta : array_like Array of theta values proj_files : list of str List of projection files Returns ------- stack_projs : array_like Stack of projections after the removal theta : array_like Array of theta values after the removal """ print("The final 5 angles are: {}".format(list(thetas[-5:]))) a = str(input("Do you want to remove extra thetas?([y]/n)")).lower() if a == "" or a == "y": a1 = input("How many to remove?(default=3) ") if a1 == "": rmnum = 3 else: rmnum = eval(a1) # the 3 last angles are 180, 90 and 0 degrees proj_files = proj_files[:-rmnum] # the 3 last angles are 180, 90 and 0 degrees thetas = thetas[:-rmnum] print("The final 5 angles are now: {}".format(list(thetas[-5:]))) plot_checkangles(thetas) # re-ploting for checking return proj_files
[docs] @staticmethod def insert_missing(stack_objs, theta, missingnum): """ Insert missing projections by interpolation of neighbours """ # special: insert the information of the missing projections print("Inserting the missing projections:{}".format(missingnum)) delta_theta = theta[1] - theta[0] for ii in missingnum: print("Projection: {}".format(ii), end="\r") theta = np.insert(theta, ii, theta[ii - 1] + delta_theta) stack_objs = np.insert(stack_objs, ii, stack_objs[ii - 1], axis=0) print("\r") return stack_objs, theta
@checkhostname def _load_projections(self): """ Load the reconstructed projections from the ptyr files """ # get the angles if self.thetameta: print("Extracting theta values from metadata") angles, thetas = self.check_angles() else: print("Extracting theta values from experimental data") angles, thetas = self.check_angles_new() # count the number of available projections num_projections = len(self.proj_files) a = input( "I have found {} projections. Do you want to continue?([Y]/n)".format( num_projections ) ).lower() if a == "" or a == "y": print("Continuing...") plt.close("all") else: raise SystemExit("Exiting the script") # Read the first projection to check size and reconstruction parameters objs0, probe0, pxsize, energy = self.read_reconfile(self.pathfilename) # add the information of pixelsize and energy to params paramsload = dict() paramsload.update(self.params) paramsload["pixelsize"] = pxsize paramsload["energy"] = energy # crop image if requested objs0 = crop_array(objs0, self.border_crop_x, self.border_crop_y) nr, nc = objs0.shape # ~ print(objs0.shape) if pxsize[0] != pxsize[1]: raise SystemExit("Pixel size is not symmetric. Exiting the script") print( "the pixelsize of the first projection is {:.2f} nm".format(pxsize[0] * 1e9) ) # initialize the array for the stack objects stack_objs = np.empty((num_projections, nr, nc), dtype=np.complex64) stack_angles = np.empty(num_projections, dtype=np.float32) # reads the ptyr or cxi files and get object and probe in a stack for idxp, proj in enumerate(self.proj_files): print("\nProjection: {}".format(idxp)) print("Reading: {}".format(proj)) objs, probes, pxsize, energy = self.read_reconfile(proj) # reading file # crop image if requested if self.border_crop_x is not None: if self.border_crop_y is not None: objs = crop_array(objs, self.border_crop_x, self.border_crop_y) # check if same size, otherwise pad if objs.shape != objs0.shape: print("########################") objs = padarray_bothsides(objs, (nr, nc), padmode="edge") print("File {} has different shape and was padded".format(proj)) print("########################") # update stack_objs stack_objs[idxp] = objs # compare projection name with thetas dictionary and associate angles if self.fileext == ".ptyr": key_finder = os.path.basename(os.path.dirname(proj)) elif self.fileext == ".cxi": key_finder = os.path.splitext(os.path.basename(proj))[0] # compare projection name with thetas dictionary and associate angles for keys in sorted(thetas.keys()): if keys.find(key_finder) != -1: stack_angles[idxp] = thetas[keys] print("Angle: {}".format(thetas[keys])) break if self.showrecons: print("Showing projection {}".format(idxp + 1)) self.SP.show_projections(objs, probes, idxp) nprojs, nr, nc = stack_objs.shape print("\nNumber of projections loaded: {}".format(nprojs)) if self.missingprojs: stack_objs, stack_angles = self.insert_missing( stack_objs, stack_angles, self.missingnum ) nprojs, nr, nc = stack_objs.shape print("New number of projections: {}".format(nprojs)) print("Dimensions {} x {} pixels".format(nr, nc)) print("All projections loaded\n") return stack_objs, stack_angles, pxsize, paramsload @checkhostname def _load_edfprojections(self): """ Load the reconstructed projections from the edf files This is adapted for the phase-contrast imaging generating projections as edf files """ ## to be tested # notuseful = sorted(glob.glob(self.samplename+'*_[0-4].edf')) # self.proj_files = [ii for ii in self.proj_files if ii not in notuseful] # remove the last projection, which is 180 degrees self.proj_files = self.proj_files[:-1] # count the number of available projections num_projections = len(self.proj_files) a = input( "I have found {} projections. Do you want to continue?([Y]/n)".format( num_projections ) ).lower() if a == "" or a == "y": print("Continuing...") plt.close("all") else: raise SystemExit("Exiting the script") # Read the first projection to check size and reconstruction parameters objs0, pxsize, energy, nvue = self.read_reconfile(self.pathfilename) if nvue != num_projections: raise ValueError("The number of projections is different from nvue in file") nr, nc = objs0.shape # add the information of pixelsize and energy to params paramsload = dict() paramsload.update(self.params) paramsload["pixelsize"] = pxsize paramsload["energy"] = energy print("the pixelsize of the first projection is {:.2f} nm".format(pxsize * 1e9)) # initialize the array for the stack objects stack_objs = np.empty((num_projections, nr, nc), dtype=np.float32) stack_angles = np.arange(0, 180, 180 / nvue, dtype=np.float32) if stack_angles.shape[0] != num_projections: raise ValueError( "The number of projections is different from number of thetas" ) # reads the edf files and get object and probe in a stack for idxp, proj in enumerate(self.proj_files): print("\nProjection: {}".format(idxp)) print("Reading: {}".format(proj)) objs, _, _, _ = self.read_reconfile(proj) # reading file # update stack_objs stack_objs[idxp] = objs if self.showrecons: print("Showing projection {}".format(idxp + 1)) self.SP.show_projections(objs, probes, idxp) nprojs, nr, nc = stack_objs.shape print("\nNumber of projections loaded: {}".format(nprojs)) print("Dimensions {} x {} pixels".format(nr, nc)) print("All projections loaded\n") return stack_objs, stack_angles, pxsize, paramsload
[docs]class SaveData(PathName, Variables): """ Save projections to HDF5 file """ def __init__(self, **params): super().__init__(**params) self.params = params try: self.cxientry = params["cxientry"] except: pass try: self.autosave = params["autosave"] except: pass def __call__(self, *args): # h5name,stack_projs,theta,shiftstack): return self._save_data(*args)
[docs] @classmethod def save(cls, *args, **params): """ Save data to HDF5 File Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of projections args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack. If not provided, it will be initialized with zeros args[4] : array_like or None Array containing the projection masks """ return cls(**params)._save_data(*args)
[docs] @classmethod def saveFSC(cls, *args, **params): """ Save FSC data to HDF5 file Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Normalized frequencies args[2] : array_like Value of the threshold for each frequency args[3] : array_like The FSC curve args[4] : array_like The first tomogram args[5] : array_like The second tomogram args[6] : array_like The array of theta values args[7] : float Pixel size """ return cls(**params)._save_FSC(*args)
[docs] @classmethod def savemasks(cls, *args, **params): return cls(**params)._save_masks(*args)
def _save_masks(self, h5name, masks): """ Save masks for the linear phase ramp removal of the phase contrast image or the air removal from the amplitude images """ print("Saving {}".format(h5name)) h5file = self.results_datapath(h5name) if os.path.isfile(h5file): os.remove(h5file) with h5py.File(h5file, "a") as fid: fid.create_dataset( "masks/stack", data=masks, dtype=np.bool ) # air/vacuum mask print("Done")
[docs] def savecheck(func): """ Decorator for save data """ @functools.wraps(func) def new_func(self, *args, **params): if self.autosave: ansuser = "y" func(self, *args) else: while True: ansuser = input( "Do you want to save the data to HDF5 file? ([y]/n) " ).lower() if ansuser == "" or ansuser == "y": func(self, *args) break elif ansuser == "n": print("The data have NOT been saved yet. Please, be careful") break else: print("You have to answer y or n") return new_func
@savecheck def _save_data(self, *args): """ Save data to HDF5 File Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of projections args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack. If not provided, it will be initialized with zeros args[4] : array_like or None Array containing the projection masks """ h5name = args[0] stack_projs = args[1] nprojs, nr, nc = stack_projs.shape theta = args[2] if len(args) == 4: shiftstack = args[3] else: shiftstack = np.zeros((2, nprojs), dtype=np.float32) if len(args) == 5: masks = args[4] else: masks = None if np.iscomplexobj(stack_projs[0]): array_dtype = np.complex64 else: array_dtype = np.float32 # calculate the chunk size for writing the HDF5 files chunk_size = chunk_shape_3D(stack_projs.shape) print("Saving {}".format(h5name)) h5file = self.results_datapath(h5name) if os.path.isfile(h5file): print("File {} already exists and will be overwritten".format(h5name)) os.remove(h5file) print("\rSaving metadata...", end="") write_paramsh5(h5file, **self.params) create_paramsh5(**self.params) print("\b\b Done") print("Saving data. This takes time, please wait...") with h5py.File(h5file, "a") as fid: fid.create_dataset( "shiftstack/shiftstack", data=shiftstack, dtype=np.float32 ) # shiftstack fid.create_dataset("angles/thetas", data=theta, dtype=np.float32) # thetas dset = fid.create_dataset( "projections/stack", shape=(nprojs, nr, nc), dtype=array_dtype, chunks=chunk_size, ) p0 = time.time() for ii in range(nprojs): strbar = "{:5d} / {:5d}".format(ii + 1, nprojs) dset[ii : ii + 1, :, :] = stack_projs[ii] # avoid fancy slicing progbar(ii + 1, nprojs, strbar) print("\r") if masks is not None: fid.create_dataset( "masks/stack", data=masks, dtype=np.bool ) # air/vacuum mask print("Done. Time elapsed = {:.03f} s".format(time.time() - p0)) print("Data saved to file {}".format(h5name)) print("In the folder {}".format(self.results_folder())) def _save_FSC(self, *args): """ Save FSC data to HDF5 file Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Normalized frequencies args[2] : array_like Value of the threshold for each frequency args[3] : array_like The FSC curve args[4] : array_like The first tomogram args[5] : array_like The second tomogram args[6] : array_like The array of theta values args[7] : float Pixel size """ h5name = args[0] normfreqs = args[1] T = args[2] FSCcurve = args[3] tomogram1 = args[4] tomogram2 = args[5] theta = args[6] pxsize = args[7] print("Saving {}".format(h5name)) h5file = self.results_datapath(h5name) if os.path.isfile(h5file): print("File {} already exists and will be overwritten".format(h5name)) os.remove(h5file) print("\rSaving metadata...", end="") write_paramsh5(h5file, **self.params) create_paramsh5(**self.params) print(". Done") print("Saving data. This takes time, please wait...") p0 = time.time() with h5py.File(h5file, "a") as fid: fid.create_dataset( "angles/thetas", data=theta, dtype=np.float32 ) # add the thetas fid.create_dataset( "FSC", data=FSCcurve, dtype=np.float32 ) # add the FSC curve # add the threshold fid.create_dataset("T", data=T, dtype=np.float32) fid.create_dataset( "normfreqs", data=normfreqs, dtype=np.float32 ) # add the normalized freqs if tomogram1.ndim == 2: fid.create_dataset("tomogram1", data=tomogram1, dtype=np.float32) fid.create_dataset("tomogram2", data=tomogram2, dtype=np.float32) elif tomogram1.ndim == 3: # calculate the chunk size for writing the HDF5 files chunk_size = chunk_shape_3D(tomogram1.shape) print("Saving tomogram1 and tomogram2. This takes time, please wait...") dset1 = fid.create_dataset( "tomogram1", shape=tomogram1.shape, dtype=np.float32, chunks=chunk_size, ) dset2 = fid.create_dataset( "tomogram2", shape=tomogram2.shape, dtype=np.float32, chunks=chunk_size, ) nslices, nr, nc = tomogram1.shape for ii in range(nslices): print("{:5d} / {:5d}".format(ii + 1, nslices), end="\r") dset1[ii, :, :] = tomogram1[ii] dset2[ii, :, :] = tomogram2[ii] progbar(ii + 1, nslices) # ~ print('\r') print("Done. Time elapsed = {:.03f} s".format(time.time() - p0)) print("FSC data saved to file {}".format(h5name)) print("In the folder {}".format(self.results_folder()))
[docs]class LoadData(PathName, Variables): """ Load projections from HDF5 file """ def __init__(self, **params): paramsh5 = load_paramsh5(**params) super().__init__(**paramsh5) self.params = paramsh5 self.params.update(params) try: self.amponly = params["amponly"] except: pass try: self.phaseonly = params["phaseonly"] except: pass try: self.loadroi = params["loadroi"] except: self.loadroi = False if self.loadroi: self.roi = params["roi"] if self.roi == []: print("ROI not specified. Loading entire dataset") self.loadroi = False def __call__(self, h5name): return self._load_data(h5name)
[docs] @classmethod def load(cls, *args, **params): """ Load data from h5 file Parameters ---------- h5name: str File name from which data is loaded **params Dictionary of additonal parameters params["autosave"] : bool Save the projections once load without asking params["phaseonly"] : bool Load only phase projections. Used when the projections are complex-valued. params["amponly"] : bool Load only amplitude projections. Used when the projections are complex-valued. params["pixtol"] : float Tolerance for alignment, which is also used as a search step params["alignx"] : bool True or False to activate align x using center of mass (default= False, which means align y only) params["shiftmeth"] : str Shift images with fourier method (default). The options are `linear` -> Shift images with linear interpolation (default); `fourier` -> Fourier shift or `spline` -> Shift images with spline interpolation. params["circle"] : bool Use a circular mask to eliminate corners of the tomogram params["filtertype"] : str Filter to use for FBP params["freqcutoff"] : float Frequency cutoff for tomography filter (between 0 and 1) params["cliplow"] : float Minimum value in tomogram params["cliphigh"] : float Maximum value in tomogram params["correct_bad"] : bool If true, it will interpolate bad projections. The numbers of projections to be corrected is given by `params["bad_projs"]`. params["bad_projs"] : list of ints List of projections to be interpolated. It starts at 0. Returns ------- stack_projs: array_like Stack of projections theta: array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information """ return cls(**params)._load_data(*args)
[docs] @classmethod def load_olddata(cls, *args, **params): """ Load old data from h5 file. It should disappear soon. Parameters ---------- h5name: str File name from which data is loaded **params Dictionary of additonal parameters params["autosave"] : bool Save the projections once load without asking params["phaseonly"] : bool Load only phase projections. Used when the projections are complex-valued. params["amponly"] : bool Load only amplitude projections. Used when the projections are complex-valued. params["pixtol"] : float Tolerance for alignment, which is also used as a search step params["alignx"] : bool True or False to activate align x using center of mass (default= False, which means align y only) params["shiftmeth"] : str Shift images with fourier method (default). The options are `linear` -> Shift images with linear interpolation (default); `fourier` -> Fourier shift or `spline` -> Shift images with spline interpolation. params["circle"] : bool Use a circular mask to eliminate corners of the tomogram params["filtertype"] : str Filter to use for FBP params["freqcutoff"] : float Frequency cutoff for tomography filter (between 0 and 1) params["cliplow"] : float Minimum value in tomogram params["cliphigh"] : float Maximum value in tomogram params["correct_bad"] : bool If true, it will interpolate bad projections. The numbers of projections to be corrected is given by `params["bad_projs"]`. params["bad_projs"] : list of ints List of projections to be interpolated. It starts at 0. Returns ------- stack_projs: array_like Stack of projections theta: array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information """ return cls(**params)._load_olddata(*args)
[docs] @classmethod def loadshiftstack(cls, *args, **params): """ Load shitstack from previous h5 file Parameters ---------- h5name : str File name from which data is loaded Returns ------- shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) """ return cls(**params)._load_shiftstack(*args)
[docs] @classmethod def loadtheta(cls, *args, **params): """ Load shitstack from previous h5 file Parameters ---------- h5name : str File name from which data is loaded Returns ------- shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) """ return cls(**params)._load_theta(*args)
[docs] @classmethod def loadmasks(cls, *args, **params): """ Load masks from previous h5 file Parameters ---------- h5name: str File name from which data is loaded Returns ------- masks: array_like Array with the masks """ return cls(**params)._load_masks(*args)
def _load_shiftstack(self, h5name): """ Load shitstack from previous h5 file Parameters ---------- h5name : str File name from which data is loaded Returns ------- shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) """ print("Loading shiftstack from file {}".format(h5name)) h5file = self.results_datapath(h5name) if not os.path.isfile(h5file): raise ValueError(f"The file {h5file} does not exist yet") with h5py.File(h5file, "r") as fid: shiftstack = fid["shiftstack/shiftstack"][()] return shiftstack def _load_theta(self, h5name): """ Load shitstack from previous h5 file Parameters ---------- h5name : str File name from which data is loaded Returns ------- shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) """ print("Loading thetas from file {}".format(h5name)) h5file = self.results_datapath(h5name) with h5py.File(h5file, "r") as fid: theta = fid["angles/thetas"][()] return theta def _load_masks(self, h5name): """ Load masks from previous h5 file Parameters ---------- h5name: str File name from which data is loaded Returns ------- masks: array_like Array with the masks """ print("Loading the projections from file {}".format(h5name)) h5file = self.results_datapath(h5name) with h5py.File(h5file, "r") as fid: masks = fid["masks/stack"][()] return masks @checkhostname def _load_data(self, h5name): """ Load data from h5 file Parameters ---------- h5name: str File name from which data is loaded Returns ------- stack_projs: array_like Stack of projections theta: array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information """ h5file = self.results_datapath(h5name) shiftstack = self._load_shiftstack(h5name) theta = self._load_theta(h5name) it0 = time.time() with h5py.File(h5file, "r") as fid: # read the inputkwargs dict datakwargs = dict() print("\rLoading metadata...", end="") for keys in sorted(list(fid["info"].keys())): datakwargs[keys] = fid["info/{}".format(keys)][()] datakwargs.update(self.params) # add/update with new values print("\b\b Done") print("Loading the projections from file {}".format(h5name)) dset = fid["projections/stack"] # ~ stack_projs = dset[()] if self.loadroi: roi = self.roi nprojs, nr, nc = [roi[ii + 1] - roi[ii] for ii in range(0, len(roi), 2)] print("\rInitializing array...", end="") stack_projs = np.empty((nprojs, nr, nc), dtype=dset.dtype) print("\b\b Done") print("Loading. This takes time, please wait...") for ii in [projs]: strbar = "{:5d} / {:5d}".format(ii + 1, nprojs) stack_projs[ii, roi[2] : roi[3], roi[4] : roi[5]] = dset[ ii, roi[2] : roi[3], roi[4] : roi[5] ] progbar(ii + 1, nprojs, strbar) print("\r") else: nprojs = dset.shape[0] print("\rInitializing array...", end="") stack_projs = np.empty(dset.shape, dtype=dset.dtype) print("\b\b Done") print("Loading. This takes time, please wait...") for ii in range(nprojs): strbar = "{:5d} / {:5d}".format(ii + 1, nprojs) stack_projs[ii, :, :] = dset[ii, :, :] progbar(ii + 1, nprojs, strbar) print("\r") if self.amponly and np.iscomplexobj(stack_projs): print("\rTaking only amplitudes...", end="") stack_projs = np.abs(stack_projs) print("\b\b Done") elif self.phaseonly and np.iscomplexobj(stack_projs): print("\rTaking only phases...", end="") stack_projs = np.angle(stack_projs) print("\b\b Done") try: self.params["correct_bad"] except KeyError: self.params["correct_bad"] = False if self.params["correct_bad"]: stack_projs = replace_bad( stack_projs, list_bad=self.params["bad_projs"], temporary=True ) print("Projections loaded from file {}".format(h5name)) print("Time elapsed = {:.03f} s".format(time.time() - it0)) return stack_projs, theta, shiftstack, datakwargs @checkhostname def _load_olddata(h5name, **params): """ Load data from the old-format h5 file. Parameters ---------- h5name: str File name from which data is loaded Returns ------- stack_projs : array_like Stack of projections theta : array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information Note ---- May be deprecated soon. """ h5file = self.results_datapath(h5name) shiftstack = self._load_shiftstack(h5name) theta = self._load_theta(h5name) it0 = time.time() print("The file format is old.") with h5py.File(h5file, "r") as fid: # read the inputkwargs dict datakwargs = dict() print("\rLoading metadata...", end="") for keys in sorted(list(fid["info"].keys())): datakwargs[keys] = fid["info/{}".format(keys)][()] datakwargs.update(self.params) # add/update with new values print("\b\b Done") datakwargs["pixelsize"] = fid["pixelsize"][()] dset0 = fid["aligned_projections_proj/projection_000"] nr, nc = dset0.shape key_list = list(fid["aligned_projections_proj"].keys()) nprojs = len(key_list) stack_projs = np.empty((nprojs, nr, nc), dtype=dset.dtype) print("Loading projections. This takes time, please wait...") for ii in range(nprojs): strbar = "{:5d} / {:5d}".format(ii + 1, nprojs) dset = fid["aligned_projections_proj/{}".format(key_list[ii])] stack_projs[ii] = dset[()] progbar(ii + 1, nprojs, strbar) print("\r") # sorting theta print("Sorting theta...") stack_projs, theta = sort_array(stack_projs, theta) print("Projections loaded from file {}".format(h5name)) print("Time elapsed = {:.03f} s".format(time.time() - it0)) return stack_projs, theta, shiftstack, datakwargs
[docs]class SaveTomogram(SaveData): """ Save tomogram to HDF5 file """ def __init__(self, **params): super().__init__(**params) self.params = params
[docs] def savecheck(func): """ Decorator for save data """ @functools.wraps(func) def new_func(self, *args, **params): if self.autosave: ansuser = "y" func(self, *args) else: while True: ansuser = input( "Do you want to save the data to HDF5 file? ([y]/n) " ).lower() if ansuser == "" or ansuser == "y": func(self, *args) break elif ansuser == "n": print("The data have NOT been saved yet. Please, be careful") break else: print("You have to answer y or n") return new_func
def __call__(self, *args): return self._save_tomogram(*args) @classmethod def save(cls, *args, **params): return cls(**params)._save_tomogram(*args)
[docs] @classmethod def save_vol_to_h5(cls, *args, **params): return cls(**params)._save_vol_to_h5(*args)
[docs] @classmethod def save(cls, *args, **params): """ Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of slices (tomogram) args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack """ return cls(**params)._save_tomogram(*args)
[docs] @classmethod def convert_to_tiff(cls, *args, **params): """ Convert the HDF5 file with the tomogram to tiff Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of slices (tomogram) args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack """ return cls(**params)._convert_to_tiff(*args)
@savecheck def _save_tomogram(self, *args): """ Parameters ---------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of slices (tomogram) args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack """ h5name = args[0] tomogram = args[1] nslices, nr, nc = tomogram.shape theta = args[2] if len(args) == 4: shiftstack = args[3] else: shiftstack = np.zeros((2, nprojs), dtype=np.float32) # calculate the chunk size for writing the HDF5 files chunk_size = chunk_shape_3D(tomogram.shape) print("Saving {}".format(h5name)) h5file = self.results_datapath(h5name) if os.path.isfile(h5file): print("File {} already exists and will be overwritten".format(h5name)) os.remove(h5file) print("\rSaving metadata...", end="") write_paramsh5(h5file, **self.params) create_paramsh5(**self.params) print("\b\b Done") print("Saving data. This takes time, please wait...") with h5py.File(h5file, "a") as fid: # add the shiftstack fid.create_dataset( "shiftstack/shiftstack", data=shiftstack, dtype=np.float32 ) fid.create_dataset( "angles/thetas", data=theta, dtype=np.float32 ) # add the thetas # ,compression='lzf')#, compression='gzip', compression_opts=9) dset = fid.create_dataset( "tomogram/slices", shape=(nslices, nr, nc), dtype=np.float32, chunks=chunk_size, ) print("Saving tomographic slices. This takes time, please wait...") p0 = time.time() for ii in range(nslices): strbar = "{:5d} / {:5d}".format(ii + 1, nslices) # ~ print(' Slice: {} out of {}'.format(ii+1, nslices), end='\r') dset[ii, :, :] = tomogram[ii] progbar(ii + 1, nslices, strbar) print("\r") print("Done. Time elapsed = {:.03f} s".format(time.time() - p0)) print("Tomogram saved to file {}".format(h5name)) print("In the folder {}".format(self.results_folder()))
[docs] def tiff_folderpath(self, foldername): """ Create the path to the folder in which the tiff files will be stored. """ aux_path = self.results_folder() folderpath = os.path.join(aux_path, foldername) if not os.path.isdir(folderpath): print("Folder {} does not exists and will be created.".format(folderpath)) os.makedirs(folderpath) else: print("Folder exists:{}".format(folderpath)) userans = input( "Do you want to overwrite TIFFs in this folder ([y]/n)?" ).lower() if userans == "" or userans == "y": print("Overwritting") else: print("Writting of TIFFs aborted") raise SystemExit("Writting of TIFFs aborted") return folderpath
def _convert_to_tiff(self, *args): """ Convert the HDF5 file with the tomogram to tiff Parameter --------- *args positional arguments args[0] : str H5 file name args[1] : array_like Array containing the stack of slices (tomogram) args[2] : array_like Values of theta args[3] : array_like Array containing the shifts for each projection in the stack """ tomogram = args[0] nslices, nr, nc = tomogram.shape try: voxelsize = self.params["voxelsize"] except KeyError: voxelsize = self.params["pixelsize"] energy = self.params["energy"] print("The total number of slides is {}".format(nslices)) print("The voxel size is {} nm".format(voxelsize[0] * 1e9)) create_paramsh5(**self.params) # create the TIFF folder tiff_subfolder_name = "TIFF_{}_{}_freqscl_{:0.2f}_{:d}bits".format( self.params["tomo_type"], self.params["filtertype"], self.params["freqcutoff"], self.params["bits"], ) if self.params["tomo_type"] == "delta": # Conversion from phase-shifts tomogram to delta print("Converting from phase-shifts values to delta values") for ii in range(nslices): strbar = "{:5d} / {:5d}".format(ii + 1, nslices) tomogram[ii], factor = convert_to_delta(tomogram[ii], energy, voxelsize) progbar(ii + 1, nslices, strbar) elif self.params["tomo_type"] == "beta": # Conversion from amplitude to beta print("Converting from amplitude to beta values") for ii in range(slices): strbar = "{:5d} / {:5d}".format(ii + 1, nslices) tomogram[ii], factor = convert_to_beta(tomogram[ii], energy, voxelsize) progbar(ii + 1, nslices, strbar) print("\r") # low and high cutoff for Tiff normalization low_cutoff = np.min(tomogram) high_cutoff = np.max(tomogram) # writing the tiffs print("Writing the tiff files...") tiff_path = self.tiff_folderpath(tiff_subfolder_name) for ii in range(nslices): strbar = "{:5d} / {:5d}".format(ii + 1, nslices) if self.params["bits"] == 16: imgtiff = convertimageto16bits(tomogram[ii], low_cutoff, high_cutoff) elif self.params["bits"] == 8: imgtiff = convertimageto8bits(tomogram[ii], low_cutoff, high_cutoff) filename = "tomo_{:s}_filter_{:s}_cutoff_{:0.2f}_{:04d}.tif".format( self.params["tomo_type"], self.params["filtertype"], self.params["freqcutoff"], ii, ) pathfilename = os.path.join(tiff_path, filename) write_tiff(imgtiff, pathfilename) progbar(ii + 1, nslices, strbar) print("\r") # writing the metadata filename = tiff_subfolder_name + "_cutoffs.txt" metadatafile = os.path.join(tiff_path, filename) write_tiffmetadata(metadatafile, low_cutoff, high_cutoff, factor, **self.params) @savecheck def _save_vol_to_h5(self, *args): """ Save .vol files into HDF5 file """ h5name = args[0] theta = args[1] print("Saving .vol into HDF5") filename = "volfloat/{}.vol".format(self.params["samplename"]) tomogram = read_volfile(filename) nslices = tomogram.shape[0] print("Found {} slices in the volume.".format(nslices)) self._save_tomogram(self, h5name, tomogram, theta, **self.params)
[docs]class LoadTomogram(LoadData): """ Load projections from HDF5 file """ def __init__(self, **params): paramsh5 = load_paramsh5(**params) super().__init__(**paramsh5) self.params = paramsh5 self.params.update(params) def __call__(self, h5name): return self._load_tomogram(h5name)
[docs] @classmethod def load(cls, *args, **params): """ Load tomographic data from h5 file Parameters ---------- args[0] : str HDF5 file name from which data is loaded Returns ------- tomogram : array_like Stack of tomographic slices theta : array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information """ return cls(**params)._load_tomogram(*args)
@checkhostname def _load_tomogram(self, h5name): """ Load tomographic data from h5 file Parameters ---------- h5name : str File name from which data is loaded Returns -------- tomogram : array_like Stack of tomographic slices theta : array_like Stack of thetas shiftstack : array_like Shifts in vertical (1st dimension) and horizontal (2nd dimension) datakwargs : dict Dictionary with metadata information """ print("Loading tomogram from file {}".format(h5name)) h5file = self.results_datapath(h5name) shiftstack = self._load_shiftstack(h5name) p0 = time.time() with h5py.File(h5file, "r") as fid: theta = fid["angles/thetas"][()] # read the inputkwargs dict datakwargs = dict() print("\rLoading metadata...", end="") for keys in sorted(list(fid["info"].keys())): datakwargs[keys] = fid["info/{}".format(keys)][()] datakwargs.update(self.params) print("\b\b Done") print("Loading tomogram. This takes time, please wait...") dset = fid["tomogram/slices"] nslices = dset.shape[0] tomogram = np.empty(dset.shape, dtype=dset.dtype) # ~ tomogram = fid[u'tomogram/slices'][()] for ii in range(nslices): strbar = "{:5d} / {:5d}".format(ii + 1, nslices) tomogram[ii : ii + 1, :, :] = dset[ii, :, :] progbar(ii + 1, nslices, strbar) print("\r") print("Tomogram loaded from file {}".format(h5name)) print("Time elapsed = {:.03f} s".format(time.time() - p0)) return tomogram, theta, shiftstack, datakwargs