Source code for gyral_structure.cost.dens

"""Defines cost functions that act on the density (i.e. norm) of the vector field, independent of its orientation

- :class:`MinInner`: Encourages the density of streamlines crossing a surface exceeds a certain value
- :class:`TargetInner`: Sets a target value for the density of streamlines crossing a surface element
- :class:`InnerCost`: Sets a target value for the density of streamlines crossing a surface element (same as TargetInner)
- :class:`TotalIntersect`: Sets a target value for the total number of streamlines crossing a whole surface
- :class:`L1Density`: L1 norm in the vector field density
- :class:`L2Density`: L2 norm in the vector field density
- :class:`SameDensity`: Encourages two fields to have the same vector field density (useful as a smoothness constraint)
"""
import scipy as sp
from .core import CostFunc, SympyCost
from scipy import sparse


[docs]class MinInner(CostFunc): """Encourages the inner product to exceed a certain value. C = 0 if (f \cdot F) > min_inner C = min_inner - (f \cdot F) if (f \cdot F) < min_inner """
[docs] def __init__(self, request, field, min_inner=0, normed=False): super(MinInner, self).__init__(request) self.field = field self.min_inner = min_inner self.normed = normed
@property def quadratic(self, ): return not self.normed symmetric = True
[docs] def cost(self, predicted_field, tosum=True): """Computes the cost-function given the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :param tosum: if False return the cost evaluated for each requested position rather than the full cost :return: float, which should be minimized (or (npos, ) array if tosum is False) """ if self.normed: norm = sp.sqrt((predicted_field ** 2).sum(-1) * (self.field ** 2).sum(-1)) norm[norm == 0] = 1 offset = self.min_inner - (predicted_field * self.field).sum(-1) / norm else: offset = self.min_inner - (predicted_field * self.field).sum(-1) offset[offset < 0] = 0. if not tosum: return offset return offset.sum()
[docs] def dercost(self, predicted_field, tosum=True): """Computes the gradient of the cost-function with respect to the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function """ if self.normed: predicted_norm_sq = (predicted_field ** 2).sum(-1) norm = sp.sqrt(predicted_norm_sq * (self.field ** 2).sum(-1)) norm[norm == 0] = 1 inner = (predicted_field * self.field).sum(-1) / norm offset = self.min_inner - inner field_derivative = -self.field / norm[:, None] + inner[:, None] * predicted_field / predicted_norm_sq[:, None] else: offset = self.min_inner - (predicted_field * self.field).sum(-1) field_derivative = -self.field field_derivative[offset < 0, :] = 0 return field_derivative
[docs] def hessp(self, predicted_field, pfield): """Compute the multiplication of the hessian (at `predicted_field`) with `pfield`. :param predicted_field: (npos, ndim) array indicating where the hessian will be calculated :param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated :return: (npos, ndim) of the local hessian times pfield """ if not self.normed: return pfield * 0 else: predicted_norm_sq = (predicted_field ** 2).sum(-1) norm = sp.sqrt(predicted_norm_sq * (self.field ** 2).sum(-1)) norm[norm == 0] = 1 inner = (predicted_field * self.field).sum(-1) offset = self.min_inner - inner / norm result = (-3 * (inner / (norm * predicted_norm_sq ** 2) * (predicted_field * pfield).sum(-1))[:, None] * predicted_field + ((predicted_field * pfield).sum(-1) / (norm * predicted_norm_sq))[:, None] * self.field + ((self.field * pfield).sum(-1)[:, None] / (norm * predicted_norm_sq)[:, None]) * predicted_field + (inner / (norm * predicted_norm_sq))[:, None] * pfield) result[offset < 0, :] = 0 return result
[docs] def qp(self, predicted_field): """Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem. For a field F, minimize :math:`(1/2) * F' P F + F' q` where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]` :param predicted_field: Returns the quadratic field at the requested position :return: tuple with (P, q): - P: (npos * ndim, npos * ndim) matrix - q: (npos * ndim, ) vector """ if not self.quadratic: raise ValueError("Cost function (%s) is not quadratic, so can not use quadratic programming" % self) sz = predicted_field.size return sparse.coo_matrix(([], ([], [])), shape=(sz, sz)), self.dercost(predicted_field).flatten()
[docs]class L1Density(SympyCost): """Computes the L1 norm on the fiber density. This gives a penalty to every streamline proportional to its length """ _funcstr = 'sqrt(F.dot(F))'
[docs] def __init__(self, request): super(L1Density, self).__init__(request)
[docs]class L2Density(SympyCost): """Computes the L2 norm on the fiber density. This gives a penalty to every streamline proportional to the streamline length and the surrounding density """ _funcstr = 'F.dot(F)'
[docs] def __init__(self, request): super(L2Density, self).__init__(request)
[docs]class TargetInner(SympyCost): """Encourages a certain number of streamlines to cross the surface. c = (v - (F \cdot f)) ** 2 """ field_symbols = {'field'} _funcstr = "(F.dot(field) - target_inner) ** 2"
[docs] def __init__(self, request, field, target_inner=0): super(TargetInner, self).__init__(request) self.field = field self.target_inner = target_inner
[docs]class SameDensity(SympyCost): """Encourages two fields to have the same density. c = (\F| - |f|)^2 / (|F|^2 + |f|^2 + min_norm) """ symmetric = True field_symbols = {'field'} _funcstr = "(sqrt(F.dot(F)) - sqrt(field.dot(field))) ** 2 / (F.dot(F) + field.dot(field) + min_norm)"
[docs] def __init__(self, request, field, min_norm=1e-20): super(SameDensity, self).__init__(request) self.field = field self.min_norm = min_norm
[docs]class InnerCost(CostFunc): """Set the inner product between the provided and predicted field. Can be used to constrain the number of streamlines passing through a normal by setting `field` to the surface normal and `inner` to the value. Set `flip` to True so that the streamlines can pass through in either direction. """ quadratic = True symmetric = True
[docs] def __init__(self, request, field, inner, flip=False): """Creates a new constraint on the inner product. :param request: Defines where the field will be evaluated :param field: (N, ndim) array defining the field elements :param inner: (N, ) array with the expected inner product :param flip: If True compares the absolute of the inner product rather than the value itself """ super(InnerCost, self).__init__(request) self.field = field if (self.npos, self.ndim) != self.field.shape: raise ValueError("Position array and vector field array shapes should match") self.inner = sp.asarray(inner) if (self.npos, ) != self.inner.shape or () == self.inner.shape: raise ValueError("Inner product vector should have the correct length") self.flip = flip
[docs] def cost(self, predicted_field, tosum=True): """Computes the cost-function given the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :param tosum: if False return the cost evaluated for each requested position rather than the full cost :return: float, which should be minimized (or (npos, ) array if tosum is False) """ field_dot_predicted = sp.sum(self.field * predicted_field, -1) if self.flip: result = ((abs(field_dot_predicted) - abs(self.inner)) ** 2) else: result = ((field_dot_predicted - self.inner) ** 2) if tosum: return result.sum() return result
[docs] def dercost(self, predicted_field): """Computes the gradient of the cost-function with respect to the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function """ field_dot_predicted = sp.sum(self.field * predicted_field, -1) if self.flip: der_in = abs(field_dot_predicted) - abs(self.inner) der_in[field_dot_predicted < 0] *= -1 else: der_in = field_dot_predicted - self.inner return 2 * self.field * der_in[:, None]
[docs] def hessp(self, predicted_field, pfield): """Compute the multiplication of the hessian (at `predicted_field`) with `pfield`. :param predicted_field: (npos, ndim) array indicating where the hessian will be calculated :param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated :return: (npos, ndim) of the local hessian times pfield """ return 2 * self.field * sp.sum(pfield * self.field, -1)[:, None]
[docs] def qp(self, predicted_field): """Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem. For a field F, minimize :math:`(1/2) * F' P F + F' q` where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]` :param predicted_field: Returns the quadratic field at the requested position :return: tuple with (P, q): - P: (npos * ndim, npos * ndim) matrix - q: (npos * ndim, ) vector """ weights = self.field[:, None, :] * self.field[:, :, None] * 2 indices = sp.arange(self.ndim * self.npos).reshape((self.npos, self.ndim)) _, rows, cols = sp.broadcast_arrays(weights, indices[:, None, :], indices[:, :, None]) P = sparse.coo_matrix((weights.flatten(), (rows.flatten(), cols.flatten())), shape=(predicted_field.size, predicted_field.size)) q = self.dercost(predicted_field).flatten() return P, q - P * predicted_field.flatten()
[docs]class TotalIntersect(CostFunc): """Constrains the total number of streamlines crossing a surface. """ quadratic = True independent_pos = False
[docs] def __init__(self, request, normals, nstream, size=None, flip=False): """Creates a new constraint on the total number of streamlines crossing a surface :param request: Defines the evaluation of the field across a surface :param normals: (N, ndim) array defining the surface normals :param nstream: total number of streamlines crossing a surface :param size: (N, ) array with the size of each requested element (defaults to request.size()) :param flip: Set to True if the `nstream` can be positive or negative """ super(TotalIntersect, self).__init__(request) self.normals = normals if (self.npos, self.ndim) != self.normals.shape: raise ValueError("Position array and normals array shapes should match") self.nstream = nstream if size is None: size = self.request.size() self.size = sp.asarray(size) if self.size.ndim == 1: self.size = self.size[:, None] self.flip = flip
[docs] def cost(self, predicted_field): """Computes the cost-function given the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :param tosum: if False return the cost evaluated for each requested position rather than the full cost :return: float, which should be minimized (or (npos, ) array if tosum is False) """ total_stream = sp.sum(self.size * self.normals * predicted_field) if self.flip: return (abs(self.nstream) - abs(total_stream)) ** 2 else: return (self.nstream - total_stream) ** 2
[docs] def dercost(self, predicted_field): """Computes the gradient of the cost-function with respect to the predicted field at the provided positions. :param predicted_field: (npos, ndim) array :return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function """ total_stream = sp.sum(self.size * self.normals * predicted_field) if self.flip: der_in = abs(total_stream) - abs(self.nstream) if total_stream < 0: der_in *= -1 else: der_in = total_stream - self.nstream return 2 * self.normals * self.size * der_in
[docs] def hessp(self, predicted_field, pfield): """Compute the multiplication of the hessian (at `predicted_field`) with `pfield`. :param predicted_field: (npos, ndim) array indicating where the hessian will be calculated :param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated :return: (npos, ndim) of the local hessian times pfield """ return 2 * self.normals * sp.sum(pfield * self.normals * self.size) * self.size
[docs] def qp(self, predicted_field): """Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem. For a field F, minimize :math:`(1/2) * F' P F + F' q` where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]` :param predicted_field: Returns the quadratic field at the requested position :return: tuple with (P, q): - P: (npos * ndim, npos * ndim) matrix - q: (npos * ndim, ) vector """ field_flat = (self.normals * self.size).flatten() P = sparse.block_diag([field_flat * field_flat[:, None]]) * 2 q = self.dercost(predicted_field).flatten() return P, q - P * predicted_field.flatten()