# Source code for rep.metaml.folding

```python
"""
:class:`FoldingClassifier` and :class:`FoldingRegressor` provide an easy way
to run k-Folding cross-validation. Also it is a nice way to combine predictions of trained classifiers.
"""
from __future__ import division, print_function, absolute_import
import numpy
import pandas
from six.moves import zip
from sklearn import clone
from sklearn.cross_validation import KFold
from sklearn.utils import check_random_state
from . import utils
from .factory import train_estimator
from ..estimators.interface import Classifier, Regressor
from ..estimators.utils import check_inputs
__author__ = 'Tatiana Likhomanenko, Alex Rogozhnikov'
__all__ = ['FoldingClassifier', 'FoldingRegressor']
from .utils import get_classifier_probabilities, get_classifier_staged_proba, get_regressor_prediction, \
get_regressor_staged_predict
class FoldingBase(object):
    """
    This meta-{estimator} implements folding algorithm:

    * split training data into n equal parts;
    * train n {estimator}s, each one is trained using n-1 folds

    To get unbiased predictions for data, pass the **same** dataset (with same order of events)
    as in training to prediction methods,
    in which case each event is predicted with base {estimator} which didn't use that event during training.

    To use information from not one, but several estimators during predictions,
    provide appropriate voting function. Examples of voting function:

    >>> voting = lambda x: numpy.mean(x, axis=0)
    >>> voting = lambda x: numpy.median(x, axis=0)
    """

    def __init__(self,
                 base_estimator,
                 n_folds=2,
                 random_state=None,
                 features=None,
                 parallel_profile=None):
        """
        :param sklearn.BaseEstimator base_estimator: base classifier, which will be used for training
        :param int n_folds: count of folds
        :param features: features used in training
        :type features: None or list[str]
        :param parallel_profile: profile for IPython cluster, None to compute locally.
        :type parallel_profile: None or str
        :param random_state: random state for reproducibility
        :type random_state: None or int or RandomState
        """
        self.estimators = []
        self.parallel_profile = parallel_profile
        self.n_folds = n_folds
        self.base_estimator = base_estimator
        self._folds_indices = None
        self.random_state = random_state
        self._random_number = None
        # setting features directly
        self.features = features

    def _get_folds_column(self, length):
        """
        Return special column with indices of folds for all events.

        The seed is drawn once and cached in ``self._random_number``, so the
        split is identical between training and later unbiased prediction
        (prediction relies on reproducing the same fold assignment).
        """
        if self._random_number is None:
            self._random_number = check_random_state(self.random_state).randint(0, 100000)
        # fold labels are integers 0..n_folds-1; int dtype avoids a float column
        # that is only ever compared against integer fold indices
        folds_column = numpy.zeros(length, dtype=int)
        for fold_number, (_, folds_indices) in enumerate(
                KFold(length, self.n_folds, shuffle=True, random_state=self._random_number)):
            folds_column[folds_indices] = fold_number
        return folds_column

    def _prepare_data(self, X, y, sample_weight):
        # feature selection / label handling differs between classifier and regressor
        raise NotImplementedError('To be implemented in descendant')

    def fit(self, X, y, sample_weight=None):
        """
        Train the model, will train several base {estimator}s on overlapping
        subsets of training dataset.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param y: labels of events - array-like of shape [n_samples]
        :param sample_weight: weight of events,
               array-like of shape [n_samples] or None if all weights are equal
        """
        if hasattr(self.base_estimator, 'features'):
            assert self.base_estimator.features is None, \
                'Base estimator must have None features! Use features parameter in Folding instead'

        self.train_length = len(X)
        X, y, sample_weight = self._prepare_data(X, y, sample_weight)
        folds_column = self._get_folds_column(len(X))

        # reset the estimator list on every fit: appending without a reset
        # would accumulate stale clones when the model is refitted
        self.estimators = [clone(self.base_estimator) for _ in range(self.n_folds)]

        if sample_weight is None:
            weights_iterator = [None] * self.n_folds
        else:
            weights_iterator = (sample_weight[folds_column != index] for index in range(self.n_folds))

        # each estimator is trained on the complement of its fold (n-1 folds)
        result = utils.map_on_cluster(self.parallel_profile, train_estimator,
                                      range(len(self.estimators)),
                                      self.estimators,
                                      (X.iloc[folds_column != index, :].copy() for index in range(self.n_folds)),
                                      (y[folds_column != index] for index in range(self.n_folds)),
                                      weights_iterator)
        for status, data in result:
            if status == 'success':
                name, classifier, spent_time = data
                self.estimators[name] = classifier
            else:
                print('Problem while training on the node, report:\n', data)
        return self

    def _folding_prediction(self, X, prediction_function, vote_function=None):
        """
        Supplementary function to predict (labels, probabilities, values)

        :param X: dataset to predict
        :param prediction_function: function(classifier, X) -> prediction
        :param vote_function: if using averaging over predictions of folds, this function shall be passed.
            For instance: lambda x: numpy.mean(x, axis=0), which means averaging result over all folds.
            Another useful option is lambda x: numpy.median(x, axis=0)
        """
        X = self._get_features(X)
        if vote_function is not None:
            # combine predictions of ALL fold estimators via the voting function
            print('KFold prediction with voting function')
            results = []
            for estimator in self.estimators:
                results.append(prediction_function(estimator, X))
            # results: [n_classifiers, n_samples, n_dimensions], reduction over 0th axis
            results = numpy.array(results)
            return vote_function(results)
        else:
            # folding scheme: each event is predicted by the estimator that
            # did not see it during training (valid only for the train dataset)
            if len(X) != self.train_length:
                print('KFold prediction using random classifier (length of data passed not equal to length of train)')
            else:
                print('KFold prediction using folds column')
            folds_column = self._get_folds_column(len(X))
            parts = []
            for fold in range(self.n_folds):
                parts.append(prediction_function(self.estimators[fold], X.iloc[folds_column == fold, :]))

            # scatter per-fold predictions back to the original event order
            result_shape = [len(X)] + list(numpy.shape(parts[0])[1:])
            results = numpy.zeros(shape=result_shape)
            folds_indices = [numpy.where(folds_column == fold)[0] for fold in range(self.n_folds)]
            for fold, part in enumerate(parts):
                results[folds_indices[fold]] = part
            return results

    def _staged_folding_prediction(self, X, prediction_function, vote_function=None):
        """
        Staged analogue of :meth:`_folding_prediction`: yields one combined
        prediction per stage of the base estimator.
        """
        X = self._get_features(X)
        if vote_function is not None:
            print('Using voting KFold prediction')
            iterators = [prediction_function(estimator, X) for estimator in self.estimators]
            for fold_prob in zip(*iterators):
                result = numpy.array(fold_prob)
                yield vote_function(result)
        else:
            if len(X) != self.train_length:
                print('KFold prediction using random classifier (length of data passed not equal to length of train)')
            else:
                print('KFold prediction using folds column')
            folds_column = self._get_folds_column(len(X))
            iterators = [prediction_function(self.estimators[fold], X.iloc[folds_column == fold, :])
                         for fold in range(self.n_folds)]
            folds_indices = [numpy.where(folds_column == fold)[0] for fold in range(self.n_folds)]
            # advance all fold iterators in lockstep, one stage at a time
            for stage_results in zip(*iterators):
                result_shape = [len(X)] + list(numpy.shape(stage_results[0])[1:])
                result = numpy.zeros(result_shape)
                for fold in range(self.n_folds):
                    result[folds_indices[fold]] = stage_results[fold]
                yield result

    def _get_feature_importances(self):
        """
        Get features importance

        :return: pandas.DataFrame with column effect and `index=features`
        """
        importances = numpy.sum([est.feature_importances_ for est in self.estimators], axis=0)
        # to get train_features, not features
        one_importances = self.estimators[0].get_feature_importances()
        return pandas.DataFrame({'effect': importances / numpy.max(importances)}, index=one_importances.index)
class FoldingRegressor(FoldingBase, Regressor):
    # inherit documentation (class statement reconstructed: it was lost in
    # extraction — `Regressor` is imported and `__doc__` below expects a class scope)
    __doc__ = FoldingBase.__doc__.format(estimator='regressor')

    def fit(self, X, y, sample_weight=None):
        return FoldingBase.fit(self, X, y, sample_weight=sample_weight)

    fit.__doc__ = FoldingBase.fit.__doc__.format(estimator='regressor')

    def _prepare_data(self, X, y, sample_weight):
        """Select training features, record output dimensionality, validate inputs."""
        X = self._get_features(X)
        y_shape = numpy.shape(y)
        # 1D target -> single output; 2D target -> one output per column
        self.n_outputs_ = 1 if len(y_shape) < 2 else y_shape[1]
        return check_inputs(X, y, sample_weight=sample_weight, allow_multiple_targets=True)

    def predict(self, X, vote_function=None):
        """
        Get predictions. To get unbiased predictions on training dataset, pass training data
        (with same order of events) and vote_function=None.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param vote_function: function to combine prediction of folds' estimators.
            If None then folding scheme is used. Parameters: numpy.ndarray [n_classifiers, n_samples]
        :type vote_function: None or function
        :rtype: numpy.array of shape [n_samples, n_outputs]
        """
        return self._folding_prediction(X, prediction_function=get_regressor_prediction,
                                        vote_function=vote_function)

    def staged_predict(self, X, vote_function=None):
        """
        Get predictions after each iteration of base estimator.
        To get unbiased predictions on training dataset, pass training data
        (with same order of events) and vote_function=None.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param vote_function: function to combine prediction of folds' estimators.
            If None then folding scheme is used. Parameters: numpy.ndarray [n_classifiers, n_samples]
        :type vote_function: None or function
        :rtype: sequence of numpy.array of shape [n_samples, n_outputs]
        """
        return self._staged_folding_prediction(X, prediction_function=get_regressor_staged_predict,
                                               vote_function=vote_function)

    def get_feature_importances(self):
        """
        Get features importance

        :rtype: pandas.DataFrame with column effect and `index=features`
        """
        return self._get_feature_importances()

    @property
    def feature_importances_(self):
        """Sklearn-way of returning feature importance.
        This returned as numpy.array, assuming that initially passed train_features=None """
        # .loc replaces the .ix indexer, which was removed from pandas
        return self.get_feature_importances().loc[self.features, 'effect'].values
class FoldingClassifier(FoldingBase, Classifier):
    # inherit documentation (class statement reconstructed: it was lost in
    # extraction — `Classifier` is imported and `__doc__` below expects a class scope)
    __doc__ = FoldingBase.__doc__.format(estimator='classifier')

    def fit(self, X, y, sample_weight=None):
        return FoldingBase.fit(self, X, y, sample_weight=sample_weight)

    fit.__doc__ = FoldingBase.fit.__doc__.format(estimator='classifier')

    def _prepare_data(self, X, y, sample_weight):
        """Select training features, remember the class labels, validate inputs."""
        X = self._get_features(X)
        self._set_classes(y)
        return check_inputs(X, y, sample_weight=sample_weight, allow_multiple_targets=True)

    def predict(self, X, vote_function=None):
        """
        Predict labels. To get unbiased predictions on training dataset, pass training data
        (with same order of events) and vote_function=None.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param vote_function: function to combine prediction of folds' estimators.
            If None then folding scheme is used.
        :type vote_function: None or function
        :rtype: numpy.array of shape [n_samples]
        """
        # label = index of the most probable class
        return numpy.argmax(self.predict_proba(X, vote_function=vote_function), axis=1)

    def predict_proba(self, X, vote_function=None):
        """
        Predict probabilities. To get unbiased predictions on training dataset, pass training data
        (with same order of events) and vote_function=None.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param vote_function: function to combine prediction of folds' estimators.
            If None then folding scheme is used.
        :type vote_function: None or function
        :rtype: numpy.array of shape [n_samples, n_classes]
        """
        result = self._folding_prediction(X, prediction_function=get_classifier_probabilities,
                                          vote_function=vote_function)
        # renormalize rows: voting (e.g. mean/median over folds) may break the sum-to-one property
        return result / numpy.sum(result, axis=1, keepdims=True)

    def staged_predict_proba(self, X, vote_function=None):
        """
        Predict probabilities after each stage of base_estimator.
        To get unbiased predictions on training dataset, pass training data
        (with same order of events) and vote_function=None.

        :param X: pandas.DataFrame of shape [n_samples, n_features]
        :param vote_function: function to combine prediction of folds' estimators.
            If None then folding scheme is used.
        :type vote_function: None or function
        :rtype: sequence of numpy.arrays of shape [n_samples, n_classes]
        """
        for proba in self._staged_folding_prediction(X, prediction_function=get_classifier_staged_proba,
                                                     vote_function=vote_function):
            yield proba / numpy.sum(proba, axis=1, keepdims=True)

    def get_feature_importances(self):
        """
        Get features importance

        :rtype: pandas.DataFrame with column effect and `index=features`
        """
        return self._get_feature_importances()

    @property
    def feature_importances_(self):
        """Sklearn-way of returning feature importance.
        This returned as numpy.array, assuming that initially passed train_features=None """
        # .loc replaces the .ix indexer, which was removed from pandas
        return self.get_feature_importances().loc[self.features, 'effect'].values
```