Source code for mcutils.scripts.MDE.fit_dispersion

#!/usr/bin/env python
"""Fit dispersing zeppelin model (single Bingham)"""
import os.path as op
import logging
import numpy as np
from scipy import optimize, special
from mcutils.utils.hypergeometry import bingham_normalization, der_bingham_normalization
from mcutils.utils.sidecar import AcquisitionParams
from mcutils.utils import spherical
from mcutils.utils.noise import log_non_central_chi
from mcutils.scripts.sidecar import index as sidecar_index
from fsl.utils.filetree import FileTree
from numpy.linalg import LinAlgError
import nibabel
import time
import pandas as pd
import string
from functools import lru_cache

logger = logging.getLogger(__name__)

cached_euler2mat = lru_cache(maxsize=4)(spherical.euler2mat)


[docs]def bingham_matrix(lk1, lk2, phi, theta, psi, derivative=False): """ Returns the Bingham matrix after rotation :param lk1: logarithm of k1 :param lk2: logarithm of k2 :param phi: main fibre orientation in the x-y plane :param theta: polar angle (0 for main fibre orientation in z-direction, pi/2 for main fibre orientation in x-y plane) :param psi: rotation of the major dispersion axis around the main fibre orientation :param derivative: also return a (5, 3, 3) derivative matrix :return: (3, 3) Bingham matrix (and a (5, 3, 3) matrix with derivatives if requested) """ arr = np.zeros((3, 3)) arr[0, 0] = -np.exp(lk1) arr[1, 1] = -np.exp(lk2) rotation = cached_euler2mat(phi, theta, psi) res = np.dot(np.dot(rotation, arr), rotation.T) if not derivative: return res der = np.zeros((5, 3, 3)) der[0] = arr[0, 0] * rotation[:, 0, None] * rotation[None, :, 0] der[1] = arr[1, 1] * rotation[:, 1, None] * rotation[None, :, 1] angles = [phi, theta, psi] for idx in range(3): angles[idx] += 1e-6 upper_rotation = cached_euler2mat(*angles) res_upper = np.dot(np.dot(upper_rotation, arr), upper_rotation.T) der[idx + 2] = (res_upper - res) / 1e-6 angles[idx] -= 1e-6 return res, der
[docs]def stick_dispersion(dstick, bmat, bingham, derivative=False): """ Returns the signal for dispersing sticks :param dstick: diffusion along the stick :param bmat: b-matrix defining the signal acquisition :param bingham: Bingham matrix describing the dispersion :param derivative: also return the derivatives adds two output with dS/dstick, dS/dbingham :return: signal attenuation (1 for unattenuated signal) """ Q = bingham - dstick * bmat if not derivative: res = bingham_normalization(Q) / bingham_normalization(bingham) return res num, der_num = der_bingham_normalization(Q) denum, der_denum = der_bingham_normalization(bingham) dS_ddstick = -(der_num * bmat).sum((-1, -2)) / denum dS_dbingham = der_num / denum - (num / denum ** 2)[:, None, None] * der_denum return num / denum, dS_ddstick, dS_dbingham
[docs]def zepp_to_signal(angles, params, bmat, derivative=False): """ Converts 4/5 parameters into a signal The 4 or 5 parameters correspond to - primary dispersion eigenvalue - secondary dispersion eigenvalue - diffusion anisotropy - MD - log(S_0) / b (or just MD if S0 is also provided) - optionally S0 :param angles: (theta, phi, and psi) :param params: (log of k1, log of k2, dpara - dperp, maximum signal) :param bmat: (N, 3, 3) b-matrices :param derivative: compute the derivative if requested (adds two outputs with dS/dangles and dS/dparams) :return: signal attenuation """ params = np.asarray(params) angles = np.asarray(angles) if len(params) == 4: lk1, lk2, anisotropy, MD = params S0 = 1 elif len(params) == 5: lk1, lk2, anisotropy, MD, S0 = params else: raise ValueError('4 or 5 parameters expected') if len(angles) != 3: raise ValueError('3 angles expected') bh = bingham_matrix(lk1, lk2, angles[0], angles[1], angles[2], derivative=derivative) bval = np.trace(bmat, axis1=-1, axis2=-2) if not derivative: return S0 * np.exp(-bval * MD) * np.exp(bval * anisotropy / 3) * stick_dispersion(anisotropy, bmat, bh) bh, dbh_dparams = bh stick_res, dstick_dd, dstick_dbh = stick_dispersion(anisotropy, bmat, bh, derivative=True) pre_fix = np.exp(-bval * MD) * np.exp(bval * anisotropy / 3) dS_dparams = np.zeros((len(params), ) + bmat.shape[:-2]) dS_dangle = S0 * pre_fix * (dbh_dparams[2:, None, :, :] * dstick_dbh).sum((-1, -2)) dS_dparams[:2] = S0 * pre_fix * (dbh_dparams[:2, None, :, :] * dstick_dbh).sum((-1, -2)) dS_dparams[2] = S0 * pre_fix * (dstick_dd + bval / 3 * stick_res) dS_dparams[3] = -bval * S0 * pre_fix * stick_res if len(params) == 5: dS_dparams[4] = pre_fix * stick_res return S0 * pre_fix * stick_res, dS_dangle, dS_dparams
[docs]def params_to_signal(params, bmat_list, include_S0=False, derivative=False): """ Converts parameters for one or more b-shells into a signal First three parameters are the three angles defining the orientation for each component there are two more parameters with the: 1. log(k1) 2. log(k2) 3. d2-d1 4. MD - log(S_0) / b (or MD if S0 is also provided) 5. optionally S0 :param params: (3 + 4/5 * N, ) array for N b-shells :param bmat_list: (N, ) list of lists containing the b-matrices :param include_S0: include a parameter representing S0 rather than letting it be submerged in MD :param derivative: also return the (N, ) tuple with (3 + 4/5 * N, M) array with the derivatives :return: (N, ) tuple of lists with the signal attenuations (or full signal if include_S0 is True) """ nparams = len(params) nshells = len(bmat_list) nparams_per_shell = 5 if include_S0 else 4 assert nparams == nshells * nparams_per_shell + 3 res = [] der_res = [] for idx, bmat in enumerate(bmat_list): signal = zepp_to_signal( params[:3], params[3 + nparams_per_shell * idx: 3 + nparams_per_shell * (idx + 1)], bmat, derivative=derivative ) if derivative: signal, dS_dangle, dS_dparams = signal der_res.append(np.zeros((nparams, signal.size))) der_res[-1][:3, :] = dS_dangle der_res[-1][3 + nparams_per_shell * idx: 3 + nparams_per_shell * (idx + 1), :] = dS_dparams res.append(signal) else: res.append(signal) if derivative: return tuple(res), tuple(der_res) return tuple(res)
[docs]def params_to_error(params, observed, bmat_list, include_S0=False, derivative=False): """ Computes the error between the predicted and observed signal attenuations :param params: (3 + 4/5 * N, ) array for N b-shells (see params_to_signal for more details) :param observed: (N, ) list of array with the observed signal attenuations in each b-shell :param bmat_list: (N, ) list of lists containing the b-matrices :param include_S0: include a parameter representing S0 rather than letting it be submerged in MD :param derivative: also return the (3 + 4/5 * N, ) array with the error derivative :return: float with the Euclidean error """ try: signal_full = params_to_signal(params, bmat_list, include_S0=include_S0, derivative=derivative) except LinAlgError: if derivative: return np.inf, np.zeros(len(params)) else: return np.inf if not derivative: total = 0. for sig, obs in zip(signal_full, observed): total += np.sum((sig - obs) ** 2) return total signal, der_signal = signal_full total = 0. total_der = np.zeros(len(params)) for sig, der_sig, obs in zip(signal, der_signal, observed): total += np.sum((sig - obs) ** 2) total_der += 2 * np.sum((sig - obs) * der_sig, -1) return total, total_der
[docs]def params_to_non_central_logp(params, observed, bmat_list, include_S0=False, derivative=False, ncoils=1, fixed_noise_var=None): """ Computes the log(p) of the Non-central-chi fit to the data :param params: (4 + 4/5 * N, ) array for N b-shells (last parameter is the log of the noise variance, for others see params_to_signal) :param observed: (N, ) list of array with the observed signal attenuations in each b-shell :param bmat_list: (N, ) list of lists containing the b-matrices :param include_S0: include a parameter representing S0 rather than letting it be submerged in MD :param derivative: also return the (3 + 4/5 * N, ) array with the error derivative :param ncoils: number of coils (set to 1 for the Rician noise distribution) :param fixed_noise_var: noise variance (if not set, last parameters is log(noise variance) :return: float with the log(p) """ p_signal = params[:-1] if fixed_noise_var is None else params try: signal_full = params_to_signal(p_signal, bmat_list, include_S0=include_S0, derivative=derivative) except LinAlgError: if derivative: return -np.inf, np.zeros(len(params)) else: return -np.inf noise_var = np.exp(params[-1]) if fixed_noise_var is None else fixed_noise_var total = 0. if not derivative: for sig, obs in zip(signal_full, observed): total += np.sum(log_non_central_chi(obs, sig, noise_var, ncoils)) return total signal, der_signal = signal_full total_der = np.zeros(len(params)) for sig, der_sig, obs in zip(signal, der_signal, observed): res = log_non_central_chi(obs, sig, noise_var, ncoils, derivative="both" if fixed_noise_var is None else "signal") total += np.sum(res[0]) signal_slice = slice(None) if fixed_noise_var is None: signal_slice = slice(None, -1) total_der[-1] += np.sum(res[2]) * noise_var total_der[signal_slice] += np.sum( der_sig * res[1], axis=-1 ) return total, total_der
[docs]def angles_from_dti(V1, V2, V3): """ Returns the Euler angles based on DTI data :param V1: (N, 3) array of primary eigenvector :param V2: (N, 3) array of secondary eigenvector :param V3: (N, 3) array of tertiary eigenvector :return: (N, 3) array of Euler angles (phi, theta, and psi) """ return np.stack(spherical.mat2euler( np.stack([V3, V2, V1], -1) ), -1)
[docs]class Fitter(object):
[docs] def __init__(self, data_list, bmat_list, include_S0=False, ncoils=0, noise_var=None): """ Provides a fit for a single voxel :param data_list: (K,) length list with (M_k, ) array for K shells of N voxels with M_k acquisitions :param bmat_list: (K,) length list with (M_k, 3, 3) array of b-matrices :param include_S0: if True include a parameter representing the S0 for every shell :param ncoils: If set to non-zero: assumes a non-central model for the noise with 2 ncoils degrees of freedom rather than Gaussian :param noise_var: Set to fix the noise variance rather than allow it to be a free parameters (only used if ncoils is non-zero) :return: best-fit parameters (same length as init_params) """ self.data_list = data_list self.bmat_list = bmat_list self.include_S0 = include_S0 self.ncoils = ncoils self.noise_var = noise_var
@property def nparams_per_shell(self, ): return 5 if self.include_S0 else 4 @property def nparams(self, ): return self.nparams_per_shell * self.nshells + (4 if self.ncoils and self.noise_var is None else 3) @property def nshells(self, ): return len(self.data_list)
[docs] def single(self, init_params, ignore_angle=False): """ Provides a single fit to the full dataset at once :param init_params: initial parameter set :param ignore_angle: if True do not fit the angle parameters :return: best-fit parameters (same length as init_params) """ if init_params.size != self.nparams: raise ValueError(f"Initial parameters has incorrect size of {init_params.size} instead of {self.nparams}") bounds = np.zeros((self.nparams, 2)) bounds[()] = (-np.inf, np.inf) bounds[3:-1:self.nparams_per_shell] = (-5, 5) bounds[4:-1:self.nparams_per_shell] = (-5, 5) bounds[5::self.nparams_per_shell] = (0., np.inf) if self.include_S0: bounds[6::self.nparams_per_shell] = (0, np.inf) bounds[7::self.nparams_per_shell] = (0, np.inf) if ignore_angle: full_params = np.zeros(self.nparams) full_params[:3] = init_params[:3] if self.ncoils: if self.noise_var is None: full_params[-1] = init_params[-1] use_slice = slice(3, -1) if self.noise_var is None else slice(3, None) def tofit(params): full_params[use_slice] = params f, g = params_to_non_central_logp(full_params, self.data_list, self.bmat_list, self.include_S0, True, ncoils=self.ncoils, fixed_noise_var=self.noise_var) return -f, -g[use_slice] actual_init = init_params[use_slice] bounds = bounds[use_slice] else: def tofit(params): full_params[3:] = params f, g = params_to_error(full_params, self.data_list, self.bmat_list, self.include_S0, True) return f, g[3:] actual_init = init_params[3:] bounds = bounds[3:] else: actual_init = init_params if self.ncoils: def tofit(params): f, g = params_to_non_central_logp(params, self.data_list, self.bmat_list, self.include_S0, True, ncoils=self.ncoils, fixed_noise_var=self.noise_var) return -f, -g else: def tofit(params): return params_to_error(params, self.data_list, self.bmat_list, self.include_S0, True) voxel_fit = optimize.minimize(tofit, actual_init, method='l-bfgs-b', bounds=bounds, jac=True, options={'eps': 1e-5, 'maxiter': 500}) if ignore_angle: if self.ncoils and self.noise_var is None: full_params[3:-1] = voxel_fit.x else: full_params[3:] = voxel_fit.x return full_params else: return voxel_fit.x
[docs] def per_shell(self, init_params): """ Fits the parameters for every shell separately """ res = np.zeros(init_params.shape) res[:3] = init_params[:3] if self.ncoils: res[-1] = init_params[-1] for idx in range(self.nshells): sub_fitter = Fitter( data_list=[self.data_list[idx]], bmat_list=[self.bmat_list[idx]], include_S0=self.include_S0, ncoils=self.ncoils, noise_var=self.noise_var ) sub_params = np.concatenate( (init_params[:3], init_params[self.nparams_per_shell * idx + 3:self.nparams_per_shell * (idx + 1) + 3]), ) if self.ncoils and self.noise_var is None: sub_params = np.append(sub_params, init_params[-1]) shell_fit = sub_fitter.single(sub_params, ignore_angle=True) if self.ncoils and self.noise_var is None: shell_fit = shell_fit[:-1] res[self.nparams_per_shell * idx + 3:self.nparams_per_shell * (idx + 1) + 3] = shell_fit[3:] return res
[docs] def only_angle(self, init_params): """ Fits only the angular parameters """ full_params = np.zeros(self.nparams) full_params[3:] = init_params[3:] if self.ncoils: if self.noise_var is None: actual_init = np.append(init_params[:3], init_params[-1]) else: actual_init = init_params[:3] def tofit(params, data_list, bmat_list, include_S0, derivative=True): full_params[:3] = params[:3] if self.noise_var is None: full_params[-1] = params[-1] f, g = params_to_non_central_logp(full_params, data_list, bmat_list, include_S0, derivative, ncoils=self.ncoils, fixed_noise_var=self.noise_var) grad = g[:3] if self.noise_var is None: grad = np.append(grad, g[-1]) return -f, -grad else: actual_init = init_params[:3] def tofit(params, data_list, bmat_list, include_S0, derivative=True): full_params[:3] = params f, g = params_to_error(full_params, data_list, bmat_list, include_S0, derivative) return f, g[:3] voxel_fit = optimize.minimize(tofit, actual_init, method='l-bfgs-b', args=(self.data_list, self.bmat_list, self.include_S0, True), jac=True, options={'eps': 1e-5, 'maxiter': 500}) full_params[:3] = voxel_fit.x[:3] if self.ncoils and self.noise_var is None: full_params[-1] = voxel_fit.x[3] return full_params
[docs] def iterate(self, init_params): """ Iterates between fitting the individual shells and the angles """ if self.ncoils: gauss_init_params = init_params[:-1] if self.noise_var is None else init_params gauss_params = Fitter(self.data_list, self.bmat_list, self.include_S0, ncoils=0).iterate(gauss_init_params) if self.noise_var is None: gauss_signal = np.concatenate(params_to_signal(gauss_params, self.bmat_list, self.include_S0), 0) init_params = np.append(gauss_params, np.log(np.var(gauss_signal - np.concatenate(self.data_list, 0)))) else: init_params = gauss_params res = self.per_shell(init_params) for _ in range(2 if self.ncoils else 3): angle = self.only_angle(res) for idx in range(3, 3 + self.nparams_per_shell): use_slc = slice(idx, -1, self.nparams_per_shell) if self.ncoils and self.noise_var is None else slice(idx, None, self.nparams_per_shell) angle[use_slc] = np.median(angle[use_slc]) res = self.per_shell(angle) return res
[docs]def run(data, sidecar: AcquisitionParams, indices=None, dti_vectors=None, include_S0=False, ncoils=False, noise_var=None, return_fitter=False, init_params=None): """ Fits the dispersing zeppelin model to the observed signal attenuation :param data: (N, M) array for N voxels and M gradient orientations :param sidecar: describes the acquired data :param indices: index of which shell each observation will be grouped into (default: all in the same shell) :param dti_vectors: tuple with (V1, V2, and V3), i.e., 3 (N, 3) arrays :param include_S0: include a parameter representing S0 rather than letting it be submerged in MDprime :param ncoils: Assumes a non-central chi model with 2 * ncoils dof for the noise rather than Gaussian :param noise_var: noise variance (treated as variable if not set; not recommended) :param return_fitter: debugging option to return the fitter :param init_params: initial values for the parameters log(k1), log(k2), d2-d1, MD - log(S_0) / b (or MD if include_S0 is true), S0 if include_S0) :return: Tuple with: - Acquisition parameters per shell - best-fit parameters per shell in a pandas dataframe """ if sidecar.n != data.shape[-1]: raise ValueError(f'XPS file describes {sidecar.n} volumes, but data contains {data.shape[-1]} volumes') if indices is None: indices = np.zeros(data.shape[-1], dtype='i4') nshells = max(indices) + 1 logger.info(f'Total number of shells: {nshells}') bmat_list = [sidecar['btensor'][indices == idx] / 1e3 for idx in range(nshells)] data_list = [data[:, indices == idx] for idx in range(nshells)] bvals = [np.median(np.trace(bmat, axis1=-1, axis2=-2)) for bmat in bmat_list] logger.info(f'Median b-value per shell: {bvals}') nparams_per_shell = 5 if include_S0 else 4 nparams = nshells * nparams_per_shell + (4 if ncoils and noise_var is None else 3) res = np.zeros((data_list[0].shape[0], nparams)) + 0.1 res[:, 3::nparams_per_shell] = 1.5 res[:, 4::nparams_per_shell] = 1. res[:, 5::nparams_per_shell] = 1. if ncoils and noise_var is None: res[:, -1] = 0 # set initial dispersion/diffusion parameters if include_S0: res[:, 6::nparams_per_shell] = np.array([np.amax(d, -1) for d in data_list]).T # mean signal relative to max res[:, 7::nparams_per_shell] = np.array([-np.log(np.mean(d, -1) / np.amax(d, -1)) / b for b, d in zip(bvals, data_list)]).T else: # mean signal relative to 1 res[:, 6::nparams_per_shell] = np.array([-np.log(np.mean(d, -1)) / b for b, d in zip(bvals, data_list)]).T if init_params is not None: for idx, value in enumerate(init_params): res[:, (idx + 3)::nparams_per_shell] = value if dti_vectors is not None: # the primary eigenvector should point in the z-direction after inverse rotation res[:, :3] = angles_from_dti(*dti_vectors) if return_fitter: return [(Fitter([d[idx] for d in data_list], bmat_list, include_S0=include_S0, ncoils=ncoils, noise_var=noise_var), res[idx]) for idx in range(res.shape[0])] start = time.time() for idx in range(res.shape[0]): fitter = Fitter([d[idx] for d in data_list], bmat_list, include_S0=include_S0, ncoils=ncoils, noise_var=noise_var) try: res[idx] = fitter.iterate(res[idx]) except Exception as e: logger.warning(f'Failed for voxel {idx} with message:') logger.warning(str(e)) res[idx] = np.nan if idx == 9 or idx % max(int(res.shape[0] / 20), 1) == 0: logger.info('Processed {} voxels with an average processing time of {} ms'.format( idx + 1, int(1000 * (time.time() - start) / (idx + 1)))) return ( sidecar.groupby(indices, drop=('theta', 'phi', 'psi', 'b_delta')), clean(res, include_S0=include_S0, non_central=ncoils and noise_var is None) )
[docs]def clean(parameters, include_S0, non_central): """ Cleans the parameters :param parameters: (M, (3 or 4) + (4 or 5) * N) array of best-fit parameters for N shells and M voxels :param include_S0: Whether S0 was included as a separate parameter in the fitting :param non_central: Whether the noise variance in the non-central chi noise was estimated :return: (M, ) pandas dataframe with the labeled best-fit parameters """ nparams_per_shell = 5 if include_S0 else 4 # clean the angles parameters = swap_dispersion(parameters, nparams_per_shell) angles_arr = spherical.clean_euler(parameters[:, 0], parameters[:, 1], parameters[:, 2]) res = [ ('phi', angles_arr[0]), ('theta', angles_arr[1]), ('psi', angles_arr[2]), ] rotate_mat = spherical.euler2mat(parameters[:, 0], parameters[:, 1], parameters[:, 2]) for idx2, name in enumerate(('sec_disp', 'main_disp', 'dyad')): for idx1, dim in enumerate('xyz'): res.append((f'{name}_{dim}', rotate_mat[:, idx1, idx2])) # clean the other parameters for idx_param, label in zip( range(3, parameters.shape[-1] - 1, nparams_per_shell), ('_' + letter for letter in string.ascii_uppercase), ): if parameters.shape[-1] == (4 if non_central else 3) + nparams_per_shell: label = '' lk1, lk2, anis, MD = parameters[:, idx_param: idx_param + 4].T as_dict = [ ('log_k1', lk1), ('log_k2', lk2), ('k1', np.exp(lk1)), ('k2', np.exp(lk2)), ('disp1', calc_dispersion(np.exp(lk1))), ('disp2', calc_dispersion(np.exp(lk2))), ('anisotropy', anis / 1e3), # unit conversion to mm^2/s ] if include_S0: as_dict.append(('MD', MD / 1e3)) as_dict.append(('S0', parameters[:, idx_param + 4])) else: as_dict.append(('MDprime', MD / 1e3)) res.extend((key + label, value) for key, value in as_dict) if non_central: res.append(('global_noise', np.exp(parameters[:, -1]))) return pd.DataFrame(dict(res))
[docs]def swap_dispersion(res_fit, nparams_per_shell=4): """ Ensures that the dispersion along axis 2 is greater than across axis 1 :param res_fit: input parameters :return: parameters with the same signal """ disp1_degrees = calc_dispersion(np.exp(res_fit[..., 3:-2:nparams_per_shell].mean(-1))) disp2_degrees = calc_dispersion(np.exp(res_fit[..., 4:-2:nparams_per_shell].mean(-1))) swap = disp1_degrees > disp2_degrees logger.info('swapping dispersion for %d out of %d voxels', swap.sum(), swap.size) res = res_fit.copy() res[swap, 2] = (res[swap, 2] + np.pi / 2) % np.pi res[swap, 3:-2:nparams_per_shell] = res_fit[swap, 4:-2:nparams_per_shell] res[swap, 4:-2:nparams_per_shell] = res_fit[swap, 3:-2:nparams_per_shell] return res
@np.vectorize def calc_dispersion(k): """ Computes the angular dispersion from the kappa parameter Finds the angle in degrees that contains 50% of the fibres """ zero_point = -special.erfi(np.sqrt(k) * np.cos(0)) end_point = -special.erfi(np.sqrt(k) * np.cos(np.pi/2.)) half_point = (zero_point + end_point) / 2. res = optimize.minimize_scalar(lambda x: (half_point + special.erfi(np.sqrt(k) * np.cos(x))) ** 2, bounds=(0, np.pi/2.), method='Golden') return res.x / np.pi * 180. @np.vectorize def calc_k(angle): """ Computes the kappa parameter from the angular dispersion :param angle: angle containing 50% of the fibres :return: kappa parameter """ return optimize.minimize_scalar( lambda x: (special.erfi(np.sqrt(x) * np.cos(angle * np.pi / 180.)) / special.erfi(np.sqrt(x)) - 0.5) ** 2 ).x
[docs]def run_from_args(args): """ Runs the script based on a Namespace containing the command line arguments """ img = nibabel.load(args.mask) mask = img.get_data() > 0 sidecar = AcquisitionParams.read(args.sidecar) signal = nibabel.load(args.data).get_data()[mask] dti_vectors = None if args.DTI is not None: dti_vectors = tuple(nibabel.load('{}_V{}.nii.gz'.format(args.DTI, idx)).get_data()[mask] for idx in range(1, 4)) indices = sidecar_index.get_indices(sidecar, args) use = np.zeros(indices.shape, dtype='bool') for idx in range(max(indices) + 1): if max(sidecar['b'][indices ==idx]) > 300: use[indices == idx] = True signal = signal[:, use] sidecar = sidecar[use] indices = sidecar_index.get_indices(sidecar, args) sidecar_shell, res = run( data=signal, sidecar=sidecar, dti_vectors=dti_vectors, indices=indices, include_S0=args.include_S0, ncoils=args.ncoils, noise_var=args.noise_var ) direc, filename = op.split(args.output) def to_full(arr): full_arr = np.zeros(mask.shape + arr.shape[1:]) full_arr[mask] = arr return nibabel.Nifti1Image(full_arr, img.affine) def to_full_dyad(name): as_arr = np.stack([res[f'{name}_{dim}'] for dim in 'xyz'], -1) return to_full(as_arr) def to_full_shell(name): as_arr = np.stack([res[f'{name}_{idx_shell}'] for idx_shell in string.ascii_uppercase[:sidecar_shell.n]], -1) return to_full(as_arr) full_res = np.zeros(mask.shape + (res.shape[1], )) full_res[mask] = res tree = FileTree.read('fit_dispersion', directory=direc, basename=filename) to_full_dyad('dyad').to_filename(tree.get('dyad', make_dir=True)) to_full_dyad('main_disp').to_filename(tree.get('dyad_disp')) to_full_shell('disp1').to_filename(tree.get('dispA')) to_full_shell('disp2').to_filename(tree.get('dispB')) to_full_shell('anisotropy').to_filename(tree.get('microA')) if args.include_S0: to_full_shell("MD").to_filename(tree.get('MD')) to_full_shell("S0").to_filename(tree.get('S0')) else: to_full_shell("MDprime").to_filename(tree.get('MDprime')) if args.ncoils and args.noise_var is not None: to_full(res['global_noise']).to_filename(tree.get('global_noise')) sidecar_shell.write(tree.get('sidecar'))
[docs]def add_to_parser(parser): """ Creates the parser of the command line arguments """ parser.add_argument('data', help='NIFTI file with diffusion data') parser.add_argument('sidecar', help='.json or .mat file with the sequence information (see sidecar_merge)') parser.add_argument('output', help='output basename') parser.add_argument('-m', '--mask', help='mask which voxels should be used') parser.add_argument('--DTI', help='basename of a DTI fit ot the LTE data (used to initialize the fit)') parser.add_argument('-S0', '--include_S0', action='store_true', help='Include S0 as a separate parameter in the fitting rather than including it in the MDprime') parser.add_argument('-nc', '--ncoils', type=int, help='Model the noise as Non-central distribution with ' + 'ncoils * 2 degrees of freedom rather than Gaussian') parser.add_argument('-var', '--noise_var', type=float, help='Variance of the noise as estimated outside of the brain or in the CSF ' '(only used if ncoils is set; default: treat as variable)') sidecar_index.add_index_params(parser, exclude=('b_delta',))