"""Defines cost functions that act on the density (i.e. norm) of the vector field, independent of its orientation
- :class:`MinInner`: Encourages the density of streamlines crossing a surface exceeds a certain value
- :class:`TargetInner`: Sets a target value for the density of streamlines crossing a surface element
- :class:`InnerCost`: Sets a target value for the density of streamlines crossing a surface element (same as TargetInner)
- :class:`TotalIntersect`: Sets a target value for the total number of streamlines crossing a whole surface
- :class:`L1Density`: L1 norm in the vector field density
- :class:`L2Density`: L2 norm in the vector field density
- :class:`SameDensity`: Encourages two fields to have the same vector field density (useful as a smoothness constraint)
"""
import scipy as sp
from .core import CostFunc, SympyCost
from scipy import sparse
[docs]class MinInner(CostFunc):
"""Encourages the inner product to exceed a certain value.
C = 0 if (f \cdot F) > min_inner
C = min_inner - (f \cdot F) if (f \cdot F) < min_inner
"""
[docs] def __init__(self, request, field, min_inner=0, normed=False):
super(MinInner, self).__init__(request)
self.field = field
self.min_inner = min_inner
self.normed = normed
@property
def quadratic(self, ):
return not self.normed
symmetric = True
[docs] def cost(self, predicted_field, tosum=True):
"""Computes the cost-function given the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:param tosum: if False return the cost evaluated for each requested position rather than the full cost
:return: float, which should be minimized (or (npos, ) array if tosum is False)
"""
if self.normed:
norm = sp.sqrt((predicted_field ** 2).sum(-1) * (self.field ** 2).sum(-1))
norm[norm == 0] = 1
offset = self.min_inner - (predicted_field * self.field).sum(-1) / norm
else:
offset = self.min_inner - (predicted_field * self.field).sum(-1)
offset[offset < 0] = 0.
if not tosum:
return offset
return offset.sum()
[docs] def dercost(self, predicted_field, tosum=True):
"""Computes the gradient of the cost-function with respect to the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function
"""
if self.normed:
predicted_norm_sq = (predicted_field ** 2).sum(-1)
norm = sp.sqrt(predicted_norm_sq * (self.field ** 2).sum(-1))
norm[norm == 0] = 1
inner = (predicted_field * self.field).sum(-1) / norm
offset = self.min_inner - inner
field_derivative = -self.field / norm[:, None] + inner[:, None] * predicted_field / predicted_norm_sq[:, None]
else:
offset = self.min_inner - (predicted_field * self.field).sum(-1)
field_derivative = -self.field
field_derivative[offset < 0, :] = 0
return field_derivative
[docs] def hessp(self, predicted_field, pfield):
"""Compute the multiplication of the hessian (at `predicted_field`) with `pfield`.
:param predicted_field: (npos, ndim) array indicating where the hessian will be calculated
:param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated
:return: (npos, ndim) of the local hessian times pfield
"""
if not self.normed:
return pfield * 0
else:
predicted_norm_sq = (predicted_field ** 2).sum(-1)
norm = sp.sqrt(predicted_norm_sq * (self.field ** 2).sum(-1))
norm[norm == 0] = 1
inner = (predicted_field * self.field).sum(-1)
offset = self.min_inner - inner / norm
result = (-3 * (inner / (norm * predicted_norm_sq ** 2) * (predicted_field * pfield).sum(-1))[:, None] * predicted_field
+ ((predicted_field * pfield).sum(-1) / (norm * predicted_norm_sq))[:, None] * self.field
+ ((self.field * pfield).sum(-1)[:, None] / (norm * predicted_norm_sq)[:, None]) * predicted_field
+ (inner / (norm * predicted_norm_sq))[:, None] * pfield)
result[offset < 0, :] = 0
return result
[docs] def qp(self, predicted_field):
"""Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem.
For a field F, minimize
:math:`(1/2) * F' P F + F' q`
where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]`
:param predicted_field: Returns the quadratic field at the requested position
:return: tuple with (P, q):
- P: (npos * ndim, npos * ndim) matrix
- q: (npos * ndim, ) vector
"""
if not self.quadratic:
raise ValueError("Cost function (%s) is not quadratic, so can not use quadratic programming" % self)
sz = predicted_field.size
return sparse.coo_matrix(([], ([], [])), shape=(sz, sz)), self.dercost(predicted_field).flatten()
[docs]class L1Density(SympyCost):
"""Computes the L1 norm on the fiber density.
This gives a penalty to every streamline proportional to its length
"""
_funcstr = 'sqrt(F.dot(F))'
[docs] def __init__(self, request):
super(L1Density, self).__init__(request)
[docs]class L2Density(SympyCost):
"""Computes the L2 norm on the fiber density.
This gives a penalty to every streamline proportional to the streamline length and the surrounding density
"""
_funcstr = 'F.dot(F)'
[docs] def __init__(self, request):
super(L2Density, self).__init__(request)
[docs]class TargetInner(SympyCost):
"""Encourages a certain number of streamlines to cross the surface.
c = (v - (F \cdot f)) ** 2
"""
field_symbols = {'field'}
_funcstr = "(F.dot(field) - target_inner) ** 2"
[docs] def __init__(self, request, field, target_inner=0):
super(TargetInner, self).__init__(request)
self.field = field
self.target_inner = target_inner
[docs]class SameDensity(SympyCost):
"""Encourages two fields to have the same density.
c = (\F| - |f|)^2 / (|F|^2 + |f|^2 + min_norm)
"""
symmetric = True
field_symbols = {'field'}
_funcstr = "(sqrt(F.dot(F)) - sqrt(field.dot(field))) ** 2 / (F.dot(F) + field.dot(field) + min_norm)"
[docs] def __init__(self, request, field, min_norm=1e-20):
super(SameDensity, self).__init__(request)
self.field = field
self.min_norm = min_norm
[docs]class InnerCost(CostFunc):
"""Set the inner product between the provided and predicted field.
Can be used to constrain the number of streamlines passing through a normal by setting `field` to the surface normal and `inner` to the value.
Set `flip` to True so that the streamlines can pass through in either direction.
"""
quadratic = True
symmetric = True
[docs] def __init__(self, request, field, inner, flip=False):
"""Creates a new constraint on the inner product.
:param request: Defines where the field will be evaluated
:param field: (N, ndim) array defining the field elements
:param inner: (N, ) array with the expected inner product
:param flip: If True compares the absolute of the inner product rather than the value itself
"""
super(InnerCost, self).__init__(request)
self.field = field
if (self.npos, self.ndim) != self.field.shape:
raise ValueError("Position array and vector field array shapes should match")
self.inner = sp.asarray(inner)
if (self.npos, ) != self.inner.shape or () == self.inner.shape:
raise ValueError("Inner product vector should have the correct length")
self.flip = flip
[docs] def cost(self, predicted_field, tosum=True):
"""Computes the cost-function given the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:param tosum: if False return the cost evaluated for each requested position rather than the full cost
:return: float, which should be minimized (or (npos, ) array if tosum is False)
"""
field_dot_predicted = sp.sum(self.field * predicted_field, -1)
if self.flip:
result = ((abs(field_dot_predicted) - abs(self.inner)) ** 2)
else:
result = ((field_dot_predicted - self.inner) ** 2)
if tosum:
return result.sum()
return result
[docs] def dercost(self, predicted_field):
"""Computes the gradient of the cost-function with respect to the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function
"""
field_dot_predicted = sp.sum(self.field * predicted_field, -1)
if self.flip:
der_in = abs(field_dot_predicted) - abs(self.inner)
der_in[field_dot_predicted < 0] *= -1
else:
der_in = field_dot_predicted - self.inner
return 2 * self.field * der_in[:, None]
[docs] def hessp(self, predicted_field, pfield):
"""Compute the multiplication of the hessian (at `predicted_field`) with `pfield`.
:param predicted_field: (npos, ndim) array indicating where the hessian will be calculated
:param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated
:return: (npos, ndim) of the local hessian times pfield
"""
return 2 * self.field * sp.sum(pfield * self.field, -1)[:, None]
[docs] def qp(self, predicted_field):
"""Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem.
For a field F, minimize
:math:`(1/2) * F' P F + F' q`
where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]`
:param predicted_field: Returns the quadratic field at the requested position
:return: tuple with (P, q):
- P: (npos * ndim, npos * ndim) matrix
- q: (npos * ndim, ) vector
"""
weights = self.field[:, None, :] * self.field[:, :, None] * 2
indices = sp.arange(self.ndim * self.npos).reshape((self.npos, self.ndim))
_, rows, cols = sp.broadcast_arrays(weights, indices[:, None, :], indices[:, :, None])
P = sparse.coo_matrix((weights.flatten(), (rows.flatten(), cols.flatten())),
shape=(predicted_field.size, predicted_field.size))
q = self.dercost(predicted_field).flatten()
return P, q - P * predicted_field.flatten()
[docs]class TotalIntersect(CostFunc):
"""Constrains the total number of streamlines crossing a surface.
"""
quadratic = True
independent_pos = False
[docs] def __init__(self, request, normals, nstream, size=None, flip=False):
"""Creates a new constraint on the total number of streamlines crossing a surface
:param request: Defines the evaluation of the field across a surface
:param normals: (N, ndim) array defining the surface normals
:param nstream: total number of streamlines crossing a surface
:param size: (N, ) array with the size of each requested element (defaults to request.size())
:param flip: Set to True if the `nstream` can be positive or negative
"""
super(TotalIntersect, self).__init__(request)
self.normals = normals
if (self.npos, self.ndim) != self.normals.shape:
raise ValueError("Position array and normals array shapes should match")
self.nstream = nstream
if size is None:
size = self.request.size()
self.size = sp.asarray(size)
if self.size.ndim == 1:
self.size = self.size[:, None]
self.flip = flip
[docs] def cost(self, predicted_field):
"""Computes the cost-function given the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:param tosum: if False return the cost evaluated for each requested position rather than the full cost
:return: float, which should be minimized (or (npos, ) array if tosum is False)
"""
total_stream = sp.sum(self.size * self.normals * predicted_field)
if self.flip:
return (abs(self.nstream) - abs(total_stream)) ** 2
else:
return (self.nstream - total_stream) ** 2
[docs] def dercost(self, predicted_field):
"""Computes the gradient of the cost-function with respect to the predicted field at the provided positions.
:param predicted_field: (npos, ndim) array
:return: (npos, ndim) defining which direction the predicted field should move to INCREASE the cost function
"""
total_stream = sp.sum(self.size * self.normals * predicted_field)
if self.flip:
der_in = abs(total_stream) - abs(self.nstream)
if total_stream < 0:
der_in *= -1
else:
der_in = total_stream - self.nstream
return 2 * self.normals * self.size * der_in
[docs] def hessp(self, predicted_field, pfield):
"""Compute the multiplication of the hessian (at `predicted_field`) with `pfield`.
:param predicted_field: (npos, ndim) array indicating where the hessian will be calculated
:param pfield: (npos, ndim) array indicating in which direction the hessian will be calculated
:return: (npos, ndim) of the local hessian times pfield
"""
return 2 * self.normals * sp.sum(pfield * self.normals * self.size) * self.size
[docs] def qp(self, predicted_field):
"""Returns the (sparse) matrix :math:`d` and vector :math:`q` needed to rephrase the problem as a quadratic programming problem.
For a field F, minimize
:math:`(1/2) * F' P F + F' q`
where element :math:`i`, dimension :math:`d` is encoded in the vector :math:`F` as :math:`F[i + d * n]`
:param predicted_field: Returns the quadratic field at the requested position
:return: tuple with (P, q):
- P: (npos * ndim, npos * ndim) matrix
- q: (npos * ndim, ) vector
"""
field_flat = (self.normals * self.size).flatten()
P = sparse.block_diag([field_flat * field_flat[:, None]]) * 2
q = self.dercost(predicted_field).flatten()
return P, q - P * predicted_field.flatten()