Source code for SuperSCC.feature_selection.feature_selection

"""
Feature selection module for selecting data features.
"""

import re
import os
import pickle
import warnings
from datetime import datetime
from functools import reduce

from dcor import distance_correlation
import numpy as np
import pandas as pd
from scipy.stats import rankdata, gmean
from sklearn.metrics import accuracy_score, cohen_kappa_score, matthews_corrcoef, balanced_accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import RFECV, VarianceThreshold, SelectKBest, chi2, SelectFromModel, f_classif, mutual_info_classif
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler

from ..utils import record_time, remove_time, check_duplicated_elements, retrieve_positive_markers, jaccard_score


def reverse_rank_weight(x):
    """
    A function to reverse rank weight to makre sure the greatest ranking value representing the most importance.
    """
    assert isinstance(x, dict), "x should be dictionary"
    unique_values = list(set(x.values()))
    unique_values.sort()
    half_length = int(len(unique_values) / 2)
    
    d = dict()
    
    for key, value in x.items():
        if value > unique_values[half_length]:
            for i in range(-1, -half_length-1, -1):
                if unique_values[i] == value:
                    j = i + (i * -2) - 1
                    d[key] = unique_values[j]
        
        else:
            for i in range(half_length+1):
                if unique_values[i] == value:
                    j = i + 1
                    d[key] = unique_values[-j]
    return d    


def model_accuracy(x, y, model, cv = 5, linear_svm_multi_class = "ovr", logistic_multi_class = "ovr", class_weight = "balanced", n_estimators = 100, random_state = 10):
    """
    A function to evulate the model accuracy on cross-validation splits.

    Parameters
    ----------
    data: 
        A pandas data frame object. Rows are cells. Columns are features.
    y: 
        A list of labels for each row in x.
    model: 
        A string to determine the evulation model. Three available values including 'svm', 'logistic' and 'random_forest'.
    cv: 
        A int to control the number of cross validation. 
    linear_svm_multi_class: 
        A string to decide which mode to deal with multiclassification in the linear svm model. Default is "ovr". This parameter only takes effect when model is set in 'svm'. Other available words include "ovo".
    logistic_multi_class: 
        A string to decide which mode to deal with multiclassification in the logistic model. Default is "ovr". Other available words include "multinomial" and "auto".
    class_weight: 
        A string to decide whether class weights will be considered. If None, all classes are supposed to have weight one. The "balanced" mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as n_samples / (n_classes * np.bincount(y)). Default is 'balanced'.
    random_state: 
        A int to control the randomness of the bootstrapping of the samples. It takes effect when model is set in 'random_foreast' or in "logistic". Default is 10.
    """
    # prepare containers
    score1_ls = list()
    score2_ls = list()
    score3_ls = list()
    score4_ls = list()

    for i in range(cv):
        X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size = 0.2)
        if model == "svm":
            classifier = LinearSVC(multi_class =  linear_svm_multi_class, class_weight = class_weight).fit(X_train, Y_train)
        elif model == "random_foreast":
            classifier = RandomForestClassifier(n_estimators = n_estimators, random_state = random_state).fit(X_train, Y_train)
        elif model == "logistic":
            classifier = LogisticRegression(multi_class = logistic_multi_class, random_state = random_state, max_iter=200, class_weight = class_weight).fit(X_train, Y_train)

        y_pred = classifier.predict(X_test)
        score1 = cohen_kappa_score(Y_test, y_pred)
        score1_ls.append(score1)
        score2 = matthews_corrcoef(Y_test, y_pred)
        score2_ls.append(score2)
        score3 = balanced_accuracy_score(Y_test, y_pred)
        score3_ls.append(score3)
        score4 = accuracy_score(Y_test, y_pred)
        score4_ls.append(score4)

    accuracy = "cohen_kappa_score: {}; matthews_corrcoef: {}; balanced_accuracy_score: {}; accuracy_score: {}".format(np.mean(score1), np.mean(score2), np.mean(score3), np.mean(score4))

    return accuracy


def filtering_based_selection(X, y, variance_threshold="mean", mutual_info=False, chi_square_test=False, F_test=True, rank_method="dense", logger=None):
    """
    Perform filtering-based feature selection using variance threshold, chi-square test, F-test, and mutual information.
    
    Parameters
    ----------
    X : pandas.DataFrame
        Feature matrix (rows are samples, columns are features)
    y : pandas.Series
        Target labels
    variance_threshold : str
        Variance cutoff method - "zero" or "mean"
    mutual_info : bool
        Whether to use mutual information filtering
    chi_square_test : bool
        Whether to use chi-square test filtering
    F_test : bool
        Whether to use F-test filtering
    rank_method : str
        Ranking method for features
    logger : object
        Logger object for writing logs
        
    Returns
    -------
    dict
        Dictionary containing filtered features and rankings
    """
    message = "{} ======== filtering based selection ========".format(record_time())
    print(message)
    if logger != None:
        logger.write("info", remove_time(message))
    
    # Filter out by variance 
    if variance_threshold == "zero":
        var_selector = VarianceThreshold()
        X_var = var_selector.fit_transform(X)
        
        # features selection by ensemble mode
        all_features = list(var_selector.feature_names_in_)
        remained_features = var_selector.get_feature_names_out()
        remained_index = [all_features.index(i) for i in remained_features]
        remained_features_var = var_selector.variances_[remained_index]

        retained_features_ranking_by_variance = dict([*zip(remained_features, rankdata(remained_features_var, method = rank_method))])
        message = "* {} {}".format(record_time(), "feature ranking based on variance of each feature")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
        
        # features selection by intersection mode
        retained_features_by_filter = X.columns[var_selector.get_support()]
        message = "* {} {} features remained after filter out features with 0 variance".format(record_time(), X_var.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

        y_var = y

        try:
            X_var = pd.DataFrame(X_var.todense())
        except:
            X_var = pd.DataFrame(np.array(X_var))
        
        X_var.columns = remained_features
        

    elif variance_threshold == "mean":
        var_selector = VarianceThreshold(np.mean(np.var(np.array(X), axis = 0)))
        X_var = var_selector.fit_transform(X)
        
        # features selection by ensemble mode
        all_features = list(var_selector.feature_names_in_)
        remained_features = var_selector.get_feature_names_out()
        remained_index = [all_features.index(i) for i in remained_features]
        remained_features_var = var_selector.variances_[remained_index]
        
        retained_features_ranking_by_variance = dict([*zip(remained_features, rankdata(remained_features_var, method = rank_method))])
        message = "* {} {}".format(record_time(), "feature ranking based on variance of each feature")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
                
        # features selection by intersection mode
        retained_features_by_filter = X.columns[var_selector.get_support()]
        message = "* {} {} features remained after filter out features below mean variance of all features".format(record_time(), X_var.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

        y_var = y

        try:
            X_var = pd.DataFrame(X_var.todense())
        except:
            X_var = pd.DataFrame(np.array(X_var))
        
        X_var.columns = remained_features

    # filter by chi square test
    if chi_square_test and (F_test == False and mutual_info == False):
        
        # features selection by ensemble mode
        chivalue, pvalues_chi = chi2(X_var, y_var)
        # when pvalue is NaN, it will be replaced with 1
        pvalues_chi = [1 if np.isnan( _ ) else _ for _ in pvalues_chi]
        retained_features_ranking_by_correlation = dict([*zip(remained_features, rankdata(pvalues_chi, method = rank_method))])
        # reverse the ranking weight to make sure the greatest weight representing the most importance
        retained_features_ranking_by_correlation = reverse_rank_weight(retained_features_ranking_by_correlation)
        message = "* {} {}".format(record_time(), "feature ranking based on p value calculated by chi square test to check the correlation between feature and lable")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
        
        # features selection by intersection mode
        chivalue, pvalues_chi = chi2(X_var, y_var)
        k = chivalue.shape[0] - (pvalues_chi > 0.05).sum()
        selector = SelectKBest(chi2, k = k)
        X_fschi = selector.fit_transform(X_var, y_var)
        retained_features_by_filter = retained_features_by_filter[selector.get_support()]
        message = "** {} {} features remained after further chi sqaure test filtering".format(record_time(), X_fschi.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    # filter by F test
    if F_test and (chi_square_test == False and mutual_info == False):
        
        # features selection by ensemble mode
        F, pvalues_f = f_classif(np.array(X_var), y_var)

        # when pvalue is NaN, it will be replaced with 1
        pvalues_f = [1 if np.isnan( _ ) else _ for _ in pvalues_f]
        retained_features_ranking_by_correlation = dict([*zip(remained_features, rankdata(pvalues_f, method =  rank_method))])
        # reverse the ranking weight to make sure the greatest weight representing the most importance
        retained_features_ranking_by_correlation = reverse_rank_weight(retained_features_ranking_by_correlation)
        message = "* {} {}".format(record_time(), "feature ranking based on p value calculated by F test to check the correlation between feature and lable")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
        
        # features selection by intersection mode
        F, pvalues_f = f_classif(np.array(X_var), y_var)
        k = F.shape[0] - (pvalues_f > 0.05).sum()
        selector = SelectKBest(f_classif, k = k)
        X_fsF = selector.fit_transform(X_var, y)
        retained_features_by_filter = retained_features_by_filter[selector.get_support()]
        message = "** {} {} features remained after further F test filtering".format(record_time(), X_fsF.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    # filter by mutual information
    if mutual_info and (F_test == False and chi_square_test == False):
        
        # features selection by ensemble mode
        res = mutual_info_classif(X_var, y_var)
        retained_features_ranking_by_correlation = dict([*zip(remained_features, rankdata(res, method = rank_method))])
        message = "* {} {}".format(record_time(), "feature ranking based on p value calculated by mutual information to check the correlation between feature and lable")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
        
        # features selection by intersection mode
        # res = mutual_info_classif(X_var, y_var)
        k = res.shape[0] - sum(res <= 0)
        selector = SelectKBest(mutual_info_classif, k = k)
        X_fsmic = selector.fit_transform(X_var, y)
        retained_features_by_filter = retained_features_by_filter[selector.get_support()]
        message = "** {} {} features remained after further mutual information filtering".format(record_time(), X_fsmic.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
    
    return {
        'X_filtered': X_var,
        'y_filtered': y_var,
        'retained_features_ranking_by_variance': retained_features_ranking_by_variance,
        'retained_features_ranking_by_correlation': retained_features_ranking_by_correlation,
        'retained_features_by_filter': retained_features_by_filter
    }


def embedding_based_selection(X_var, y_var, model="svm", random_foreast_threshold=None, n_estimators=100, 
                             random_state=10, normalization_method="Min-Max", logistic_multi_class="ovr", 
                             linear_svm_multi_class="ovr", class_weight="balanced", rank_method="dense", 
                             cv=5, logger=None):
    """
    Perform embedding-based feature selection using random forest, logistic regression, and SVM.
    
    Parameters
    ----------
    X_var : pandas.DataFrame
        Filtered feature matrix from filtering step
    y_var : pandas.Series
        Target labels
    model : str
        Model type - "random_foreast", "logistic", or "svm"
    random_foreast_threshold : float
        Threshold for random forest feature importance
    n_estimators : int
        Number of trees for random forest
    random_state : int
        Random state for reproducibility
    normalization_method : str
        Normalization method - "Min-Max" or "Standardization"
    logistic_multi_class : str
        Multi-class strategy for logistic regression
    linear_svm_multi_class : str
        Multi-class strategy for SVM
    class_weight : str
        Class weight strategy
    rank_method : str
        Ranking method for features
    cv : int
        Cross-validation folds
    logger : object
        Logger object for writing logs
        
    Returns
    -------
    dict
        Dictionary containing embedding results and rankings
    """
    message = "{} ======== embedding based selection ========".format(record_time())
    print(message)
    if logger != None:
        logger.write("info", remove_time(message))

    embedding_positive_marker_selection = None
    
    # embedding-based on feature selection
    # select by random forest model
    if model == "random_foreast":
        RFC_ = RandomForestClassifier(n_estimators = n_estimators, random_state = random_state)
        # when random_foreast_threshold is None, 
        #  `1 / number of features` will be used as threshold
        if random_foreast_threshold == None:
            random_foreast_threshold = 1 / X_var.shape[1]
        RFC_embedding_selector = SelectFromModel(RFC_, threshold = random_foreast_threshold)
      
        X_RFC_embedding = RFC_embedding_selector.fit_transform(X_var, y_var)

        # evaluate the accuracy of classifier
        accuracy = model_accuracy(X_var, y_var, model = model, cv = cv, linear_svm_multi_class = linear_svm_multi_class, class_weight = class_weight, n_estimators = n_estimators, random_state = random_state, logistic_multi_class = logistic_multi_class)

        # features selection by ensemble mode
        retained_features_ranking_by_embedding = dict([*zip(RFC_embedding_selector.feature_names_in_ , rankdata(RFC_embedding_selector.estimator_.feature_importances_, method = rank_method))])
        message = "* {} {}".format(record_time(), "feature ranking based on feature importance calcualted by random foreast model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

        # when the number of unique labels is 2
        if len(set(y_var)) == 2:
            embedding_positive_marker_selection = dict([*zip(X_var.columns, RFC_embedding_selector.estimator_.feature_importances_)])
        
        # features selection by intersection mode
        retained_features_by_embedding = RFC_embedding_selector.get_feature_names_out()
        message = "* {} {} features remained after random foreast based embedding filtering".format(record_time(), X_RFC_embedding.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    # select by logistic regression model
    elif model == "logistic":
        if normalization_method == "Min-Max":
           X_stand = MinMaxScaler().fit_transform(np.array(X_var))
        elif normalization_method == "Standardization":
           X_stand = StandardScaler().fit_transform(np.array(X_var))
        
        logistic_ = LogisticRegression(multi_class = logistic_multi_class, random_state = random_state, max_iter=200, class_weight = class_weight)
        log_embedding_selector = SelectFromModel(logistic_, norm_order = 1)
        X_log_embedding = log_embedding_selector.fit_transform(np.array(X_stand), y_var)

        # evaluate the accuracy of classifier
        accuracy = model_accuracy(X_stand, y_var, model = model, cv = cv, linear_svm_multi_class = linear_svm_multi_class, class_weight = class_weight, n_estimators = n_estimators, random_state = random_state, logistic_multi_class = logistic_multi_class)

        # features selection by ensemble mode
        retained_features_ranking_by_embedding = dict([*zip(X_var.columns, rankdata(np.sum(np.abs(log_embedding_selector.estimator_.coef_), axis = 0), method = rank_method))]) # sum all absolute coefficients of each feature 
        
        # when the number of unique labels is 2
        if len(set(y_var)) == 2:
            embedding_positive_marker_selection = dict([*zip(X_var.columns, log_embedding_selector.estimator_.coef_)])
        
        message = "* {} {}".format(record_time(), "feature ranking based on coefficient calcualted by logistic model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
        
        # features selection by intersection mode
        retained_features_by_embedding = log_embedding_selector.get_feature_names_out()

        message = "* {} {} features remained after logistic regression based embedding filtering".format(record_time(), X_log_embedding.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    # select by SVM model
    elif model  == "svm":
        # Standardization or Min-Max scaling is required for linear SVM 
        if normalization_method == "Min-Max":
            X_stand = MinMaxScaler().fit_transform(np.array(X_var))
        elif normalization_method == "Standardization":
            X_stand = StandardScaler().fit_transform(np.array(X_var))
        
        SVC_ = LinearSVC(multi_class = linear_svm_multi_class, class_weight = class_weight)
        SVC_ = SVC_.fit(X_stand, y_var)
        SVC_embedding_selector = SelectFromModel(SVC_, norm_order = 1, prefit=True)
        X_SVC_embedding = SVC_embedding_selector.transform(np.array(X_stand))

        # evaluate the accuracy of classifier
        accuracy = model_accuracy(X_stand, y_var, model = model, cv = cv, linear_svm_multi_class = linear_svm_multi_class, class_weight = class_weight, n_estimators = n_estimators, random_state = random_state, logistic_multi_class = logistic_multi_class)
        
        # features selection by ensemble mode
        feature_importance = rankdata(np.sum(np.abs(SVC_.coef_), axis = 0), method = rank_method) # sum all absolute coefficients of each feature
        retained_features_ranking_by_embedding = dict([*zip(X_var.columns, feature_importance)])
        message = "* {} {}".format(record_time(), "feature ranking based on coefficient calcualted by linear SVM model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

        # when the number of unique labels is 2
        if len(set(y_var)) == 2:
            embedding_positive_marker_selection = dict([*zip(X_var.columns, SVC_.coef_[0])])
       
        # features selection by intersection mode
        retained_features_by_embedding = X_var.columns[SVC_embedding_selector.get_support()]
        message = "* {} {} features remained after svm based embedding filtering".format(record_time(), X_SVC_embedding.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
    
    return {
        'retained_features_ranking_by_embedding': retained_features_ranking_by_embedding,
        'retained_features_by_embedding': retained_features_by_embedding,
        'embedding_positive_marker_selection': embedding_positive_marker_selection,
        'accuracy': accuracy,
        'X_stand': X_stand if model in ["logistic", "svm"] else X_var
    }


def wrapping_based_selection(X_var, y_var, X_stand, model="svm", n_features_to_select=0.15, 
                            step=100, cv=5, n_jobs=-1, n_estimators=100, random_state=10, 
                            logistic_multi_class="ovr", linear_svm_multi_class="ovr", 
                            class_weight="balanced", logger=None):
    """
    Perform wrapping-based feature selection using RFECV with specified model.
    
    Parameters
    ----------
    X_var : pandas.DataFrame
        Filtered feature matrix from filtering step
    y_var : pandas.Series
        Target labels
    X_stand : numpy.ndarray
        Standardized feature matrix (for logistic/svm models)
    model : str
        Model type - "random_foreast", "logistic", or "svm"
    n_features_to_select : int or float
        Number of features to select
    step : int or float
        Step size for RFECV
    cv : int
        Cross-validation folds
    n_jobs : int
        Number of parallel jobs
    n_estimators : int
        Number of trees for random forest
    random_state : int
        Random state for reproducibility
    logistic_multi_class : str
        Multi-class strategy for logistic regression
    linear_svm_multi_class : str
        Multi-class strategy for SVM
    class_weight : str
        Class weight strategy
    logger : object
        Logger object for writing logs
        
    Returns
    -------
    dict
        Dictionary containing wrapping results and rankings
    """
    message = "{} ======== wrapping based selection ========".format(record_time())
    print(message)
    if logger != None:
        logger.write("info", remove_time(message))
    
    if n_features_to_select == None:
        # when features to select is None,
        # 50% of all features will be used as threshold
        n_features_to_select = int(X_var.shape[1] * 0.5)
    elif isinstance(n_features_to_select, float):
        n_features_to_select = int(n_features_to_select * X_var.shape[1])

    if model == "random_foreast":
        RFC_ = RandomForestClassifier(n_estimators = n_estimators, random_state = random_state)
        RFC_wrapping_selector = RFECV(RFC_, min_features_to_select = n_features_to_select, step = step, cv = cv, n_jobs= n_jobs)
        X_RFC_wrapping = RFC_wrapping_selector.fit_transform(X_var, y_var)
        
        # features selection by ensemble mode
        retained_features_ranking_by_wrapping = dict([*zip(RFC_wrapping_selector.feature_names_in_ , RFC_wrapping_selector.ranking_)])
        retained_features_ranking_by_wrapping = reverse_rank_weight(retained_features_ranking_by_wrapping)

        message = "* {} {}".format(record_time(), "feature ranking based on ranking provided RFE - random foreast model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

        
        # features selection by intersection mode
        retained_features_by_wrapping = X_var.columns[RFC_wrapping_selector.support_]

        message = "* {} {} features remained after RFE - random foreast based wrapping filtering".format(record_time(), X_RFC_wrapping.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    elif model == "logistic":
        logistic_ = LogisticRegression(multi_class = logistic_multi_class, random_state = random_state, max_iter=200, class_weight = class_weight)
        log_wrapping_selector = RFECV(logistic_, min_features_to_select = n_features_to_select, step = step, cv = cv, n_jobs = n_jobs)
        X_log_wrapping = log_wrapping_selector.fit_transform(np.array(X_stand), y_var)
        
        # features selection by ensemble mode
        retained_features_ranking_by_wrapping = dict([*zip(X_var.columns, log_wrapping_selector.ranking_)])
        retained_features_ranking_by_wrapping = reverse_rank_weight(retained_features_ranking_by_wrapping)
        message = "* {} {}".format(record_time(), "feature ranking based on ranking provided RFE - logistic model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
                
        # features selection by intersection mode
        retained_features_by_wrapping = X_var.columns[log_wrapping_selector.support_]
        message = "* {} {} features remained after RFE - logistic regression based wrapping filtering".format(record_time(), X_log_wrapping.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))

    elif model == "svm":
        SVC_ = LinearSVC(multi_class = linear_svm_multi_class, class_weight = class_weight)
        SVC_wrapping_selector = RFECV(SVC_, min_features_to_select = n_features_to_select, step = step, cv = cv, n_jobs = n_jobs)
        X_SVC_wrapping = SVC_wrapping_selector.fit_transform(X_stand, y_var)
        
        # features selection by ensemble mode
        retained_features_ranking_by_wrapping = dict([*zip(X_var.columns, SVC_wrapping_selector.ranking_)])
        retained_features_ranking_by_wrapping = reverse_rank_weight(retained_features_ranking_by_wrapping)
        message = "* {} {}".format(record_time(), "feature ranking based on ranking provided RFE - SVM model")
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
                
       # features selection by intersection mode
        retained_features_by_wrapping = X_var.columns[SVC_wrapping_selector.support_]
        message = "* {} {} features remained after RFE - svm based wrapping filtering".format(record_time(), X_SVC_wrapping.shape[1])
        print(message)
        if logger != None:
            logger.write("info", remove_time(message))
    
    return {
        'retained_features_ranking_by_wrapping': retained_features_ranking_by_wrapping,
        'retained_features_by_wrapping': retained_features_by_wrapping
    }


def aggregate_features(X_var, y_var, filtering_results, embedding_results, wrapping_results, 
                      merge_rank_method="geom.mean", n_features_to_select=0.15, logger=None):
    """
    Combine results from all feature selection methods using ensemble or intersection approach.
    
    Parameters
    ----------
    X_var : pandas.DataFrame
        Filtered feature matrix
    y_var : pandas.Series
        Target labels
    filtering_results : dict
        Results from filtering-based selection
    embedding_results : dict
        Results from embedding-based selection
    wrapping_results : dict
        Results from wrapping-based selection
    merge_rank_method : str
        Method to combine rankings - "geom.mean", "mean", "median", "max"
    n_features_to_select : int or float
        Number of features to select for ensemble method
    logger : object
        Logger object for writing logs
        
    Returns
    -------
    dict
        Dictionary containing final aggregated results
    """
    message = "{} ======== final feature selection ========".format(record_time())
    print(message)
    if logger != None:
        logger.write("info", remove_time(message))
    
    if isinstance(n_features_to_select, float):
        n_features_to_select = int(n_features_to_select * X_var.shape[1])
                
    # Features selection by ensemble mode
    rank_ls = list()

    for feature in X_var.columns:
        ranks = [filtering_results['retained_features_ranking_by_variance'][feature], 
                filtering_results['retained_features_ranking_by_correlation'][feature], 
                embedding_results['retained_features_ranking_by_embedding'][feature], 
                wrapping_results['retained_features_ranking_by_wrapping'][feature]]
        if merge_rank_method == "max":
            rank_ls.append((feature, max(ranks)))
        elif merge_rank_method == "median":
            rank_ls.append((feature, np.median(np.array(ranks))))
        elif merge_rank_method == "mean":
            rank_ls.append((feature, np.mean(np.array(ranks))))
        elif merge_rank_method == "geom.mean":
            rank_ls.append((feature, gmean(np.array(ranks))))

    rank_ls = sorted(rank_ls, key = lambda x: -x[1])
    rank_ls = [ _ for _ in rank_ls[0:n_features_to_select] ]

    # Features selection by intersection mode
    retained_features_by_filter = set(filtering_results['retained_features_by_filter'])
    retained_features_by_embedding = set(embedding_results['retained_features_by_embedding'])
    retained_features_by_wrapping = set(wrapping_results['retained_features_by_wrapping'])

    final_feture_selection = reduce(lambda x,y: x.intersection(y), [retained_features_by_embedding, retained_features_by_filter, retained_features_by_wrapping])

    message2 = "* {} select top {} features ranked by using {} method to combine feature rankings obtained by different estimators".format(record_time(), len(rank_ls), merge_rank_method)
    message = "* {} {} features remained after intersecting the key features found by filtering, embedding and wrapping-based feature selection methods".format(record_time(), len(final_feture_selection))
    print(message2)
    print(message)
    if logger != None:
        logger.write("info", remove_time(message2))
        logger.write("info", remove_time(message))
    
    return {
        'final_feature_selection_by_ensemble': rank_ls,
        'final_feature_selection_by_intersection': final_feture_selection,
        'retained_features_by_filtering': retained_features_by_filter,
        'retained_features_by_embedding': retained_features_by_embedding,
        'retained_features_by_wrapping': retained_features_by_wrapping
    }


[docs] def feature_selection(data, label_column, filename = None, logger = None, rank_method = "dense", merge_rank_method = "geom.mean", variance_threshold = "mean", mutual_info = False, chi_square_test = False, F_test = True, model = "svm", random_foreast_threshold = None, n_estimators = 100, random_state = 10, normalization_method = "Min-Max", logistic_multi_class = "ovr", linear_svm_multi_class = "ovr", class_weight = "balanced", n_features_to_select = 0.15, step = 100, cv = 5, n_jobs = -1, save = True): """ A function to do feature seletion based on filtering, embedding and wrapping method respectively or combing those methods together. Parameters ---------- data: A log normalized expression matrix (rows are cells; columns are features) with an extra column containing clustering or cell type labels. label_column: The name of cell type column in the data. filename: A string to name the output file. Default is None. logger: A log_file object to write log information into disk. Default is None. rank_method: A string to decide which rank method will be used to rank the coefficient values returned by different estimators. Default is "dense". Other available words including "average", "min", "max" and "ordinal". merge_rank_method: A string to decide which method will be used to combine the rankings from different estimators. Default is "geom.mean". Other available words including "mean", "min", and "max". variance_threshold: A string to decide which variance cutoff is used to filter out features. "zero" or "mean" could be selected. Default is 'mean'. mutual_info: A Bool value decide whether a mutual information method is employed to filtering out features further. When it's true, F_test and chi_sqaure_test should be specified in false. Default is False. chi_sqaure_test: A Bool value decide whether a chi square test method is employed to filtering out features further. When it's true, F_test and mutual_info should be specified in false. Default is False. F_test: A Bool value decide whether a F test method is employed to filtering out features further. When it's true, chi_sqaure_test and mutual_info should be specified in false. Default is True. model: A string to decide which model is used by embedding- and wrapper- based feature selection. "random_foreast", "logistic" and "svm" could be selected. Default is 'svm'. random_foreast_threshold: A float or int value to set the cutoff (feature_importance_) by random foreast model-basedd embedding feature selection. It only takes effect when model is set in 'random_foreast'. Default is None. When it is None, `1 / the number of all features` will be automaticcally used for this value. n_estimators: A int to indicate the number of trees in the forest. It only takes effect when model is set in 'random_foreast'. Default is 100. random_state: A int to control the randomness of the bootstrapping of the samples. It takes effect when model is set in 'random_foreast' or in "logistic". Default is 10. normalization_method: A string to decide how to normalize data. Default is "Min-Max". Other available words including "Standardization". logistic_multi_class: A string to decide which mode to deal with multiclassification in the logistic model. Default is "ovr". Other available words include "multinomial" and "auto". linear_svm_multi_class: A string to decide which mode to deal with multiclassification in the linear svm model. Default is "ovr". This parameter only takes effect when model is set in 'svm'. Other available words include "ovo". class_weight: A string to decide whether class weights will be considered. If None, all classes are supposed to have weight one. The "balanced" mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as n_samples / (n_classes * np.bincount(y)). Default is 'balanced'. n_featurs_to_selct: A int or float to control the number of features to select. If integer, the parameter is the absolute number of features to select. If float between 0 and 1, it is the fraction of features to select. Default is 0.15. step: A int or float to control the number of features be removed in each round of RFECV algorithm. If greater than or equal to 1, then ``step`` corresponds to the (integer) number of features to remove at each iteration. If within (0.0, 1.0), then ``step`` corresponds to the percentage (rounded down) of features to remove at each iteration. Default is 100. cv: A int to decide the number of cross validation in RFECV algorithm. Default is 5. n_jobs: A int to decide the number of thread used for the program. Default is -1, meaning using all available threads. save: A Bool value to decide whether write the output into the disk. """ message = "{} {}".format(record_time(), "start feature selection") print(message) params = { "label_column": label_column, "file_name": filename, "rank_method": rank_method, "merge_rank_method": merge_rank_method, "variance_threshold": variance_threshold, "mutual_info": mutual_info, "chi_square_test": chi_square_test, "F_test": F_test, "model" : model, "random_foreast_threshold": random_foreast_threshold, "n_estimators": n_estimators, "random_state": random_state, "normalization_method": normalization_method, "logistic_multi_class": logistic_multi_class, "linear_svm_multi_class": linear_svm_multi_class, "class_weight": class_weight, "n_features_to_select": n_features_to_select, "step" : step, "cv": cv, "n_jobs": n_jobs, "save" : save } if logger != None: logger.write("info", remove_time(message)) for key, value in params.items(): logger.write("critical", "{}: {}".format(key, value)) # step 1 - convert category label into numeric label message = "{} step 1 - converting categoric label into numeric label".format(record_time()) print(message) if logger != None: logger.write("info", remove_time(message)) # if label is string, converse to numeric label le = None if any(map(lambda x: isinstance(x, str), data[label_column])): le = LabelEncoder().fit(data[label_column]) label = le.transform(data[label_column]) data[label_column] = label # fix duplicated column names by adding ordinal suffix data.columns = check_duplicated_elements(data.columns.tolist()) # get data and label X = data.iloc[:, 0:-1] y = data.iloc[:, -1] # step 2 - do feature selection message = "{} step 2 - do feature selection".format(record_time()) print(message) if logger != None: logger.write("info", remove_time(message)) # Step 2.1: Filtering-based selection filtering_results = filtering_based_selection( X, y, variance_threshold=variance_threshold, mutual_info=mutual_info, chi_square_test=chi_square_test, F_test=F_test, rank_method=rank_method, logger=logger ) # Step 2.2: Embedding-based selection embedding_results = embedding_based_selection( filtering_results['X_filtered'], filtering_results['y_filtered'], model=model, random_foreast_threshold=random_foreast_threshold, n_estimators=n_estimators, random_state=random_state, normalization_method=normalization_method, logistic_multi_class=logistic_multi_class, linear_svm_multi_class=linear_svm_multi_class, class_weight=class_weight, rank_method=rank_method, cv=cv, logger=logger ) # Step 2.3: Wrapping-based selection wrapping_results = wrapping_based_selection( filtering_results['X_filtered'], filtering_results['y_filtered'], embedding_results['X_stand'], model=model, n_features_to_select=n_features_to_select, step=step, cv=cv, n_jobs=n_jobs, n_estimators=n_estimators, random_state=random_state, logistic_multi_class=logistic_multi_class, linear_svm_multi_class=linear_svm_multi_class, class_weight=class_weight, logger=logger ) # Step 2.4: Aggregate features aggregation_results = aggregate_features( filtering_results['X_filtered'], filtering_results['y_filtered'], filtering_results, embedding_results, wrapping_results, merge_rank_method=merge_rank_method, n_features_to_select=n_features_to_select, logger=logger ) # Build output dictionaries output1 = { "retained_features_ranking_by_variance": filtering_results['retained_features_ranking_by_variance'], "retained_features_ranking_by_correlation": filtering_results['retained_features_ranking_by_correlation'], "retained_features_ranking_by_embedding": embedding_results['retained_features_ranking_by_embedding'], "retained_features_ranking_by_wrapping": wrapping_results['retained_features_ranking_by_wrapping'], "final_feature_selection_by_ensemble": aggregation_results['final_feature_selection_by_ensemble'], "model_accuracy": embedding_results['accuracy'], "params_used_for_feature_selection": params } # only run when the number of unique labels is 2 if len(set(filtering_results['y_filtered'])) == 2 and embedding_results['embedding_positive_marker_selection'] is not None: output1.update({"positive_marker_selection": embedding_results['embedding_positive_marker_selection']}) output2 = { "retained_features_by_filtering": aggregation_results['retained_features_by_filtering'], "retained_features_by_embedding": aggregation_results['retained_features_by_embedding'], "retained_features_by_wrapping": aggregation_results['retained_features_by_wrapping'], "final_feature_selection_by_intersection": aggregation_results['final_feature_selection_by_intersection'], "model_accuracy": embedding_results['accuracy'], "params_used_for_feature_selection": params } if save == True: if filename != None: filename1 = filename + "_" + model + "_" + "ensemble" + "_" + "feature_selection" + "_" + record_time() + ".pkl" with open(filename1, "wb") as file: pickle.dump(output1, file) filename2 = filename + "_" + model + "_" + "intersection" + "_" + "feature_selection" + "_" + record_time() + ".pkl" with open(filename2, "wb") as file: pickle.dump(output2, file) else: filename1 = model + "_" + "ensemble" + "_" + "feature_selection" + "_" + record_time() + ".pkl" with open(filename1, "wb") as file: pickle.dump(output1, file) filename2 = model + "_" + "intersection" + "_" + "feature_selection" + "_" + record_time() + ".pkl" with open(filename2, "wb") as file: pickle.dump(output2, file) # merge two feature selection results output1.update(output2) if le is not None: output1.update({"label_classes": le.classes_}) message = "{} finish feature selection".format(record_time()) print(message) if logger != None: logger.write("info", remove_time(message)) return output1
[docs] def find_signature_genes(data, label_column = "cluster", n_features_to_select = 0.15, ratio_of_none_zero_counts = 0.1, class_weight = "balanced", HVG = True, save = False, filename = None, n_jobs = -1, logger = None, **kwargs): """ A simple wrapper of feature_selection function for finding highly variable genes for overall clusters or for markers of the corresponding cluster. Parameters ---------- data: A log normalized expression matrix (rows are cells; columns are features) with an extra column containing clustering or cell type labels. label_column: A string to specify the name of cell type column in the data. n_features_to_select: A int or float to control the number of features to select. If integer, the parameter is the absolute number of features to select. If float between 0 and 1, it is the fraction of features to select. Default is 0.15. ratio_of_none_zero_counts: A float to determine the cutoff in which a feature will be omited when below specified value. A higher value leads to less features will be kept for calculation. class_weight: A string to decide whether class weights will be considered. If None, all classes are supposed to have weight one. The “balanced” mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as n_samples / (n_classes * np.bincount(y)). Default is 'balanced'. HVG: A Bool value to decider whether only detected highly variable features will be returned. save: A Bool value to decide whether the result will be written into disk. Default is True. filename: A string to name the output file. Default is None. n_jobs: A int to decide the number of thread used for the program. Default is -1, meaning using all available threads. logger: A log_file object to write log information into disk. Default is None. **kwargs: Other paremeters passed to feature_selection function. """ # step1 - do feature selection if isinstance(n_features_to_select, float): n_features_to_select = int(n_features_to_select * data.shape[1]) features = feature_selection(data = data.copy(), label_column = label_column, n_features_to_select = n_features_to_select, save = save, filename = filename, class_weight = class_weight, logger = logger, n_jobs = n_jobs, **kwargs) if not HVG: # group data group_data = data.groupby(label_column) # prepare containers sub_high_expression_genes = dict() for key in list(set(data.loc[:, label_column])): sub_data = group_data.get_group(key) sub_data = sub_data.iloc[:, 0:-1] try: key = features["label_classes"][key] except: key = key sub_data_none_zero = map(lambda x: sub_data.loc[:, x][sub_data.loc[:, x] > 0], sub_data.columns) sub_data_none_zero = [*map(lambda x: x.shape[0], sub_data_none_zero)] sub_data_none_zero_ratio = pd.DataFrame(np.array(sub_data_none_zero) / sub_data.shape[0], columns = ["ratio_of_none_zero_counts"]) sub_data_none_zero_ratio.loc[:, "feature"] = sub_data.columns # get the features with at least 10% (default) cells having none-zero expression values remained_features = sub_data_none_zero_ratio.loc[sub_data_none_zero_ratio.ratio_of_none_zero_counts > ratio_of_none_zero_counts, "feature"] # get the mean expression of each feature each_feature_mean = sub_data.mean() intermediate = pd.DataFrame(each_feature_mean, columns=["expression"]) intermediate.loc[:, "feature"] = intermediate.index intermediate = intermediate.join(sub_data_none_zero_ratio.set_index("feature"), on = "feature") # only keep features with at least 10% (default) cells having none-zero expression values intermediate = intermediate.loc[intermediate.feature.isin(remained_features), :] intermediate = intermediate.sort_values(by = "expression") intermediate.loc[:, "rank"] = rankdata(intermediate.loc[:, "expression"], method="dense") sub_high_expression_genes[str(key)] = intermediate condition = np.array(list(sub_high_expression_genes.keys())) data1 = sub_high_expression_genes[condition[condition != "OnevsRest"][0]] data1.rename(columns={'ratio_of_none_zero_counts': 'pct1', "expression": "expression1", "rank": "rank1"}, inplace=True) data2 = sub_high_expression_genes["OnevsRest"] data2.rename(columns={'ratio_of_none_zero_counts': 'pct2', "expression": "expression2", "rank": "rank2"}, inplace=True) data3 = data1.join(data2.set_index("feature"), on = "feature") data3 = data3[["feature", "expression1", "pct1", "rank1", "expression2", "pct2", "rank2"]] scores = dict(features["final_feature_selection_by_ensemble"]) score_keys = list(scores.keys()) for j in score_keys: if j not in data3.feature.tolist(): scores.pop(j) new_score_key = [i for i in scores.keys()] new_score_value = [i for i in scores.values()] new_scores = pd.DataFrame({"feature": new_score_key, "score": new_score_value}) data4 = data3.join(new_scores.set_index("feature"), on = "feature") output = {"features": features, "sub_high_expression_genes": {condition[condition != "OnevsRest"][0]: data4}} else: output = {"features": features} if save: with open("markers_of_clusters_after_1st_merging_{}.pkl".format(record_time()), "wb") as file: pickle.dump(output, file) return output
[docs] def find_markers_ovr( data, label_column = "cluster", n_features_to_select = 0.15, ratio_of_none_zero_counts = 0.1, class_weight = "balanced", save = False, logger = None, filename = None, n_jobs = -1, **kwargs ): """ A simple wrapper of fing_signature_genes function to run in one vs rest mode. Parameters ---------- data: A log normalized expression matrix (rows are cells; columns are features) with an extra column containing clustering or cell type labels. label_column: A string to specify the name of cell type column in the data. n_features_to_select: A int or float to control the number of features to select. If integer, the parameter is the absolute number of features to select. If float between 0 and 1, it is the fraction of features to select. Default is 0.15. ratio_of_none_zero_counts: A float to determine the cutoff in which a feature will be omited when below specified value. A higher value leads to less features will be kept for calculation. class_weight: A string to decide whether class weights will be considered. If None, all classes are supposed to have weight one. The “balanced” mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as n_samples / (n_classes * np.bincount(y)). Default is 'balanced'. save: A Bool value to decide whether the result will be written into disk. Default is True. logger: A log_file object to write log information into disk. Default is None. filename: A string to name the output file. Default is None. n_jobs: A int to decide the number of thread used for the program. Default is -1, meaning using all available threads. **kwargs: Other paremeters passed to feature_selection function. """ res = dict() keys = data.loc[:, label_column].unique() for key in keys: wk_data = data.copy() wk_data.loc[:, label_column] = wk_data.loc[:, label_column].astype(str) wk_data.loc[wk_data[label_column] != key, label_column] = "OnevsRest" res[key] = find_signature_genes(data = wk_data, label_column = label_column, logger = logger, save = False, filename = "sub_cluster_{}".format(key), class_weight = class_weight, ratio_of_none_zero_counts = ratio_of_none_zero_counts, n_features_to_select = n_features_to_select, n_jobs = n_jobs, HVG = False, **kwargs) if save: if filename != None: with open("{}_{}.pkl".format(filename, record_time()), "wb") as file: pickle.dump(res, file) else: with open("markers_{}.pkl".format(filename, record_time()), "wb") as file: pickle.dump(res, file) return res
def pairwise_compare(data, focus = "expression1", ep_cut_off = 1, pct_cut_off = 0.5, only_positive = True): """ A function to calculate the jaccard index and distance correlation between features of paired group. Parameters ---------- data: A dict returned by global_consensus_cluster or by sub_consensus_cluster function. level: A string. "between" should be used when data is the result of global_consensus_cluster. "within" should be used when data is the result of sub_consensus_cluster. Default is "between". focus: A string to decide the value used for distance correlation calculation. "expression1" - the mean expression of feauture in the interest cluster will be used. "rank1" - the rank of feature based on expression in the interest cluster will be used. Default is 'expression1'. ep_cut_off: A int or float value to decide the expression cutoff to select positive informative features. pct_cut_off: A int or float value to decide the expression percentage cutoff to select positive informative features. only_positive: A Bool value to decide whether only positive informative features will be used to calculate the feature similarities between clusters. Default is True. """ assert isinstance(data, dict), "data should be the result after running consensus_cluster function" jaccard_index = dict() distance_cor = dict() keys = sorted(list(data["global_sign_features"].keys())) for key1 in range(len(keys)-1): for key2 in range(key1+1, len(keys)): high_expression_1 = data["global_sign_features"][keys[key1]]["sub_high_expression_genes"][keys[key1]] sign_genes_1 = data["global_sign_features"][keys[key1]]["features"]["final_feature_selection_by_ensemble"] high_expression_2 = data["global_sign_features"][keys[key2]]["sub_high_expression_genes"][keys[key2]] sign_genes_2 = data["global_sign_features"][keys[key2]]["features"]["final_feature_selection_by_ensemble"] if isinstance(sign_genes_1[0], tuple) and isinstance(sign_genes_2[0], tuple): sub_data1_feature_names = [i[0] for i in sign_genes_1] sub_data2_feature_names = [i[0] for i in sign_genes_2] elif isinstance(sign_genes_1[0], tuple) and isinstance(sign_genes_2[0], tuple) == False: sub_data1_feature_names = [i[0] for i in sign_genes_1] sub_data2_feature_names = sign_genes_2 elif isinstance(sign_genes_1[0], tuple) == False and isinstance(sign_genes_2[0], tuple): sub_data1_feature_names = sign_genes_1 sub_data2_feature_names = [i[0] for i in sign_genes_2] else: sub_data1_feature_names = sign_genes_1 sub_data2_feature_names = sign_genes_2 if only_positive: new_data = {"global_sign_features_before_merging": data["global_sign_features"]} positive_genes1 = retrieve_positive_markers(new_data, keys[key1], ep_cut_off = ep_cut_off, pct_cut_off = pct_cut_off, level = "before") positive_genes2 = retrieve_positive_markers(new_data, keys[key2], ep_cut_off = ep_cut_off, pct_cut_off = pct_cut_off, level = "before") try: # calculate jaccard index based on all positive genes between comparisons jaccard_index["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = jaccard_score(positive_genes1.feature, positive_genes2.feature) # calculate distance correlation based on shared positive genes between comparisons shared_positive_genes = set(positive_genes1.feature).intersection(positive_genes2.feature) sub_data1_feature_ranking = positive_genes1.loc[positive_genes1.feature.isin(shared_positive_genes)].sort_values("feature").loc[:, focus].values sub_data2_feature_ranking = positive_genes2.loc[positive_genes2.feature.isin(shared_positive_genes)].sort_values("feature").loc[:, focus].values distance_cor["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = distance_correlation(sub_data1_feature_ranking, sub_data2_feature_ranking) # when positive markers can't be extracted under the current cutoff # it would use all informative genes (containing both negative and positive genes) # for calculation except: # calculate jaccard index based on all positive genes between comparisons jaccard_index["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = jaccard_score(sub_data1_feature_names, sub_data2_feature_names) # calculate the distance correlation between the all shared genes consisting of positive/negative markers between comparisons shared_sign_features = set(sub_data1_feature_names).intersection(sub_data2_feature_names) sub_data1_feature_ranking = high_expression_1.loc[high_expression_1.feature.isin(shared_sign_features), :] sub_data2_feature_ranking = high_expression_2.loc[high_expression_2.feature.isin(shared_sign_features), :] shared_high_exp_features = set(sub_data1_feature_ranking.feature).intersection(sub_data2_feature_ranking.feature) sub_data1_feature_ranking = sub_data1_feature_ranking.loc[sub_data1_feature_ranking.feature.isin(shared_high_exp_features), :].sort_values("feature") sub_data2_feature_ranking = sub_data2_feature_ranking.loc[sub_data2_feature_ranking.feature.isin(shared_high_exp_features), :].sort_values("feature") distance_cor["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = distance_correlation(sub_data1_feature_ranking.loc[:, focus].values, sub_data2_feature_ranking.loc[:, focus].values) else: # calculate jaccard index based on all positive genes between comparisons jaccard_index["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = jaccard_score(sub_data1_feature_names, sub_data2_feature_names) # calculate the distance correlation between the all shared genes consisting of positive/negative markers between comparisons shared_sign_features = set(sub_data1_feature_names).intersection(sub_data2_feature_names) sub_data1_feature_ranking = high_expression_1.loc[high_expression_1.feature.isin(shared_sign_features), :] sub_data2_feature_ranking = high_expression_2.loc[high_expression_2.feature.isin(shared_sign_features), :] shared_high_exp_features = set(sub_data1_feature_ranking.feature).intersection(sub_data2_feature_ranking.feature) sub_data1_feature_ranking = sub_data1_feature_ranking.loc[sub_data1_feature_ranking.feature.isin(shared_high_exp_features), :].sort_values("feature") sub_data2_feature_ranking = sub_data2_feature_ranking.loc[sub_data2_feature_ranking.feature.isin(shared_high_exp_features), :].sort_values("feature") distance_cor["global cluster {} vs global cluster {}".format(keys[key1], keys[key2])] = distance_correlation(sub_data1_feature_ranking.loc[:, focus].values, sub_data2_feature_ranking.loc[:, focus].values) res = {"jaccard_index": jaccard_index, "distance_cor": distance_cor} return res