Source code for rep.estimators.matrixnet

"""
:class:`MatrixNetClassifier` and :class:`MatrixNetRegressor` are wrappers for MatrixNet web service - proprietary BDT
developed at Yandex. Think about this as a specific Boosted Decision Tree algorithm which is available as a service.
At this moment MatrixMet is available only for **CERN users**.

To use MatrixNet, first acquire token::
 * Go to https://yandex-apps.cern.ch/ (login with your CERN-account)
 * Click `Add token` at the left panel
 * Choose service `MatrixNet` and click `Create token`
 * Create `~/.rep-matrixnet.config.json` file with the following content
   (custom path to the config file can be specified when creating a wrapper object)::

        {
            "url": "https://ml.cern.yandex.net/v1",

            "token": "<your_token>"
        }

"""

from __future__ import division, print_function, absolute_import

import contextlib
import hashlib
import json
import numbers
import os
import shutil
import tempfile
import time
from abc import ABCMeta
from collections import defaultdict
from copy import deepcopy
from logging import getLogger

import numpy
import pandas
from six import BytesIO
from sklearn.utils import check_random_state

from ._matrixnetapplier import MatrixNetApplier
from ._mnkit import MatrixNetClient
from .interface import Classifier, Regressor
from .utils import check_inputs, score_to_proba, remove_first_line
from ..utils import take_last

__author__ = 'Tatiana Likhomanenko, Alex Rogozhnikov'
__all__ = ['MatrixNetBase', 'MatrixNetClassifier', 'MatrixNetRegressor']

logger = getLogger(__name__)

CHUNKSIZE = 1000
SYNC_SLEEP_TIME = 10
DEFAULT_CONFIG_PATH = "$HOME/.rep-matrixnet.config.json"


@contextlib.contextmanager
def make_temp_directory():
    temp_dir = tempfile.mkdtemp()
    yield temp_dir
    shutil.rmtree(temp_dir)


[docs]class MatrixNetBase(object): """Base class for MatrixNetClassifier and MatrixNetRegressor. This is a wrapper around **MatrixNet (specific BDT)** technology developed at **Yandex**, which is available for CERN people using authorization. Trained estimator is downloaded and stored at your computer, so you can use it at any time. :param features: features used in training :type features: list[str] or None :param str api_config_file: path to the file with remote api configuration in the json format:: {"url": "https://ml.cern.yandex.net/v1", "token": "<your_token>"} :param int iterations: number of constructed trees (default=100) :param float regularization: regularization number (default=0.01) :param intervals: number of bins for features discretization or dict with borders list for each feature for its discretization (default=8) :type intervals: int or dict(str, list) :param int max_features_per_iteration: depth (default=6, supports 1 <= .. <= 6) :param float features_sample_rate_per_iteration: training features sampling (default=1.0) :param float training_fraction: training rows bagging (default=0.5) :param auto_stop: error value for training pre-stopping :type auto_stop: None or float :param bool sync: synchronous or asynchronous training on the server :param random_state: state for a pseudo random generator :type random_state: None or int or RandomState """ __metaclass__ = ABCMeta _model_type = None def __init__(self, api_config_file=DEFAULT_CONFIG_PATH, iterations=100, regularization=0.01, intervals=8, max_features_per_iteration=6, features_sample_rate_per_iteration=1.0, training_fraction=0.5, auto_stop=None, sync=True, random_state=42): self.api_config_file = api_config_file self.iterations = iterations self.regularization = regularization self.intervals = intervals self.auto_stop = auto_stop self.max_features_per_iteration = max_features_per_iteration self.features_sample_rate_per_iteration = features_sample_rate_per_iteration self.training_fraction = training_fraction self.sync = sync self.random_state = random_state self._initialisation_before_fit() def _initialisation_before_fit(self): self.formula_mx = None self.mn_cls = None self._api = None self._feature_importances = None self._pool_hash = None self._fit_status = False def _configure_api(self, config_file_path): config_file_path = os.path.expandvars(config_file_path) with open(config_file_path, 'r') as conf_file: config = json.load(conf_file) self._api = MatrixNetClient(config['url'], token=config['token']) if self.mn_cls is not None: self.mn_cls.requests_kwargs['headers']['X-Yacern-Token'] = self._api.auth_token def __getstate__(self): result = deepcopy(self.__dict__) if '_api' in result: del result['_api'] if result['mn_cls'] is not None: result['mn_cls'].requests_kwargs['headers']['X-Yacern-Token'] = "" return result def __convert_borders(self, borders, features): """ convert borders for features into correct format to send to the server """ converted_borders = "" for i, feature in enumerate(features): if not feature in borders: continue for border in borders[feature]: converted_borders += "{}\t{}\t0\n".format(i, border) return converted_borders def _md5(self, filename): """ compute md5 hash for file """ md5 = hashlib.md5() with open(filename, 'rb') as file_d: for chunk in iter(lambda: file_d.read(128 * md5.block_size), b''): md5.update(chunk) return md5.hexdigest() def _save_df_to_file(self, df, labels, sample_weight, outfile): """ save DataFrame to send to server """ header = True mode = 'w' for row in range(0, len(df), CHUNKSIZE): df_ef = df.iloc[row: row + CHUNKSIZE, :].copy() df_ef['is_signal'] = labels[row: row + CHUNKSIZE] df_ef['weight'] = sample_weight[row: row + CHUNKSIZE] df_ef.to_csv(outfile, sep='\t', index=False, header=header, mode=mode) header = False mode = 'a' def _upload_training_to_bucket(self, X, y, sample_weight): with make_temp_directory() as temp_dir: data_local = os.path.join(temp_dir, 'data.csv') self._save_df_to_file(X, y, sample_weight, data_local) self._pool_hash = self._md5(data_local) self._configure_api(self.api_config_file) mn_bucket = self._api.bucket(bucket_id=self._pool_hash) if 'data.csv' not in set(mn_bucket.ls()): mn_bucket.upload(data_local) return mn_bucket def _train_formula(self, mn_bucket, features): """ prepare parameters and call _train_sync """ if self.random_state is None: seed = None elif isinstance(self.random_state, int): seed = self.random_state else: seed = check_random_state(self.random_state).randint(0, 10000) mn_options = {'iterations': int(self.iterations), 'regularization': float(self.regularization), 'max_features_per_iteration': int(self.max_features_per_iteration), 'features_sample_rate_per_iteration': float(self.features_sample_rate_per_iteration), 'training_fraction': float(self.training_fraction), 'seed': None, 'intervals': None, 'auto_stop': None, 'train_type': self._model_type} if seed is not None: mn_options['seed'] = int(seed) if isinstance(self.intervals, numbers.Number): mn_options['intervals'] = int(self.intervals) else: assert set(self.intervals.keys()) == set(features), 'intervals must contains borders for all features' with make_temp_directory() as temp_dir: borders_local = os.path.join(temp_dir, 'borders') with open(borders_local, "w") as file_b: file_b.write(self.__convert_borders(self.intervals, features)) suffix = '.{}.baseline'.format(self._md5(borders_local)) borders_name = borders_local + suffix os.rename(borders_local, borders_name) if borders_name not in set(mn_bucket.ls()): mn_bucket.upload(borders_name) mn_options['intervals'] = 'borders' + suffix if self.auto_stop is not None: mn_options['auto_stop'] = float(self.auto_stop) descriptor = { 'mn_parameters': mn_options, 'mn_version': 1, 'fields': list(features), 'extra': {}, } self._configure_api(self.api_config_file) self.mn_cls = self._api.classifier( parameters=descriptor, description="REP-submitted classifier", bucket_id=mn_bucket.bucket_id, ) self.mn_cls.upload() self._fit_status = True
[docs] def training_status(self): """ Check if training has finished on the server :rtype: bool """ self._configure_api(self.api_config_file) assert self._fit_status and self.mn_cls is not None, 'Call fit before' print(self.mn_cls) assert self.mn_cls.get_status() != 'failed', 'Estimator is failed, run resubmit function, job id {}'.format( self.mn_cls.classifier_id) if self.mn_cls.get_status() == 'completed': self._download_formula() self._download_features() return True else: return False
[docs] def synchronize(self): """ Synchronise asynchronic training: wait while training process will be finished on the server """ assert self._fit_status, 'Do fit, model is not trained' if self.formula_mx is not None and self._feature_importances is not None: return while not self.training_status(): time.sleep(SYNC_SLEEP_TIME) assert (self.formula_mx is not None and self._feature_importances is not None), \ "Classifier wasn't fitted, please call `fit` first"
def _download_formula(self): """ Download formula from the server """ if self.formula_mx is not None: return with tempfile.NamedTemporaryFile() as outfile: self._configure_api(self.api_config_file) self.mn_cls.save_formula(outfile.name) with open(outfile.name, 'rb') as formula_file: self.formula_mx = formula_file.read() assert len(self.formula_mx) > 0, "Formula is empty" def _download_features(self): if self._feature_importances is not None: return with tempfile.NamedTemporaryFile() as outfile: self.mn_cls.save_stats(outfile.name) stats = json.loads(open(outfile.name).read())['factors'] importances = defaultdict(list) columns = ["name", "effect", "info", "efficiency"] for data in stats: for key in columns: importances[key].append(data[key]) df = pandas.DataFrame(importances) df_result = {'effect': df['effect'].values / max(df['effect']), 'information': df['info'].values / max(df['info']), 'efficiency': df['efficiency'].values / max(df['efficiency'])} self._feature_importances = pandas.DataFrame(df_result, index=df['name'].values)
[docs] def get_feature_importances(self): """ Get features importance: `effect`, `efficiency`, `information` characteristics :rtype: pandas.DataFrame with `index=self.features` """ self.synchronize() return self._feature_importances
@property def feature_importances_(self): """Sklearn-way of returning feature importance. This returned as numpy.array, 'effect' column is used among MatrixNet importances. """ return numpy.array(self.get_feature_importances()['effect'].ix[self.features])
[docs] def get_iterations(self): """ Return number of already constructed trees during training :return: int or None """ self._configure_api(self.api_config_file) if self.mn_cls is not None: return self.mn_cls.get_iterations() else: return None
[docs] def resubmit(self): """ Resubmit training process on the server in case of failing job. """ if self.mn_cls is not None: self._configure_api(self.api_config_file) self.mn_cls.resubmit()
[docs]class MatrixNetClassifier(MatrixNetBase, Classifier): __doc__ = 'MatrixNet classification model. \n' + remove_first_line(MatrixNetBase.__doc__) def __init__(self, features=None, api_config_file=DEFAULT_CONFIG_PATH, iterations=100, regularization=0.01, intervals=8, max_features_per_iteration=6, features_sample_rate_per_iteration=1.0, training_fraction=0.5, auto_stop=None, sync=True, random_state=42): MatrixNetBase.__init__(self, api_config_file=api_config_file, iterations=iterations, regularization=regularization, intervals=intervals, max_features_per_iteration=max_features_per_iteration, features_sample_rate_per_iteration=features_sample_rate_per_iteration, training_fraction=training_fraction, auto_stop=auto_stop, sync=sync, random_state=random_state) Classifier.__init__(self, features=features) self._model_type = 'classification' def _set_classes_special(self, y): self._set_classes(y) assert self.n_classes_ == 2, "Support only 2 classes (data contain {})".format(self.n_classes_)
[docs] def fit(self, X, y, sample_weight=None): self._initialisation_before_fit() X, y, sample_weight = check_inputs(X, y, sample_weight=sample_weight, allow_none_weights=False) self._set_classes_special(y) X = self._get_features(X) mn_bucket = self._upload_training_to_bucket(X, y, sample_weight) self._train_formula(mn_bucket, list(X.columns)) if self.sync: self.synchronize() return self
fit.__doc__ = Classifier.fit.__doc__
[docs] def predict_proba(self, X): return take_last(self.staged_predict_proba(X, step=100000))
predict_proba.__doc__ = Classifier.predict_proba.__doc__
[docs] def staged_predict_proba(self, X, step=10): """ Predict probabilities for data for each class label on each stage. :param pandas.DataFrame X: data of shape [n_samples, n_features] :param int step: step for returned iterations (10 by default). :return: iterator """ self.synchronize() X = self._get_features(X) data = X.astype(float) data = pandas.DataFrame(data) mx = MatrixNetApplier(BytesIO(self.formula_mx)) for stage, prediction in enumerate(mx.staged_apply(data)): if stage % step == 0: yield score_to_proba(prediction) if stage % step != 0: yield score_to_proba(prediction)
[docs]class MatrixNetRegressor(MatrixNetBase, Regressor): __doc__ = 'MatrixNet for regression model. \n' + remove_first_line(MatrixNetBase.__doc__) def __init__(self, features=None, api_config_file=DEFAULT_CONFIG_PATH, iterations=100, regularization=0.01, intervals=8, max_features_per_iteration=6, features_sample_rate_per_iteration=1.0, training_fraction=0.5, auto_stop=None, sync=True, random_state=42): MatrixNetBase.__init__(self, api_config_file=api_config_file, iterations=iterations, regularization=regularization, intervals=intervals, max_features_per_iteration=max_features_per_iteration, features_sample_rate_per_iteration=features_sample_rate_per_iteration, training_fraction=training_fraction, auto_stop=auto_stop, sync=sync, random_state=random_state) Regressor.__init__(self, features=features) self._model_type = 'regression'
[docs] def fit(self, X, y, sample_weight=None): self._initialisation_before_fit() X, y, sample_weight = check_inputs(X, y, sample_weight=sample_weight, allow_none_weights=False) X = self._get_features(X) mn_bucket = self._upload_training_to_bucket(X, y, sample_weight) self._train_formula(mn_bucket, list(X.columns)) if self.sync: self.synchronize() return self
fit.__doc__ = Classifier.fit.__doc__
[docs] def predict(self, X): return take_last(self.staged_predict(X, step=100000))
predict.__doc__ = Classifier.predict.__doc__
[docs] def staged_predict(self, X, step=10): """ Predict probabilities for data for each class label on each stage. :param pandas.DataFrame X: data of shape [n_samples, n_features] :param int step: step for returned iterations (10 by default). :return: iterator """ self.synchronize() X = self._get_features(X) data = X.astype(float) data = pandas.DataFrame(data) mx = MatrixNetApplier(BytesIO(self.formula_mx)) for stage, prediction in enumerate(mx.staged_apply(data)): if stage % step == 0: yield prediction if stage % step != 0: yield prediction