# Source code for diamond.glms.logistic

import logging
import time
import numpy as np
import pandas as pd
from diamond.glms.glm import GLM
from diamond.solvers.diamond_logistic import FixedHessianSolverMulti
from diamond.solvers.utils import custom_block_diag
from scipy.special import expit

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


class LogisticRegression(GLM):
    """
    Logistic regression model with arbitrary crossed random effects
    and known covariances.
    """

    def __init__(self, train_df, priors_df, copy=False, test_df=None):
        super(LogisticRegression, self).__init__(train_df, priors_df, copy, test_df)
        self.solver = FixedHessianSolverMulti()
        # Inverse of the main-effects Hessian bound, filled by _create_hessians()
        self.H_main_inv = None
        # Per-grouping block-diagonal Hessian-bound inverses, keyed by grouping
        # name plus the special key 'main'
        self.H_invs = {}

    def _create_response_matrix(self):
        """
        Store vector of binary responses in an array.

        NOTE(review): this overwrites self.response (previously the response
        column *name*) with the actual response column — confirm downstream
        code expects the data, not the name.
        """
        self.response = self.train_df[self.response]

    def _create_hessians(self):
        """
        Create bounds on the Hessian matrices.

        Uses the standard 0.25 * X'X upper bound on the logistic-regression
        Hessian (the logistic variance p(1-p) is at most 1/4) and precomputes
        inverses: a pseudoinverse for the main effects and a block-diagonal
        inverse for each grouping factor.

        Args:
            None
        Returns:
            None
        """
        LOGGER.info("creating Hessians")
        H_main = 0.25 * np.array(self.main_design.T.dot(self.main_design).todense())
        # do a pseudoinverse for the main effects because they are
        # unregularized and could be constant columns
        self.H_main_inv = np.linalg.pinv(H_main)
        for g in self.groupings.keys():
            LOGGER.info("creating H_inter for %s", g)
            inter_design = self.grouping_designs[g]
            H_inter = 0.25 * inter_design.T.dot(inter_design)
            q = len(self.level_maps[g])
            inv_blocks = []
            # BUGFIX: floor division. Under Python 3, "/" is true division and
            # yields a float, which breaks the slice indices below.
            block_length = H_inter.shape[0] // q
            for k in range(q):
                if k % 100000 == 0:
                    # progress logging for very large numbers of levels
                    LOGGER.info("time elapsed: %.1f", time.time() - self.start_time)
                    LOGGER.info("blocks inverted: %i of %i", k, q)
                block = np.array(H_inter[(k * block_length):((k + 1) * block_length),
                                         (k * block_length):((k + 1) * block_length)].todense())
                # regularize each diagonal block with the prior's inverse
                # covariance before inverting
                inv_blocks.append(np.linalg.inv(block + self.sparse_inv_covs[g]._block))
            LOGGER.info("creating H_invs")
            self.H_invs[g] = custom_block_diag(inv_blocks)
        self.H_invs['main'] = self.H_main_inv
[docs] def fit(self, formula, **kwargs): r""" Fit the model specified by formula and training data Args: formula (string): R-style formula expressing the model to fit. eg. :math:`y \sim 1 + x + (1 + x | group)` Keyword Args: kwargs : additional arguments passed to solver Returns: dict of estimated parameters """ self.initialize(formula, **kwargs) self.effects, self.obj_fun = self.solver.fit(self.response, self.grouping_designs, self.H_invs, self.sparse_inv_covs, **kwargs) self._create_output_dict() return self.results_dict
[docs] def refit(self, **kwargs): r""" Update a fitted model. Any kwargs not supplied will be reused from original call to self.fit() Args: None Keyword Args: kwargs: fitting parameters Returns: None """ kwargs = {x: kwargs.get(x, self.fit_kwargs.get(x, None)) for x in set(kwargs.keys()).union(self.fit_kwargs.keys())} self.effects, self.obj_fun = self.solver.fit(self.response, self.grouping_designs, self.H_invs, self.sparse_inv_covs, **kwargs) self._create_output_dict() return self.results_dict
def _create_output_dict(self): """ Extract coefficients Args: None Returns: Dictionary of estimated coefficients. Keys are "fixed_effects" and one key for each grouping factor """ LOGGER.info("extracting coefficients") main_coefs = pd.DataFrame({'variable': self.main_effects, 'value': self.effects["main"]}) self.results_dict['fixed_effects'] = main_coefs self._create_output_dict_inter() def _merge_effects(self): """ Add together estimated fixed and random coefficients Args: None Returns: None """ LOGGER.info("extracting coefficients") main_coefs = pd.DataFrame({'variable': self.main_effects, 'value': self.effects["main"]}) groupings_coefs = {} for g in self.groupings.keys(): coefs = pd.DataFrame({'inter_value': self.effects[g]}) idx = g + '_idx' coefs[idx] = np.arange(len(coefs)) / len(self.groupings[g]) coefs['inter_idx'] = np.mod(np.arange(len(coefs)), len(self.groupings[g])) coefs = coefs.merge(self.inter_maps[g], on='inter_idx').merge(self.level_maps[g], on=idx) groupings_coefs[g] = coefs # start merging all of the coefficients together merged_coefs = main_coefs for g in groupings_coefs: merged_coefs = merged_coefs.merge(groupings_coefs[g], on='variable', how='right', suffixes=('_1', '_2')) try: merged_coefs['value'] = merged_coefs['value'] + merged_coefs['inter_value'] except KeyError: merged_coefs['value'] = merged_coefs['value'] + merged_coefs['inter_value_2'] # do some wrangling to get the pivot to work correctly cols = self.grouping_factors cols.extend(['variable', 'value']) merged_coefs = merged_coefs[cols] index = range(np.product(self.num_levels.values())) * merged_coefs.variable.nunique() merged_coefs['index'] = index pivoted_merged_coefs = merged_coefs.pivot(index='index', columns='variable', values='value').reset_index() del pivoted_merged_coefs.index.name pivoted_merged_coefs.columns.name = None self.results = pivoted_merged_coefs.merge(merged_coefs, on='index', how='left').drop( ['variable', 'value', 'index'], axis=1) # Add in columns that are main 
effects but not interaction effects inter_cols = self.groupings.values()[0] main_cols = main_coefs['variable'].unique() vars_to_add = list(set(main_cols) - set(inter_cols)) for v in vars_to_add: LOGGER.info("%s is a main effect but not an interaction effect. Adding it to the results now", v) self.results[v] = float(main_coefs.value[main_coefs['variable'] == v])
[docs] def predict(self, new_df): """ Use estimated coefficients to make predictions on new data Args: new_df (DataFrame). DataFrame to make predictions on. Returns: array-like. Predictions on the response scale, i.e. probabilities """ return expit(super(LogisticRegression, self).predict(new_df))