import abc
import logging
import re
import time
from collections import defaultdict
import numpy as np
import pandas as pd
from diamond.solvers.repeated_block_diag import RepeatedBlockDiagonal
from scipy import sparse
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
[docs]class GLM(object):
"""
Binary or cumulative logistic regression model with arbitrary crossed random effects and known covariance
"""
__metaclass__ = abc.ABCMeta
def __init__(self, train_df, priors_df, copy=False, test_df=None):
r""" Initialize a diamond model
Args:
train_df (DataFrame): DataFrame to estimate the model parameters with
priors_df (DataFrame): Covariance matrix to use for regularization.
Format is | group | var1 | var2 | vcov |
where group represents the grouping factor, var1 and var2 specify the row and column of the covariance matrix,
and vcov is a scalar for that entry of the covariance matrix. Note that if var2 is NULL, vcov is
interpreted as the diagonal element of the covariance matrix for var1
copy (boolean): Make a copy of train_df. If False, new columns will be created and the index will be reset.
test_df (DataFrame): This is used to make predictions.
Returns:
Object (GLM)
"""
self.solver = None # solver will be set by child classes
self.train_df = train_df.copy(deep=True) if copy else train_df
self.test_df = test_df
self.priors_df = priors_df
self.variances = None
# set by self.parse_formula
self.response = None
self.main_effects = []
self.groupings = defaultdict(lambda: [])
self.grouping_factors = []
self.group_levels = {}
self.num_levels = {}
self.total_num_interactions = 0
self.num_main = 0
# set by self.create_penalty_matrix
self.sparse_inv_covs = {}
self.fit_order = []
# set by self.create_design_matrix
self.level_maps = {}
self.main_map = None
self.inter_maps = {}
self.main_design = None
self.grouping_designs = {}
# set by self.fit
self.fit_kwargs = None
self.effects = {}
self.results = None
self.results_dict = {}
self.start_time = None
self.obj_fun = None
[docs] def initialize(self, formula, **kwargs):
r""" Get ready to fit the model by parsing the formula, checking priors, and creating design, penalty, and Hessian matrices
Args:
formula (string): R-style formula expressing the model to fit. eg. :math:`y \sim 1 + x + (1 + x | group)`
Keyword Args:
kwargs: additional arguments to pass to fit method
"""
self.fit_kwargs = kwargs
self.start_time = time.time()
self._parse_formula(formula)
self._check_priors_df()
self._create_design_matrix()
self._create_response_matrix()
self._create_penalty_matrix()
self._create_hessians()
@abc.abstractmethod
def _create_response_matrix(self):
""" Must be implemented by subclasses """
raise NotImplementedError
@abc.abstractmethod
def _create_hessians(self):
""" Must be implemented by subclasses """
raise NotImplementedError
@abc.abstractmethod
[docs] def fit(self, formula, **kwargs):
""" Must be implemented by subclasses """
raise NotImplementedError
@abc.abstractmethod
def _create_output_dict(self):
""" Must be implemented by subclasses """
raise NotImplementedError
def _check_priors_df(self):
"""Run simple validations on priors data frame
This method runs a number of sanity checks on the priors data frame:
- ensure that the expected columns are present
- ensure that all rows in priors_df are consistent with the formula
- ensure that all random effect variables at least have a variance
Args:
None
Returns:
None
"""
_groupings = {g: f + [np.nan] for g, f in self.groupings.iteritems()}
allowed_covs = {
g: [[v, w] for i, v in enumerate(f)
for j, w in enumerate(f)
if j > i and not isinstance(v, float)]
for g, f in _groupings.iteritems()
}
required_covs = {
g: [[v, np.nan] for i, v in enumerate(f) if not isinstance(v, float)]
for g, f in _groupings.iteritems()
}
example_priors_data = {
"group": [g for g, f in allowed_covs.iteritems() for _ in f],
"var1": [j[0] for _, f in allowed_covs.iteritems() for j in f],
"var2": [j[1] for _, f in allowed_covs.iteritems() for j in f],
"vcov": [0.1 for _, f in allowed_covs.iteritems() for _ in f]
}
example_priors_df = pd.DataFrame.from_dict(example_priors_data)
expected_cols = set(["group", "var1", "var2", "vcov"])
actual_cols = set(self.priors_df.columns)
if len(expected_cols - actual_cols) != 0:
raise AssertionError("""
priors_df is expected to have the following columns:
| group | var1 | var2 | vcov |
""")
# check that all rows in self.priors_df are valid
for row in self.priors_df.iterrows():
group = row[1].group
var1 = row[1].var1
var2 = row[1].var2
# we need to standardize nan values
if str(var2) in ['nan', 'None', 'null']:
var2 = np.nan
var_pair = [var1, var2]
num_matches = sum([np.array_equal(var_pair, p) for p in allowed_covs[group]])
if num_matches != 1:
raise AssertionError("""There is a row in your priors_df which is not expected.
Unexpected row: {}
A valid priors_df looks something like:
{}
""".format([group] + var_pair, example_priors_df))
try:
required_covs[group].remove(var_pair)
except ValueError:
# thrown when var_pair not in required_covs[group]
pass
# loop through the required_covs and make sure none are remaining
remaining_required_covs = sum([len(f) for _, f in required_covs.iteritems()])
if remaining_required_covs > 0:
raise AssertionError("""Priors_df is missing some required rows.
If you want an unregularized random effect, include it as a fixed
effect instead.
The missing rows are: {}
""".format(required_covs))
@staticmethod
def _check_formula(formula):
"""
Check that the formula contains all necessary ingredients, such as a response and at least one group
Args:
formula (string): R-style formula expressing the model to fit. eg. "y ~ 1 + x + (1 + x | group)"
Returns:
None
"""
valid_formula_str = """
A valid formula looks like:
response ~ 1 + feature1 + feature2 + ... + (1 + feature1 + feature2 + ... | doctor_id)
"""
if "~" not in formula:
raise AssertionError("Formula missing '~'. You need a response. {}".format(valid_formula_str))
if "|" not in formula:
raise AssertionError("Formula missing '|'. You need at least 1 group. {}".format(valid_formula_str))
def _parse_formula(self, formula):
"""
Args:
formula (string): R-style formula expressing the model to fit. eg. "y ~ 1 + x + (1 + x | group)"
Returns:
None
"""
# strip all newlines, tabs, and spaces
formula = re.sub(r'[\n\t ]', '', formula)
# split the response from the formula terms
self.response = formula.split("~")[0]
terms = formula.split("~")[1:][0]
interactions = re.findall(r'\(([A-Za-z0-9_|\+]+)\)', terms)
# parse the interactions. these are terms like (1|doctor_id)
for i in interactions:
# remove interactions from terms list
terms = terms.replace("(%s)" % i, "")
i_terms = i.split('|')[0]
i_group = i.split('|')[1:][0]
for i_term in i_terms.split("+"):
if i_term == "1":
self.groupings[i_group].append("intercept")
elif i_term != '':
self.groupings[i_group].append(i_term)
self.group_levels[i_group] = self.train_df[i_group].unique()
self.grouping_factors.append(i_group)
for g in self.grouping_factors:
self.num_levels[g] = self.train_df[g].nunique()
self.total_num_interactions += self.num_levels[g]
# parse the main terms
for m in terms.split("+"):
if m == "1":
self.main_effects.append("intercept")
elif m != '':
self.main_effects.append(m)
self.num_main = len(self.main_effects)
def _create_penalty_matrix(self):
"""
Take the provided covariance matrices and transform it into an L2 penalty matrix
Args:
None
Returns:
None
"""
self.variances = self.priors_df
self.variances.ix[self.variances['var1'] == '(Intercept)', 'var1'] = 'intercept'
LOGGER.info("creating covariance matrix")
# if "group" is a column in the variances, rename it to "grp"
self.variances.rename(columns={'grp': 'group'}, inplace=True)
inv_covs = {}
for g in self.groupings.keys():
n = len(self.groupings[g])
cov_mat = np.zeros((n, n))
var_grp = self.variances[self.variances.group == g]
if len(var_grp) > 0: # if no priors, then leave cov_mat as zeros
for row in var_grp[['var1', 'var2', 'vcov']].iterrows():
if str(row[1]['var2']) in ['nan', 'None', 'null']:
i = self.groupings[g].index(row[1]['var1'])
cov_mat[i, i] = row[1]['vcov']
else:
i = self.groupings[g].index(row[1]['var1'])
j = self.groupings[g].index(row[1]['var2'])
cov_mat[i, j] = row[1]['vcov']
cov_mat[j, i] = row[1]['vcov']
inv_covs[g] = np.linalg.inv(cov_mat)
self.sparse_inv_covs[g] = RepeatedBlockDiagonal(inv_covs[g], self.num_levels[g])
self.sparse_inv_covs['main'] = None
def _create_main_design(self, **kwargs):
r"""
Create design matrix for main effects
Keyword Args:
* *df* (``DataFrame``). specify a new dataframe to create design matrix from
Returns:
array_like: design matrix in sparse CSR format
"""
df = kwargs.get('df', self.train_df)
df.reset_index(drop=True, inplace=True)
df['row_index'] = df.index
df['intercept'] = 1.0 # assume intercept is always included
# although having this extra column doesn't hurt if intercept is not included
id_cols = ['row_index']
melted_df = pd.melt(df[id_cols + self.main_effects], id_cols)
melted_df = melted_df.merge(self.main_map, on='variable')
melted_df['col_index'] = melted_df['main_idx']
row = melted_df.row_index
col = melted_df.col_index
data = melted_df.value
return sparse.coo_matrix((data, (row, col)), shape=(max(row) + 1, max(col) + 1)).tocsr()
def _create_inter_design(self, g, **kwargs):
r"""
Create random effects design matrix for grouping factor g
This is straightforward when you create the matrix using the training DataFrame
But a new DataFrame can have new levels of g which did not exist in training DF
For these levels, the random coefficients are set to zero
But as a practical matter, it's easier to zero out the values of the predictors
here than it is to modify the fitted coefficient vector
Args:
g (string): grouping factor to create design matrix
Keyword Args:
* *df* (``DataFrame``). specify a new dataframe to create design matrix from
Returns:
array_like : design matrix in sparse CSR format
"""
idx = g + '_idx'
df = kwargs.get('df', self.train_df)
df.reset_index(drop=True, inplace=True)
df['row_index'] = df.index
if 'intercept' in self.groupings[g]:
df['intercept'] = 1.0
id_cols = [g, 'row_index']
melted_inter = pd.melt(df[id_cols + self.groupings[g]], id_cols).merge(
self.level_maps[g], how='left', on=g).merge( # level_maps has levels of g and an index for each level
self.inter_maps[g], how='inner', on='variable')
# inter_maps has variables in formula for this grouping factor, plus indexes
# because of the above left join, some idx values are NULL
# but we need to keep the row indexes
nrows = max(melted_inter.row_index) + 1L
# now drop the null column index
melted_inter.dropna(inplace=True)
melted_inter.sort_values(by=[g, idx], inplace=True)
melted_inter['col_index'] = melted_inter[idx] * len(self.groupings[g]) + melted_inter['inter_idx']
row = melted_inter.row_index
col = melted_inter.col_index
data = melted_inter.value
if g in self.grouping_designs.keys():
# this means training matrix was already created for this group
# reuse the same shape: indices are lined up, everything else will be 0 b/c of sparse matrix
ncols = self.grouping_designs[g].shape[1]
else:
ncols = max(col) + 1L
return sparse.coo_matrix((data, (row, col)), shape=(nrows, ncols)).tocsr()
def _create_design_matrix(self):
"""
Create row index and dummy intercept column
Args:
None
Returns:
None
"""
self.train_df['row_index'] = self.train_df.index
# Create some dataframes to help with indexing
for g in self.groupings.keys():
idx = g + '_idx'
self.level_maps[g] = pd.DataFrame({g: np.unique(self.train_df[g]),
idx: np.arange(self.num_levels[g])})
self.inter_maps[g] = pd.DataFrame({'variable': self.groupings[g],
'inter_idx': np.arange(len(self.groupings[g]))})
self.main_map = pd.DataFrame({'variable': self.main_effects,
'main_idx': np.arange(len(self.main_effects))})
LOGGER.info("creating main design matrix")
# Compute indices (row_index, col_index) into sparse design matrix
self.main_design = self._create_main_design()
for g in self.groupings.keys():
LOGGER.info("creating %s design matrix", g)
self.grouping_designs[g] = self._create_inter_design(g)
self.grouping_designs['main'] = self.main_design
def _create_output_dict_inter(self):
""" Format the estimated coefficients into a nice dictionary
Args:
None
Returns:
None
"""
groupings_coefs = {}
for g in self.groupings.keys():
coefs = pd.DataFrame({'inter_value': self.effects[g]})
idx = g + '_idx'
coefs[idx] = np.arange(len(coefs)) / len(self.groupings[g])
coefs['inter_idx'] = np.mod(np.arange(len(coefs)), len(self.groupings[g]))
coefs = coefs.merge(self.inter_maps[g], on='inter_idx').merge(self.level_maps[g], on=idx)
groupings_coefs[g] = coefs
# make the individual entries in self.results_dict
_df = coefs.pivot(index=g, columns='variable', values='inter_value').reset_index()
del _df.index.name
_df.columns.name = None
self.results_dict[g] = _df
[docs] def predict(self, new_df):
r""" Obtain predictions from a fitted diamond object. New levels of grouping factors are given fixed effects, with zero random effects
Args:
new_df (DataFrame): data to make predictions on
Returns:
array_like: predictions, whose length is equal to the \
number of rows of the supplied DataFrame
"""
if self.num_main > 0:
X = self._create_main_design(df=new_df)
predictions = X.dot(self.effects['main'])
else:
predictions = np.zeros(len(new_df.index))
for g in self.groupings.keys():
Z = self._create_inter_design(g, df=new_df)
predictions += Z.dot(self.effects[g])
return predictions