""" Helper functions for diamond solvers.
Mostly linear algebra/gradients/Hessians.
Includes logistic and cumulative logistic functions
"""
from scipy import sparse
from scipy.special import expit
from scipy.sparse.linalg import spsolve
import numpy as np
from diamond.solvers.repeated_block_diag import RepeatedBlockDiagonal
def dot(x, y):
    """A generic dot product for sparse and dense vectors.

    Args:
        x: array_like, possibly sparse, or a RepeatedBlockDiagonal
        y: array_like, possibly sparse

    Returns:
        dot product of x and y, wrapped in a numpy array when x is a
        sparse matrix or a RepeatedBlockDiagonal
    """
    # Both sparse matrices and RepeatedBlockDiagonal expose a .dot() method;
    # wrap the result in np.array for a consistent dense return type.
    if sparse.issparse(x) or isinstance(x, RepeatedBlockDiagonal):
        return np.array(x.dot(y))
    return np.dot(x, y)
def solve(A, b):
    """
    A generic linear solver for sparse and dense matrices
    Args:
        A : array_like, possibly sparse
        b : array_like
    Returns:
        x such that Ax = b
    """
    # Dispatch on sparsity: scipy's spsolve for sparse systems,
    # numpy's dense LAPACK-backed solver otherwise.
    is_dense = not sparse.issparse(A)
    if is_dense:
        return np.linalg.solve(A, b)
    return spsolve(A, b)
def l2_logistic_fixed_hessian(X,
                              Y,
                              fixed_hess_inv,
                              penaltysq=None,
                              min_its=2,
                              max_its=200,
                              beta=None,
                              offset=None,
                              tol=1e-3):
    """Fit an l2-regularized logistic regression with a fixed-Hessian Newton-ish method.

    Each iteration uses the fixed inverse Hessian for the step *direction*,
    then takes an exact Newton step *length* along that direction.

    Args:
        X : array_like. Design matrix
        Y : array_like. Vector of responses with values in {0, 1}
        fixed_hess_inv : inverse of a fixed approximate Hessian (sparse or dense)
        penaltysq : array_like. Square penalty matrix. Defaults to no penalty
        min_its : int. Minimum number of iterations
        max_its : int. Maximum number of iterations
        beta : array_like. Vector of initial parameters, with length == number
            of columns of `X`. NOTE: updated in place when supplied
        offset : array_like. Added to X * beta. Defaults to 0
        tol : float. Convergence tolerance on relative change in `beta`

    Returns:
        array_like. Estimated parameters
    """
    if beta is None:
        beta = np.zeros(X.shape[1])
    if offset is None:
        offset = np.zeros(X.shape[0])
    if penaltysq is None:
        penaltysq = np.zeros((X.shape[1], X.shape[1]))
    for i in range(max_its):
        old_beta = 1.0 * beta
        # fitted probabilities: expit is overflow-safe, unlike 1 / (1 + exp(-z))
        p = expit(dot(X, beta) + offset)
        # gradient of the penalized loglikelihood
        grad = dot(X.T, Y - p) - dot(penaltysq, beta)
        # step direction from the inverse of the fixed Hessian
        step = dot(fixed_hess_inv, grad)
        # exact Newton step length along the chosen direction
        newton_num = dot(grad, step)
        newton_den = np.sum(dot(penaltysq, step) ** 2) + dot(p * (1 - p), dot(X, step) ** 2)
        step *= (newton_num / newton_den)
        beta += step
        change = np.linalg.norm(beta - old_beta) / np.linalg.norm(beta)
        if change < tol and i > min_its:
            break
    return beta
def custom_block_diag(blocks):
    """Create a CSR sparse block-diagonal matrix from identically-sized blocks.

    Blocks don't need to be identical, but they do need to be the same
    (square, K x K) shape.

    Args:
        blocks : sequence of 2-d arrays, all K x K

    Returns:
        scipy.sparse.csr_matrix of shape (L*K, L*K) with `blocks` on the diagonal
    """
    L = len(blocks)
    K = blocks[0].shape[0]
    # flattening every block and laying them end to end gives exactly the
    # data vector of the block-diagonal matrix in row-major order
    flat_data = np.concatenate([np.asarray(b).flatten() for b in blocks])
    # row index: each of the L*K rows appears K times (K entries per row)
    i = np.repeat(np.arange(L * K), K)
    # column index: within-block columns 0..K-1, shifted by the block's offset
    j = np.tile(np.tile(np.arange(K), K), L) + np.repeat(np.arange(0, L * K, K), K * K)
    return sparse.csr_matrix((flat_data, (i, j)), shape=(L * K, L * K))
def l2_clogistic_llh(X, Y, alpha, beta, penalty_matrix, offset):
    """ Penalized log likelihood function for proportional odds cumulative logit model
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        alpha : array_like. intercepts.\
            must have shape == one less than the number of columns of `Y`
        beta : array_like. parameters.\
            must have shape == number of columns of X
        penalty_matrix : array_like. Regularization matrix
        offset : array_like, optional. Defaults to 0
    Returns:
        scalar : penalized loglikelihood (-inf if the likelihood degenerates to NaN)
    """
    offset = 0.0 if offset is None else offset
    obj = 0.0
    J = Y.shape[1]
    Xb = dot(X, beta) + offset
    for j in range(J):
        if j == 0:
            # lowest level: P(y <= 0) = expit(alpha_0 + Xb)
            obj += dot(np.log(expit(alpha[j] + Xb)), Y[:, j])
        elif j == J - 1:
            # highest level: P(y == J-1) = 1 - P(y <= J-2)
            obj += dot(np.log(1 - expit(alpha[j - 1] + Xb)), Y[:, j])
        else:
            # interior level: P(y == j) = P(y <= j) - P(y <= j-1)
            obj += dot(np.log(expit(alpha[j] + Xb) - expit(alpha[j - 1] + Xb)), Y[:, j])
    # l2 penalty on the slopes only (intercepts are unpenalized)
    obj -= 0.5 * dot(beta, dot(penalty_matrix, beta))
    # NaN arises from log(0) when fitted probabilities collapse; treat as -inf
    return -np.inf if np.isnan(obj) else obj
def l2_clogistic_gradient(X, Y, intercept=True, **kwargs):
    """ Gradient of cumulative logit model
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        intercept : boolean. If intercept=False, just calculate the gradient for the coefficients
        offset : array_like, optional. Defaults to 0
        alpha : array_like, optional. intercepts.\
            must have shape == one less than the number of columns of `Y`
        beta : array_like, optional. parameters.\
            must have shape == number of columns of X
        penalty_matrix : array_like, optional. Regularization matrix
    Returns:
        dense array of the GRADIENT of the NEGATIVE, penalized loglikelihood function
    """
    # X is None when the model has no fixed effects, i.e. intercept-only
    no_fixef = X is None
    if no_fixef:
        p = 0
        beta, offset = None, None
    else:
        p = X.shape[1]
        beta = kwargs.get('beta', np.zeros(p))
        offset = kwargs.get('offset', np.zeros(X.shape[0]))
    J = Y.shape[1]
    alpha = kwargs.get('alpha', np.linspace(-1.0, 1.0, J - 1))
    penalty_matrix = kwargs.get('penalty_matrix', sparse.csr_matrix((p, p)))
    # inverse-logit terms shared by the intercept and slope gradients
    IL = _l2_clogistic_gradient_IL(X=X,
                                   alpha=alpha,
                                   beta=beta,
                                   offset=offset,
                                   n=Y.shape[0])
    if no_fixef:
        grad_beta = None
    else:
        grad_beta = _l2_clogistic_gradient_slope(X=X,
                                                 Y=Y,
                                                 IL=IL,
                                                 beta=beta,
                                                 penalty_matrix=penalty_matrix)
    # MINIMIZE NEGATIVE LOGLIKELIHOOD
    if not intercept:
        return -1.0 * grad_beta
    grad_alpha = _l2_clogistic_gradient_intercept(IL, Y, alpha)
    if no_fixef:
        return -1.0 * grad_alpha
    return -1.0 * np.concatenate([grad_alpha, grad_beta])
def _l2_clogistic_gradient_IL(X, alpha, beta, offset=None, **kwargs):
""" Helper function for calculating the cumulative logistic gradient. \
The inverse logit of alpha[j + X*beta] is \
ubiquitous in gradient and Hessian calculations \
so it's more efficient to calculate it once and \
pass it around as a parameter than to recompute it every time
Args:
X : array_like. design matrix
alpha : array_like. intercepts. must have shape == one less than the number of columns of `Y`
beta : array_like. parameters. must have shape == number of columns of X
offset : array_like, optional. Defaults to 0
n : int, optional.\
You must specify the number of rows if there are no main effects
Returns:
array_like. n x J-1 matrix where entry i,j is the inverse logit of (alpha[j] + X[i, :] * beta)
"""
J = len(alpha) + 1L
if X is None:
n = kwargs.get("n")
else:
n = X.shape[0]
if X is None or beta is None:
Xb = 0.
else:
Xb = dot(X, beta) + (0 if offset is None else offset)
IL = np.zeros((n, J - 1L))
for j in xrange(J - 1L):
IL[:, j] = expit(alpha[j] + Xb)
return IL
def _l2_clogistic_gradient_intercept(IL, Y, alpha):
    """ Gradient of penalized loglikelihood with respect to the intercept parameters
    Args:
        IL : array_like. See _l2_clogistic_gradient_IL
        Y : array_like. response matrix
        alpha : array_like. intercepts. must have shape == one less than the number of columns of `Y`
    Returns:
        array_like : length J-1
    """
    exp_int = np.exp(alpha)
    J = len(alpha) + 1
    grad_alpha = np.zeros(J - 1)
    # there are J response levels and J-1 intercepts, indexed 0..J-2.
    # each diagonal term pairs a "left" contribution from level j with a
    # "right" contribution from level j+1; the first and last intercepts
    # have special forms. Splitting the two terms (instead of branching on
    # j alone) also handles J == 2, where j == 0 is both first and last
    # intercept and the old adjacent-intercept form would index out of bounds.
    for j in range(J - 1):
        if j == 0:
            # lowest intercept
            left = dot(Y[:, j], 1 - IL[:, j])
        else:
            left = dot(Y[:, j], exp_int[j] / (exp_int[j] - exp_int[j - 1]) - IL[:, j])
        if j == J - 2:
            # highest intercept
            right = dot(Y[:, j + 1], IL[:, j])
        else:
            right = dot(Y[:, j + 1], exp_int[j] / (exp_int[j + 1] - exp_int[j]) + IL[:, j])
        grad_alpha[j] = left - right
    return grad_alpha
def _l2_clogistic_gradient_slope(X, Y, IL, beta, penalty_matrix):
    """ Gradient of penalized loglikelihood with respect to the slope parameters
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        IL : array_like. See _l2_clogistic_gradient_IL
        beta : array_like. parameters. must have shape == number of columns of X
        penalty_matrix : array_like. penalty matrix
    Returns:
        array_like : same length as `beta`.
    """
    J = Y.shape[1]
    XT = X.transpose()  # CSC format when X is CSR
    grad_beta = np.zeros(len(beta))
    for j in range(J):  # accumulate one term per response level
        if j == 0:
            # lowest level (assignment: overwrites the zero initialization)
            grad_beta = dot(XT, Y[:, j] * (1.0 - IL[:, j]))
        elif j < J - 1:
            # interior levels
            grad_beta += dot(XT, Y[:, j] * (1.0 - IL[:, j] - IL[:, j - 1]))
        else:
            # j == J-1: the highest level of response
            grad_beta -= dot(XT, Y[:, j] * IL[:, j - 1])
    # l2 penalty contribution
    grad_beta -= dot(penalty_matrix, beta)
    return grad_beta
def l2_clogistic_hessian(X, Y, intercept=True, **kwargs):
    """ Hessian matrix of proportional odds cumulative logit model
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        intercept : boolean. If intercept=False,\
            just calculate the Hessian for the coefficients
        alpha : array_like, optional. intercepts.\
            must have shape == one less than the number of columns of `Y`
        beta : array_like, optional. parameters.\
            must have shape == number of columns of X
        offset : array_like, optional. Defaults to 0
        penalty_matrix : array_like, optional.
    Returns:
        Square matrix. With intercept=True its dimension is\
        (J - 1) + number of columns of `X`; otherwise it matches `X`
    """
    if X is not None:
        p = X.shape[1]
        beta = kwargs.get('beta', np.zeros(X.shape[1]))
        offset = kwargs.get('offset', np.zeros(X.shape[0]))
    else:
        # X is None when there are no fixed effects, i.e. intercept-only
        p = 0
        beta = None
        offset = None
    J = Y.shape[1]
    # default arguments
    alpha = kwargs.get('alpha', np.linspace(-1.0, 1.0, J - 1))
    penalty_matrix = kwargs.get('penalty_matrix', sparse.csr_matrix((p, p)))
    ILL = _l2_clogistic_hessian_ILL(X, alpha, beta, offset=offset, n=Y.shape[0])
    if X is None:
        hess_beta = None
    else:
        hess_beta = _l2_clogistic_hessian_slope(X=X,
                                                Y=Y,
                                                ILL=ILL,
                                                penalty_matrix=penalty_matrix,
                                                value=True)
    # MINIMIZE NEGATIVE LOGLIKELIHOOD
    if intercept:
        hess_alpha = -1.0 * _l2_clogistic_hessian_intercept(X, Y, ILL, alpha)
        if X is None:
            return hess_alpha
        if sparse.issparse(hess_beta):
            hess_beta = hess_beta.todense()
        # splice the slope block into the lower-right corner of the full Hessian
        hess_alpha[J - 1:, J - 1:] = hess_beta
        return hess_alpha
    else:
        return hess_beta
def _l2_clogistic_hessian_ILL(X, alpha, beta, **kwargs):
""" this is the derivative of the `IL` term
Args:
X : array_like. design matrix
alpha : array_like. intercepts.\
must have shape == one less than the number of columns of `Y`
beta : array_like. parameters. \
must have shape == number of columns of X
offset : array_like, optional. Defaults to 0
n : int, optional. You must specify the number of rows if there are no main effects
Returns:
array_like. n x J-1 matrix containing terms for Hessian calculation
"""
offset = kwargs.get('offset')
if X is None:
n = kwargs.get('n')
else:
n = X.shape[0]
if X is None or beta is None:
Xb = np.zeros(n)
else:
Xb = dot(X, beta) + (0 if offset is None else offset)
ILL = np.zeros((X.shape[0], len(alpha)))
for j in xrange(len(alpha)):
ILL[:, j] = np.exp(alpha[j] + Xb) * (1.0 + np.exp(alpha[j] + Xb)) ** -2.0
return ILL
def _l2_clogistic_hessian_intercept(X, Y, ILL, alpha):
    """ second derivatives of intercepts
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        ILL : array_like. See _l2_clogistic_hessian_ILL
        alpha : array_like. intercepts.\
            must have shape == one less than the number of columns of `Y`
    Returns:
        array_like. Matrix of second derivatives of main effects and intercepts
    """
    exp_int = np.exp(alpha)
    J = len(alpha) + 1
    p = X.shape[1] if X is not None else 0
    # important to initialize hess to 0: non-adjacent intercepts have zero
    # mixed partial derivatives, and the slope block is filled in elsewhere
    hess_alpha = np.zeros((p + J - 1, p + J - 1))
    # each diagonal entry pairs a "left" contribution from level j with a
    # "right" contribution from level j+1; splitting the two terms (instead
    # of branching on j alone) also handles J == 2, where j == 0 is both
    # the first and last intercept and the old adjacent-intercept form
    # would index out of bounds.
    for j in range(J - 1):
        if j == 0:
            left = -dot(Y[:, j], ILL[:, j])
        else:
            left = -dot(Y[:, j], (exp_int[j] * exp_int[j - 1]) / (exp_int[j] - exp_int[j - 1]) ** 2.0 + ILL[:, j])
        if j == J - 2:
            right = dot(Y[:, j + 1], ILL[:, j])
        else:
            right = dot(Y[:, j + 1], (exp_int[j] * exp_int[j + 1]) / (exp_int[j + 1] - exp_int[j]) ** 2.0 + ILL[:, j])
        hess_alpha[j, j] = left - right
    for j in range(J - 2):
        # mixed partials of adjacent intercepts (non-adjacent ones stay 0)
        hess_alpha[j, j + 1] = sum(Y[:, j + 1]) * exp_int[j] * exp_int[j + 1] / (exp_int[j + 1] - exp_int[j]) ** 2.0
        hess_alpha[j + 1, j] = hess_alpha[j, j + 1]  # symmetric
    if X is not None:  # \partial \alpha_j, \partial \beta
        for j in range(J - 1):
            hess_alpha[j, (J - 1):] = -dot(X.transpose(), ILL[:, j] * (Y[:, j] + Y[:, j + 1]))
            hess_alpha[(J - 1):, j] = hess_alpha[j, (J - 1):]  # symmetric
    return hess_alpha
def _l2_clogistic_hessian_slope(X, Y, ILL, penalty_matrix, value=True):
""" Calculate the matrix of partial second derivatives of the slopes
Args:
X : array_like. design matrix
Y : array_like. response matrix
ILL : array_like. inverse logit of something. see helper function
penalty_matrix : array_like. penalty matrix
value : boolean. ILL: if True, return the actual Hessian. this is SUPER slow! \
if False, return the diagonal weight matrix needed to calculate the Hessian downstream
Returns:
either the true Hessian if value=True, or a sparse diagonal matrix for downstream use
"""
n = Y.shape[0]
J = ILL.shape[1] + 1L
D = Y[:, 0] * ILL[:, 0] # initialization
for j in xrange(1, J):
if j < J - 1:
D += Y[:, j] * (ILL[:, j] + ILL[:, j - 1])
else:
D += Y[:, j] * ILL[:, j - 1]
# make an n x n diagonal matrix whose diagonal is D, off-diagonal is 0
DD = sparse.csr_matrix((D, (range(n), xrange(n))), shape=(n, n))
if value:
return X.transpose().dot(DD.dot(X)) + penalty_matrix # SLOW
else:
return DD # faster to push matrix mult downstream
def l2_clogistic_ranef(X, Y, alpha, beta, penalty_matrix, offset, LU, **kwargs):
    """ Use a fixed inverse hessian to determine step direction \
    this inverse hessian is represented by the LU decomposition of the approximate Hessian
    Args:
        X : array_like. design matrix
        Y : array_like. response matrix
        alpha : array_like. intercepts.\
            must have shape == one less than the number of columns of `Y`
        beta : array_like. parameters.\
            must have shape == number of columns of X. Updated in place
        penalty_matrix : array_like
        offset : array_like, optional. Defaults to 0
        LU : LU decomposition of approximate Hessian matrix
        min_its : int, optional. Minimum number of iterations to take
        max_its : int, optional. Maximum number of iterations to take
        tol : float. Convergence tolerance on relative change in `beta`
    Returns:
        Updated parameter vector `beta`
    """
    min_its = kwargs.get('min_its', 2)
    max_its = kwargs.get('max_its', 200)
    tol = kwargs.get('tol', 1e-3)
    for i in range(max_its):
        old_beta = beta.copy()
        grad = l2_clogistic_gradient(X, Y, intercept=False, alpha=alpha, beta=beta,
                                     penalty_matrix=penalty_matrix, offset=offset)
        # step direction from the fixed (approximate) inverse Hessian
        step = LU.solve(grad)
        # exact step length along that direction, using the true curvature
        ILL = _l2_clogistic_hessian_ILL(X, alpha, beta, offset=offset, n=Y.shape[0])
        W = _l2_clogistic_hessian_slope(X, Y, ILL, penalty_matrix, value=False)
        Xu = dot(X, step)
        upstairs = dot(step, grad)
        downstairs = dot(Xu, dot(W, Xu)) + dot(step, dot(penalty_matrix, step))
        # np.sum collapses a possible 1x1 matrix to a scalar
        step *= np.sum(upstairs / downstairs)
        beta -= step
        change = np.linalg.norm(beta - old_beta) / np.linalg.norm(beta)
        if change < tol and i >= min_its:
            break
    return beta
def l2_clogistic_fixef(X, Y, **kwargs):
    """ Take a true, second-order Newton step for fixed effects
    Args:
        X : array_like. design matrix. May be None for an intercept-only model
        Y : array_like. response matrix
        alpha : array_like, optional. intercepts.\
            must have shape == one less than the number of columns of `Y`
        beta : array_like, optional. parameters. \
            must have shape == number of columns of X
        min_its : int, optional. Minimum number of iterations
        max_its : int, optional. Maximum number of iterations
        tol : float, optional. Convergence tolerance on relative change in `alpha` and `beta`
        penalty_matrix : array_like, optional.
    Returns:
        Updated intercepts, updated main effects, both array-like
    """
    if X is not None:
        p = X.shape[1]
        beta = kwargs.get('beta', np.zeros(X.shape[1]))
        offset = kwargs.get('offset', np.zeros(X.shape[0]))
    else:
        # X can be None if there are no fixed effects - i.e. just fit the intercept
        p = 0
        beta = None
        offset = None
    J = Y.shape[1]
    min_its = kwargs.get('min_its', 2)
    max_its = kwargs.get('max_its', 200)
    tol = kwargs.get('tol', 1e-3)
    alpha = kwargs.get('alpha', np.linspace(-1, 1, J - 1))
    penalty_matrix = kwargs.get('penalty_matrix', sparse.csr_matrix((p, p)))
    for i in range(max_its):
        old_alpha = 1.0 * alpha
        old_beta = 1.0 * beta if beta is not None else None
        grad = l2_clogistic_gradient(X=X,
                                     Y=Y,
                                     alpha=alpha,
                                     beta=beta,
                                     penalty_matrix=penalty_matrix,
                                     offset=offset)
        H = l2_clogistic_hessian(X=X,
                                 Y=Y,
                                 alpha=alpha,
                                 beta=beta,
                                 penalty_matrix=penalty_matrix,
                                 offset=offset)
        step = solve(H, grad)
        # exact Newton step length along the solved direction
        upstairs = dot(step, grad)
        downstairs = dot(step, dot(H, step))  # cf ranef: penalty matrix is 0 for fixed effects
        # sometimes step size is a 1x1 matrix. sum converts to scalar
        step_size = np.sum(upstairs / downstairs)
        step *= step_size
        # the first J-1 entries of the step are the intercepts
        alpha -= step[0:J - 1]
        if beta is None:
            change = np.linalg.norm(alpha - old_alpha) / np.linalg.norm(alpha)
        else:
            beta -= step[J - 1:]
            change = np.linalg.norm(np.concatenate([alpha, beta]) -
                                    np.concatenate([old_alpha, old_beta])) / \
                np.linalg.norm(np.concatenate([alpha, beta]))
        if change < tol and i >= min_its:
            break
    return alpha, beta