Scripts included in gyral_structure

An example of combining these scripts to model the fibre configuration in the gyral white matter can be found here.

gs_mask: Creates a mask of the gyral white matter

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser("""Creates a mask of the gyral white matter.

The final mask:
1. Includes any white matter voxels, where the straight distance between the gyral walls is less than `width` mm (only counting gyral walls part of the surface mask).
2. Includes any white matter voxels within `dist` mm from the white/gray matter boundary (ignored by default)
3. Includes any voxels intersected by the masked white/gray matter boundary
4. Excludes any voxels closer to the unmasked than the masked surface (even if it meets the criteria above) 

Any holes in the mask will be filled.
""")
parser.add_argument('white', help='GIFTI (.surf.gii) file with the white/gray matter boundary')
parser.add_argument('white_mask',
                    help='GIFTI (.shape.gii) mask with only gyral white matter between non-zero vertices included')
parser.add_argument('reference', help='reference volume')
parser.add_argument('out', help='Output mask')
parser.add_argument('-w', '--width', default=10., type=float,
                    help='maximum width between gyral walls in mm (default: 10)')
parser.add_argument('-d', '--dist', default=0., type=float,
                    help='any voxels within `dist` mm below the white/gray matter boundary will be ' +
                         'included in the mask (default: 0)')
parser.add_argument("-N", "--norient", default=300, type=int,
                    help="Number of random line orientations to average over (only if algorithm='line'); " +
                         "default: 300")
args = parser.parse_args()

from mcutils.surface import CorticalMesh, grid, orientation
import nibabel as nib
import numpy as np
from scipy import ndimage

# load the input data
white = CorticalMesh.read(args.white)
white_mask = nib.load(args.white_mask).darrays[0].data != 0
masked_white = white[white_mask]

img = nib.load(args.reference)

# distance mask
white_dist = grid.signed_distance(white, img.shape, affine=img.affine)
dist_mask = (-args.dist < white_dist) & (white_dist < 0)

# thickness mask
wo = orientation.WeightedOrientation(masked_white, masked_white, np.zeros(masked_white.nvertices),
                                     resolution_grid=1., target_affine=img.affine)
_, thickness = wo.average_line_grid(white_dist < 0, norient=args.norient)
thickness_mask = thickness[..., 0] <= args.width

# intersection mask (includes all voxels intersected by the masked white/gray matter boundary
intersect_mask = grid.intersect(masked_white, img.shape, img.affine).has_hit != -1

# exclude voxels closer to unmasked vertices than masked vertices
idx = grid.closest_surface(white, (white_dist <= 3) & (white_dist != 0), img.affine)
close_to_white_mask = white_mask[idx]
close_to_white_mask[idx == -1] = False

gyral_wm_mask = (dist_mask | thickness_mask) & (white_dist < 0)

final_mask = (gyral_wm_mask | intersect_mask) & close_to_white_mask

idx_deepest = np.unravel_index(white_dist.argmin(), shape=white_dist.shape)
if final_mask[idx_deepest]:
    raise ValueError("Even the deepest white matter point was included in the gyral mask.")

# Fill any holes left in the gyral white matter
filled = ndimage.binary_fill_holes(final_mask)

if filled[idx_deepest]:
    # Filling holes in the gyral white matter also filled the deep white matter
    # Let's fix that
    new_filled = filled ^ final_mask
    labels, _ = ndimage.label(new_filled)
    filled[labels[idx_deepest] == labels] = False

nib.Nifti1Image(filled, affine=None, header=img.header).to_filename(args.out)

gs_fit: Fits vector field inside gyral white matter

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser('Script to fit a vector field to the gyral white matter')
parser.add_argument('gyral_wm_mask', help='NIFTI file with gyral WM mask (e.g., from `gs_mask`)')
parser.add_argument('white', help='GIFTI (.surf.gii) file with white/gray matter boundary (used to put the surface constraints)')
parser.add_argument('pial', help='GIFTI (.surf.gii) file with the pial surface (used to place the initial charges)')
parser.add_argument('surf_mask',
                    help='GIFTI (.shape.gii) mask setting where surface constraints will be imposed')
parser.add_argument('dyads',
                    help='NIFTI file with dyads that need to be followed')
parser.add_argument('out',
                    help='HDF5 file with the best-fit basis functions')
parser.add_argument('-m', '--mid-thickness', default=None,
                    help='GIFTI (.surf.gii) file with mid-thickness (optional to also put surface constraints at mid-thickness)')
parser.add_argument('-ns', '--nsplit-surf', default=2, type=int,
                    help='Number of sub-triangles to split each surface element along each dimension')
parser.add_argument('-nv', '--nsplit-vol', default=2, type=int,
                    help='Number of sub-voxels to split each volume element along each dimension')
parser.add_argument('--wL2', default=1e-3, type=float,
                    help='Weight of the L2 constraint on the volumetric fibre density')
parser.add_argument('--wdyad', default=1., type=float,
                    help='Weight of the cost of alignment with dyads')
parser.add_argument('--wdensity', default=1., type=float,
                    help='Weight of the cost of target density constraint across the surface')
parser.add_argument('--wradial', default=1, type=float,
                    help='Weight of the radial constraint on the surface fibre density')
parser.add_argument('--wsmoothness', default=0., type=float,
                    help='Weight of the smoothness contraint on the orientation between neighbouring voxels')
parser.add_argument('-s1', '--first_rbf', default=20., type=float,
                    help='RBF Dipole size during the first optimisation step in mm (default: 20)')
parser.add_argument('-s2', '--second_rbf', default=7., type=float,
                    help='RBF Dipole size during the second optimisation step in mm (default: 7)')
parser.add_argument('-td', '--target_density',
                    help='GIFTI (.shape.gii) file with the target surface density in number of streamlines per vertex '
                         '(default: uniform per unit of cortical volume).'
                         'Can be set to "white", "mid", or "pial" to be uniform on that surface.')
parser.add_argument('--ignore_bad_dyads', action='store_true',
                    help='ignore alignment constraint for voxels where the dyad is undefined')
args = parser.parse_args()

import nibabel as nib
import numpy as np
from gyral_structure import basis, cost, request, utils
from mcutils.surface import CorticalMesh, grid, Cortex
from warnings import warn
from nibabel import affines


# define where the vector field will be evaluated
white_request = request.read_surface(args.white, args.surf_mask, nsplit=args.nsplit_surf)
if args.mid_thickness is not None:
    mid_request = request.read_surface(args.mid_thickness, args.surf_mask, nsplit=args.nsplit_surf)
vol_request = request.read_volume(args.gyral_wm_mask, nsplit=args.nsplit_vol)


# define the cost functions
L2 = cost.dens.L2Density(vol_request) / vol_request.npos

dyads_img = nib.load(args.dyads)
dyads_arr = dyads_img.get_fdata()[nib.load(args.gyral_wm_mask).get_fdata() != 0]
mm_dyads_arr = utils.dyad_to_mm(dyads_arr, dyads_img.affine)
bad_dyads = (dyads_arr == 0).all(-1)
if bad_dyads.any():
    if not args.ignore_bad_dyads:
        raise ValueError(f"Dyads not defined for {bad_dyads.sum()} voxels in gyral white matter mask. Set --ignore_bad_dyads flag to ignore this issue")
    warn(f"Dyads not defined for {bad_dyads.sum()} voxels in gyral white matter mask. Ignoring these voxels when computing alignment")
    mm_dyads_arr[bad_dyads] = 0
assert np.isfinite(mm_dyads_arr).all()
aligned = cost.orient.Watson(vol_request, field=mm_dyads_arr) / vol_request.npos

voxel_size = affines.voxel_sizes(dyads_img.affine)
smoothness = cost.meta.Smoothness.from_request(cost.orient.VonMises, vol_request, max_dist=voxel_size.max() * 1.1)
smoothness = smoothness / smoothness.index1.size

surf_mask = nib.load(args.surf_mask).darrays[0].data != 0
white_surf = CorticalMesh.read(args.white)[surf_mask]

if args.target_density is None:
    target_density = Cortex([white_surf, CorticalMesh.read(args.pial)[surf_mask]]).wedge_volume()
elif args.target_density in ('white', 'mid', 'pial'):
    fn = getattr(args, 'mid_thickness' if args.target_density == 'mid' else args.target_density)
    surf = CorticalMesh.read(fn)[surf_mask]
    target_density = surf.size_faces()
else:
    target_density_vertex = nib.load(args.target_density).darrays[0].data[surf_mask]
    target_density = white_surf.graph_connection_point().T.dot(target_density_vertex) / 3.

white_normal = white_surf.normal().T
white_radial = cost.orient.VonMises(white_request, field=white_normal) / white_request.npos
white_density = cost.dens.TargetInner(white_request, field=white_normal,
                                      target_inner=target_density / white_surf.size_faces()) / white_request.npos
if args.mid_thickness is not None:
    mid_surf = CorticalMesh.read(args.mid_thickness)[surf_mask]
    mid_normal = mid_surf.normal().T
    mid_radial = cost.orient.VonMises(mid_request, field=mid_normal) / mid_request.npos
    mid_uniform = cost.dens.TargetInner(mid_request, field=mid_normal,
                                        target_inner=target_density / mid_surf.size_faces()) / mid_request.npos
    radial = 0.5 * (white_radial + mid_radial)
    density = 0.5 * (white_density + mid_uniform)
else:
    radial = white_radial
    density = white_density

no_dyads = args.wdensity * density + args.wradial * radial + args.wL2 * L2
if args.wsmoothness != 0:
    no_dyads = no_dyads + args.wsmoothness * smoothness
with_dyads = no_dyads + args.wdyad * aligned


# define the basis functions
## initial charge distribution
white = CorticalMesh.read(args.white)
ref_img = nib.load(args.gyral_wm_mask)
white_dist = grid.signed_distance(white, ref_img.shape, affine=ref_img.affine)
idx_max = np.argmin(white_dist)
voxel_max = np.array(np.unravel_index(idx_max, ref_img.shape)).flatten()
mm_max = ref_img.affine[:3, :3].dot(voxel_max) + ref_img.affine[:3, -1]

masked_pial = CorticalMesh.read(args.pial)[surf_mask]
points_pial = masked_pial.vertices[:, masked_pial.faces].mean(1).T

charge_pial = -target_density

charge_max = -np.atleast_1d(charge_pial.sum())

charge_basis = basis.SumBase([
    basis.ChargeDistribution(points_pial, 0.1),
    basis.ChargeDistribution(mm_max[None, :], 0.1)
])
charge_basis.fix(0, charge_pial)
charge_basis.fix(1, charge_max)

## radial basis functions
def get_rbf(resolution=3., spacing=1./3.):
    """
    Defines the radial basis functions

    :param resolution: radius of the basis functions
    :param spacing: spacing between basis functions relative to the resolution (best between 1/2 and 1/3)
    :return: radial basis functions
    """
    req = request.MultRequest([white_request, vol_request])
    bb = req.bounding_box()
    centroids = basis.hcp_packing(bb, distance=resolution * spacing)
    use_centroids = req.min_dist(centroids) < resolution / 2.
    return basis.RadialBasis(basis.wendland(smoothness=2), centroids[use_centroids], resolution)


# initial fitting
init_basis = basis.SumBase([charge_basis, get_rbf(args.first_rbf)])
fitter = no_dyads.get_evaluator(init_basis)
res = fitter.fit(np.zeros(init_basis.nparams), verbose_every=100, tol=1e-5)
print('Initial fitting:')
print(res)
init_basis.fix(1, res.x)

# final fitting
final_basis = basis.SumBase([init_basis, get_rbf(args.second_rbf)])
fitter = with_dyads.get_evaluator(final_basis)
res = fitter.fit(np.zeros(final_basis.nparams), verbose_every=100, tol=1e-10)
print('Final fitting:')
print(res)
final_basis.fix(1, res.x)
if res.nit < 10 or (res.status != 0 and res.nit < 1000):
    final_basis.save(args.out + "_no_convergence")
    raise ValueError("Failed to converge")
final_basis.save(args.out)




gs_deform_surface: Moves white/gray matter boundary to deep white matter

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser('Moves a surface through a vector field')
parser.add_argument('field', help='Input HDF5 file with the basis function and best-fit array')
parser.add_argument('input', help='input surface (.surf.gii file)')
parser.add_argument('mask', help='streamlines will stop when leaving this mask')
parser.add_argument('output', help='Output .surf.gii file with the streamlines')
parser.add_argument('-n', '--negative', action='store_true',
                    help='Start streamline where they enter rather than leave the cortex')
parser.add_argument('--steplength', type=float, default=0.2,
                    help='tractography step length (default: 0.2 mm)')
parser.add_argument('-S', '--nsteps', type=int, default=2000,
                    help='maximum number of steps to take per streamline (default: 2000)')
parser.add_argument('-ol', '--output_length',
                    help='Outputs streamline lengths to given directory in GIFTI file')
parser.add_argument('-ot', '--output_tracts',
                    help='Outputs streamlines as TRK file')
args = parser.parse_args()

import nibabel
import numpy as np
from gyral_structure import basis, streamline
from mcutils.surface import CorticalMesh, grid
from mcutils.utils.write_gifti import write_gifti
from numpy import linalg
from scipy import ndimage

basis_func, best_fit_arr = basis.read(args.field)
surf = CorticalMesh.read(args.input)
start_points = surf.vertices.T

img = nibabel.load(args.mask)
trck = streamline.Tracker(basis_func.param_evaluator(best_fit_arr), img.get_data(), img.affine,
                          args.steplength, args.nsteps)
lines = trck.track(start_points, nstore=1, inverse=not args.negative)

line_lengths = np.array([(len(line) - 1) * args.steplength for line in lines])

new_vertices = np.array([line[-1] for line in lines])

deep_wm = (img.get_data() == 0) & (grid.signed_distance(surf, img.shape, img.affine) < 0)
dist = ndimage.distance_transform_edt(~deep_wm)

voxel_vertices = nibabel.affines.apply_affine(linalg.inv(img.affine), new_vertices)
distances = dist[tuple(np.around(voxel_vertices).astype('i4').T)]

bad_surface = distances > max(nibabel.affines.voxel_sizes(img.affine))

print("{} out of {} vertices did not reach the deep WM and have been replaced with their neighbours".format(
    bad_surface.sum(), bad_surface.size))

pp = surf.graph_point_point(include_diagonal=False)
map_to_bad = pp[bad_surface, :]

mean_vertices = new_vertices.copy()
nneigh = map_to_bad.dot(np.ones(surf.nvertices, dtype=int))
for _ in range(100):
    mean_vertices[bad_surface] = map_to_bad.dot(mean_vertices) / nneigh[:, None]

if args.output_length is not None:
    offset = np.sqrt(((mean_vertices - surf.vertices.T) ** 2).sum(-1))
    write_gifti(args.output_length, [offset], brain_structure=surf.anatomy)

surf.vertices = mean_vertices.T
surf.write(args.output)

if args.output_tracts:
    hdr = nibabel.streamlines.trk.TrkFile.create_empty_header()
    hdr['dimensions'] = img.shape
    hdr['voxel_sizes'] = abs(img.affine[:3, :3]).sum(0)
    trac = nibabel.streamlines.Tractogram(lines, affine_to_rasmm=linalg.inv(img.affine))
    tracfile = nibabel.streamlines.trk.TrkFile(trac, hdr)
    nibabel.streamlines.save(tracfile, args.output_tracts)

gs_dyad_vol: Evaluates vector field fibre density and orientation in a volume

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser('Computes the fibre orientation and optionally density in a volume image')
parser.add_argument('field', help='Input HDF5 file with the basis function and best-fit array')
parser.add_argument('mask', help='reference volume on which the dyads will be computed for all non-zero voxels')
parser.add_argument('dyad', help='Output volume filename with fibre orientations')
parser.add_argument('-d', '--density', help='Output volume filename with fibre densities')
parser.add_argument('--nsplit', type=int, default=1, help='number of sub-voxels/sub-triangles to split each element into along each dimension (default: 1)')
args = parser.parse_args()

import nibabel
from gyral_structure import basis, request, utils
import scipy as sp

basis_func, best_fit_arr = basis.read(args.field)
img = nibabel.load(args.mask)

mask = img.get_data() != 0
req = request.read_volume(img, nsplit=args.nsplit)

field = req.get_evaluator(basis_func)(best_fit_arr)

density = sp.zeros(mask.shape)
density[mask] = sp.sqrt((field ** 2).sum(-1))

dyad = sp.zeros(mask.shape + (3, ))
dyad[mask] = utils.dyad_from_mm(field, img.affine)

nibabel.Nifti1Image(dyad, img.affine).to_filename(args.dyad)
if args.density is not None:
    nibabel.Nifti1Image(density, img.affine).to_filename(args.density)

gs_dyad_surf: Evaluates vector field fibre density and orientation on a surface

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser('Computes the fibre orientation & density on a surface')
parser.add_argument('field', help='Input HDF5 file with the basis function and best-fit array')
parser.add_argument('surface', help='reference surface on which the dyads will be computed')
parser.add_argument('output', help='Output CIFTI (.dscalar.nii) file with densities and dyads')
parser.add_argument('--nsplit', type=int, default=1, help='number of sub-voxels/sub-triangles to split each element into along each dimension (default: 1)')
parser.add_argument('-m', '--mask', help='GIFTI filename with surface mask (all non-zero vertices will be included)')
args = parser.parse_args()

import nibabel
from gyral_structure import basis, request, utils
from mcutils.surface import CorticalMesh
import scipy as sp
import cifti

basis_func, best_fit_arr = basis.read(args.field)
img = nibabel.load(args.surface)
mask = nibabel.load(args.mask).darrays[0].data != 0
surf = CorticalMesh.read(img)[mask]
req = request.read_surface(surf, nsplit=args.nsplit)

field = req.get_evaluator(basis_func)(best_fit_arr)


density = sp.zeros(field.shape[0])
density = sp.sqrt((field ** 2).sum(-1))
normals = surf.normal()
crossing = (normals.T * field).sum(-1)

arr_triangle = sp.stack((crossing, density, crossing / density, field[:, 0], field[:, 1], field[:, 2]), axis=0)
arr_vertices = sp.zeros((arr_triangle.shape[0], surf.nvertices), dtype='f4')
for idx in range(arr_triangle.shape[0]):
    conn, val = sp.broadcast_arrays(surf.faces, arr_triangle[idx][None, :])
    nconn = sp.bincount(conn.flatten(), minlength=surf.nvertices)
    arr_vertices[idx] = sp.bincount(conn.flatten(), val.flatten(), minlength=surf.nvertices) / nconn
bm = cifti.BrainModel.from_mask(mask, name=img.darrays[0].metadata['AnatomicalStructurePrimary'])
cifti.write(args.output, arr_vertices, (cifti.Scalar.from_names(['density (crossing)', 'density (total)', 'radiality', 'x', 'y', 'z']), bm))

gs_track: Streamline tractography through vector field

#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser('Runs tractography through a given vector field')
parser.add_argument('field', help='Input HDF5 file with the basis function and best-fit array')
parser.add_argument('surf', help='surface to start the tractography at')
parser.add_argument('mask', help='streamlines will stop when leaving this mask')
parser.add_argument('output', help='Output .trk file with the streamlines')
parser.add_argument('-N', '--nstream', type=int, default=10000, help='total number of streamlines to run (default: 10k)')
parser.add_argument('-p', '--positive', action='store_true', help='Start streamline where they leave rather than enter the cortex')
parser.add_argument('--steplength', type=float, default=0.5, help='default tractography step length (default: 0.5 mm)')
parser.add_argument('--stop', nargs='*', default=(), help='stop surface masks')
parser.add_argument('-S', '--nsteps', type=int, default=1000, help='maximum number of steps to take per streamline (default: 1000)')
args = parser.parse_args()
if len(args.stop) % 2 != 0:
    raise ValueError('stop masks should consist of surface/masks combinations')

import nibabel
from numpy import linalg
from gyral_structure import basis, request, streamline
from mcutils.surface import CorticalMesh

stop_surfaces = []
for surf_name, mask_name in zip(args.stop[::2], args.stop[1::2]):
    surf = CorticalMesh.read(surf_name)
    mask = nibabel.load(mask_name).darrays[0].data != 0
    stop_surfaces.append(surf[mask])

basis_func, best_fit_arr = basis.read(args.field)
surf = CorticalMesh.read(args.surf)
req = request.VertexRequest(surf.vertices[:, surf.faces].transpose(2, 0, 1))
field = req.get_evaluator(basis_func)(best_fit_arr)
field_dens = (surf.normal().T * field).sum(-1) * req.size()
if args.positive:
    field_dens[field_dens < 0] = 0
else:
    field_dens[field_dens > 0] = 0
norm_field_dens = field_dens * args.nstream / field_dens.sum()

start_points = streamline.surface_startpoints(req.triangles, norm_field_dens)
assert start_points.shape == (args.nstream, 3)

img = nibabel.load(args.mask)
trck = streamline.Tracker(basis_func.param_evaluator(best_fit_arr), img.get_data(), img.affine, args.steplength, args.nsteps, stop_surfaces=stop_surfaces)
points = trck.track(start_points, nstore=1, inverse=args.positive)

hdr = nibabel.streamlines.trk.create_empty_header()
hdr['dimensions'] = img.shape
hdr['voxel_sizes'] = abs(img.affine[:3, :3]).sum(0)
trac = nibabel.streamlines.Tractogram(points, affine_to_rasmm=linalg.inv(img.affine))
tracfile = nibabel.streamlines.trk.TrkFile(trac, dict(zip(hdr.dtype.names, hdr[0])))
nibabel.streamlines.save(tracfile, args.output)