# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from collections import defaultdict
import logging
from mealy.error_analysis_utils import check_lists_having_same_elements, generate_preprocessing_steps, invert_transform_via_identity
from mealy.constants import ErrorAnalyzerConstants
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='mealy | %(levelname)s - %(message)s')
class PipelinePreprocessor(FeatureNameTransformer):
    """Transformer of feature values from the original values to preprocessed ones.

    A PipelinePreprocessor parses an input ColumnTransformer preprocessor and generates
    a mapping between the unprocessed input feature values and the preprocessed
    output feature values.

    Args:
        ct_preprocessor (sklearn.compose.ColumnTransformer): preprocessing steps.
        original_features (list): list of original unprocessed feature names, default=None.
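
    Example:
        A minimal usage sketch (``ct`` is assumed to be an already fitted
        ColumnTransformer over the columns 'age' and 'country'; all names
        are purely illustrative)::

            pp = PipelinePreprocessor(ct, original_features=['age', 'country'])
            pp.get_preprocessed_feature_names()  # e.g. ['age', 'country_FR', 'country_US']
            pp.is_categorical(name='country')    # True if a categorical step handled it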
"""
def __init__(self, ct_preprocessor, original_features=None):
self.ct_preprocessor = ct_preprocessor
self.original2preprocessed = defaultdict(list)
self.preprocessed2original = {}
self.categorical_features = []
logger.info('Retrieving the list of features used in the pipeline')
original_features_from_ct = self._get_feature_list_from_column_transformer()
if original_features is None:
original_features = original_features_from_ct
        # If the user explicitly gives a list of input features, compare it with
        # the list derived from the ColumnTransformer
        elif not check_lists_having_same_elements(original_features, original_features_from_ct):
            raise ValueError('The list of features given by the user does not correspond to the list of features handled by the Pipeline.')
super(PipelinePreprocessor, self).__init__(original_features=original_features, preprocessed_features=[])
logger.info('Generating the feature id mapping dict')
self._create_feature_mapping()
def _get_feature_list_from_column_transformer(self):
all_features = []
for _, transformer, feature_names in self.ct_preprocessor.transformers_:
for step in generate_preprocessing_steps(transformer):
if isinstance(step, ErrorAnalyzerConstants.VALID_CATEGORICAL_STEPS):
                    # A single categorical step marks all the features handled by this transformer as categorical
self.categorical_features += feature_names
break
all_features += feature_names
return all_features
def _create_feature_mapping(self):
"""
Update the dicts of input <-> output feature id mapping: self.original2preprocessed and self.preprocessed2original
"""
        for _, transformer, feature_names in self.ct_preprocessor.transformers_:
            orig_feat_ids = np.where(np.isin(self.original_feature_names, feature_names))[0]
            output_dim_changed = False
            for step in generate_preprocessing_steps(transformer):
                if isinstance(step, ErrorAnalyzerConstants.STEPS_THAT_CHANGE_OUTPUT_DIMENSION_WITH_OUTPUT_FEATURE_NAMES):
                    # It is assumed that for each pipeline, at most one step changes the
                    # feature's dimension. For now, it can only be a OneHotEncoder step.
                    self._update_feature_mapping_dict_using_output_names(step,
                                                                         feature_names,
                                                                         orig_feat_ids)
                    output_dim_changed = True
                    break
            if not output_dim_changed:
                self._update_feature_mapping_dict_using_input_names(feature_names, orig_feat_ids)
    def _update_feature_mapping_dict_using_input_names(self, transformer_feature_names, original_feature_ids):
        # One-to-one mapping: each input feature yields exactly one output column,
        # appended after the columns that are already mapped
        self.preprocessed_feature_names.extend(transformer_feature_names)
        for original_feat_id in original_feature_ids:
            idx = len(self.preprocessed2original)
            self.original2preprocessed[original_feat_id] = [idx]
            self.preprocessed2original[idx] = original_feat_id
    def _update_feature_mapping_dict_using_output_names(self, single_transformer, transformer_feature_names, original_feature_ids):
        # NB: get_feature_names was deprecated in scikit-learn 1.0 in favor of get_feature_names_out
        out_feature_names = list(single_transformer.get_feature_names(input_features=transformer_feature_names))
        self.preprocessed_feature_names.extend(out_feature_names)
        for orig_id, orig_name in zip(original_feature_ids, transformer_feature_names):
            # Match output columns by their '<feature>_<category>' prefix, so that a feature
            # whose name contains another feature's name is not wrongly matched
            part_out_feature_names = [name for name in out_feature_names if name.startswith(orig_name + '_')]
            offset = len(self.preprocessed2original)
            for i in range(len(part_out_feature_names)):
                self.original2preprocessed[orig_id].append(offset + i)
                self.preprocessed2original[offset + i] = orig_id
    def _transform_feature_id(self, index):
        """
        Args:
            index (int): index of an original feature.

        Returns:
            list: indices of the output feature(s) generated from the requested feature.
        """
        return self.original2preprocessed[index]
def _get_feature_ids_related_to_transformer(self, transformer_feature_names):
original_features = self.get_original_feature_names()
        original_feature_ids = np.where(np.isin(original_features, transformer_feature_names))[0]
preprocessed_feature_ids = []
for i in original_feature_ids:
preprocessed_feature_ids += self._transform_feature_id(i)
return original_feature_ids, preprocessed_feature_ids
@staticmethod
def _inverse_single_step(single_step, step_output, transformer_feature_names):
        inverse_transform_function_available = getattr(single_step, "inverse_transform", None)
        if invert_transform_via_identity(single_step):
            logger.info("Reversing step {} using identity transformation on column(s): {}".format(single_step, ', '.join(transformer_feature_names)))
            return step_output
        if inverse_transform_function_available:
            logger.info("Reversing step {} using its inverse_transform() method on column(s): {}".format(single_step, ', '.join(transformer_feature_names)))
            return single_step.inverse_transform(step_output)
        raise TypeError('The package does not support {} because it does not provide an inverse_transform function.'.format(single_step))
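
    # Sketch of _inverse_single_step on a fitted StandardScaler (hypothetical
    # values): with a scaler fitted so that the mean is 40 and the std is 10,
    # _inverse_single_step(scaler, np.array([[0.5]]), ['age']) returns
    # array([[45.]]), i.e. the value expressed in the original units.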
    def is_categorical(self, index=None, name=None):
        """Check whether an unprocessed feature at a given index or with a given name is categorical.

        Args:
            index (int): feature index.
            name (str): feature name.

        Returns:
bool: True if the input feature is categorical, else False. If both index and name are provided, the index
is retained.
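
        Example (assuming a feature named 'country' was handled by a
        categorical step; the name is purely illustrative)::

            pp.is_categorical(name='country')  # True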
"""
if index is not None:
name = self.original_feature_names[index]
if name is not None:
return name in self.categorical_features
else:
raise ValueError("Either the input index or its name should be specified.")
    def get_top_ranked_feature_ids(self, feature_importances, max_nr_features):
        ranked_transformed_feature_ids = np.argsort(- feature_importances)
        if max_nr_features <= 0:
            max_nr_features += len(self.get_original_feature_names())
        ranked_feature_ids, seen = [], set()
        for idx in ranked_transformed_feature_ids:
            inverse_transformed_feature_id = self.inverse_transform_feature_id(idx)
            if inverse_transformed_feature_id not in seen:
                seen.add(inverse_transformed_feature_id)
                ranked_feature_ids.append(inverse_transformed_feature_id)
                if max_nr_features == len(ranked_feature_ids):
                    return ranked_feature_ids
        # Reached when max_nr_features exceeds the number of distinct original features
        return ranked_feature_ids
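
    # Sketch (hypothetical values): with 4 preprocessed columns where columns 1
    # and 2 both come from original feature 0, column 0 from feature 1 and
    # column 3 from feature 2, the preprocessed ranking [2, 1, 3, 0] collapses
    # to original ids [0, 2, 1] before truncation to max_nr_features.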
    def inverse_thresholds(self, tree):
        used_feature_mask = tree.feature >= 0
        feats_idx = tree.feature[used_feature_mask]
        thresholds = tree.threshold.astype('O')
        thresh = thresholds[used_feature_mask]
        n_cols = len(self.get_preprocessed_feature_names())
        dummy_x, indices = [], []
        # Build one dummy row per tree node: the node's threshold is placed in the
        # preprocessed column it splits on, so that inverse-transforming the rows
        # yields the thresholds expressed in the original feature space
        for f, t in zip(feats_idx, thresh):
            row = [0]*n_cols
            row[f] = t
            dummy_x.append(row)
            indices.append(self.inverse_transform_feature_id(f))
        undo_dummy_x = self.inverse_transform(np.array(dummy_x))
        descaled_thresh = [undo_dummy_x[i, j] for i, j in enumerate(indices)]
        thresholds[used_feature_mask] = descaled_thresh
        return thresholds
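
    # Sketch (hypothetical values): if a node splits on the preprocessed 'age'
    # column (standard-scaled with mean 40 and std 10) at threshold 0.5,
    # inverse_thresholds rewrites that node's threshold as 45.0, i.e. in the
    # original 'age' units.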
class DummyPipelinePreprocessor(FeatureNameTransformer):
    """Identity preprocessor used when there is no preprocessing pipeline:
    original and preprocessed features coincide."""
    def __init__(self, model_performance_predictor_features):
        super(DummyPipelinePreprocessor, self).__init__(
            original_features=model_performance_predictor_features,
            preprocessed_features=model_performance_predictor_features)
def is_categorical(self, index=None, name=None):
return False
def inverse_transform_feature_id(self, index):
return index
def inverse_transform(self, x):
return x
    def get_top_ranked_feature_ids(self, feature_importances, max_nr_features):
        if max_nr_features == 0:
            # 0 means 'keep all the features' (slicing with 0 would return an empty array)
            return np.argsort(- feature_importances)
        return np.argsort(- feature_importances)[:max_nr_features]
def inverse_thresholds(self, tree):
return tree.threshold.astype('O')
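

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only: the column names, categories and
# ColumnTransformer layout below are assumptions, not part of mealy's API).
if __name__ == '__main__':
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import OneHotEncoder, StandardScaler

    df = pd.DataFrame({'age': [20, 30, 40], 'country': ['US', 'FR', 'US']})
    ct = ColumnTransformer([
        ('num', StandardScaler(), ['age']),
        ('cat', OneHotEncoder(), ['country']),
    ])
    ct.fit(df)

    pp = PipelinePreprocessor(ct, original_features=['age', 'country'])
    print(pp.get_preprocessed_feature_names())  # e.g. ['age', 'country_FR', 'country_US']
    print(pp.is_categorical(name='country'))    # True when OneHotEncoder is a valid categorical step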