Source code for datasafari.explorer.explore_num

# DataSafari - DataSafari simplifies complex data science tasks into straightforward, powerful one-liners.
# Copyright (C) 2024 George Dreemer.
#
# Read more about DataSafari's LICENSE here: https://datasafari.dev/docs/other/lic-gpl3
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details, specifically under
# version 3 of the License. No later versions are applicable.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see: https://github.com/ETA444/datasafari/blob/main/LICENSE

from typing import List, Optional, Tuple, Dict
import numpy as np
from numpy.linalg import inv
import pandas as pd
from scipy.stats import (
    shapiro, skew, kurtosis, anderson,  # used in 'distribution_analysis'
    chi2  # used in 'outliers_mahalanobis'
)
from datasafari.utils.calculators import calculate_mahalanobis, calculate_vif
from datasafari.evaluator.evaluate_dtype import evaluate_dtype


# explore_num todos 2.0 #
# TODO: Implement new method: 'outlier_dbscan' (density-based spatial clustering outlier detection)
# TODO: Implement new method: 'outlier_isoforest' (isolation forest outlier detection)
# TODO: Implement new method: 'outlier_lof'(local outlier factor outlier detection)
# TODO: Overhaul explore_num output methodology to be more like transformer module one


# main function: explore_num
[docs] def explore_num( df: pd.DataFrame, numerical_variables: List[str], method: str = 'all', output: str = 'print', threshold_z: int = 3 ) -> Optional[Tuple[Dict, pd.DataFrame]]: """ **Explore numerical variables in a DataFrame and gain insights on distribution characteristics, outlier detection using multiple methods (Z-score, IQR, Mahalanobis), normality tests, skewness, kurtosis, correlation analysis, and multicollinearity detection.** Parameters: ----------- df : pd.DataFrame The DataFrame containing the numerical data to analyze. numerical_variables : list A list of strings representing the column names in `df` to be analyzed. method : str, optional, default: 'all' Specifies the analysis method to apply. - ``'correlation_analysis'`` for analyzing the correlation between numerical variables. - ``'distribution_analysis'`` for distribution characteristics, including skewness and kurtosis, and normality tests (Shapiro-Wilk, Anderson-Darling). - ``'outliers_zscore'`` for outlier detection using the Z-score method. - ``'outliers_iqr'`` for outlier detection using the Interquartile Range method. - ``'outliers_mahalanobis'`` for outlier detection using the Mahalanobis distance. - ``'multicollinearity'`` for detecting multicollinearity among the numerical variables. - ``'all'`` to perform all available analyses. output : str, optional, default: 'print' Determines the output format. - ``'print'`` to print the analysis results to the console. - ``'return'`` to return the analysis results as a DataFrame or dictionaries, depending on the analysis type. threshold_z : int, optional, default; 3 Used in method ``'outliers_zscore'``, users can define their preferred z-score threshold, if the default value does not fit their needs. Returns: -------- Tuple[Dict, pd.DataFrame] or None. - ``Tuple[Dict, pd.DataFrame]`` For 'correlation_analysis', returns a DataFrame showing the correlation coefficients between variables if output is 'return'. - ``Tuple[Dict, pd.DataFrame]`` For 'distribution_analysis', returns a DataFrame with distribution statistics if output is 'return'. - ``Tuple[Dict, pd.DataFrame]`` For outlier detection methods ('outliers_zscore', 'outliers_iqr', 'outliers_mahalanobis'), returns a dictionary mapping variables to their outlier values and a DataFrame of rows considered outliers if output is 'return'. - ``Tuple[Dict, pd.DataFrame]`` For 'multicollinearity', returns a DataFrame or a Series indicating the presence of multicollinearity, such as VIF scores, if output is 'return'. - ``Tuple[Dict, pd.DataFrame]`` If output='return' and method='all', returns a comprehensive summary of all analyses as text or a combination of DataFrames and dictionaries. - ``None`` If output='print' and method='all', returns nothing, but prints results to console. Raises: ------- TypeErrors: - If `df` is not a pandas DataFrame. - If `numerical_variables` is not a list of strings. - If `method` is not a string. - If `output` is not a string. - If `threshold_z` is not a float or an int. ValueErrors: - If the `df` is empty. - If `method` is not one of the specified valid methods. - If `output` is not 'print' or 'return'. - If 'numerical_variables' list is empty. - If variables provided through 'numerical_variables' are not numerical variables. - If any specified variables in `numerical_variables` are not found in the DataFrame's columns. Examples: --------- Generating a sample DataFrame to demonstrate the functionality: >>> import datasafari >>> import numpy as np >>> import pandas as pd >>> data = { ... 'Feature1': np.random.normal(loc=0, scale=1, size=100), ... 'Feature2': np.random.exponential(scale=2, size=100), ... 'Feature3': np.random.randint(low=1, high=100, size=100) ... } >>> df = pd.DataFrame(data) The full potential of ``explore_num()`` is unlocked by simply providing a dataframe and the numerical columns to explore: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3']) Performing correlation analysis and printing the results: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='correlation_analysis', output='print') Conducting distribution analysis and returning the results: >>> distribution_results = explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='distribution_analysis', output='return') >>> print(distribution_results) Detecting outliers using the IQR method and printing the results: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='outliers_iqr', output='print') Detecting outliers using the Z-score method with a custom threshold: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='outliers_zscore', threshold_z=2, output='print') Identifying multicollinearity and printing VIF scores: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='multicollinearity', output='print') Applying all analyses and printing comprehensive results: >>> explore_num(df, ['Feature1', 'Feature2', 'Feature3'], method='all', output='print') Notes ----- - Enhances interpretability by providing insights and conclusions based on the statistical tests and analyses conducted. - Normality tests assess whether data distribution departs from a normal distribution, which is crucial for certain statistical analyses. - Correlation analysis examines the strength and direction of relationships between numerical variables. - Multicollinearity detection is essential for regression analysis, as high multicollinearity can invalidate the model. """ # Error Handling # # TypeErrors if not isinstance(df, pd.DataFrame): raise TypeError("explore_num(): The df parameter must be a pandas DataFrame.") if not isinstance(numerical_variables, list): raise TypeError("explore_num(): The categorical_variables parameter must be a list of variable names.\n Example: var_list = ['var1', 'var2', 'var3']") else: if not all(isinstance(var, str) for var in numerical_variables): raise TypeError("explore_num(): All items in the numerical_variables list must be strings representing column names.") if not isinstance(method, str): raise TypeError("explore_num(): The method parameter must be a string.\n Example: method = 'all'") if not isinstance(output, str): raise TypeError("explore_num(): The output parameter must be a string. \n Example: output = 'return'") if not isinstance(threshold_z, (float, int)): raise TypeError("explore_num(): The value of threshold_z must be a float or int.\nExample: threshold_z = 3") # ValueErrors # Check if df is empty if df.empty: raise ValueError("explore_num(): The input DataFrame is empty.") # Check if method is valid valid_methods = ['correlation_analysis', 'distribution_analysis', 'outliers_zscore', 'outliers_iqr', 'outliers_mahalanobis', 'multicollinearity', 'all'] if method.lower() not in valid_methods: raise ValueError(f"explore_num(): Invalid method '{method}'. Valid options are: {', '.join(valid_methods)}") # Check if output is valid if output.lower() not in ['print', 'return']: raise ValueError("explore_num(): Invalid output method. Choose 'print' or 'return'.") # Check if list has any members if len(numerical_variables) == 0: raise ValueError("explore_num(): The 'numerical_variables' list must contain at least one column name.") # Check if variables are numerical numerical_types = evaluate_dtype(df, numerical_variables, output='list_n') if not all(numerical_types): raise ValueError("explore_num(): The 'numerical_variables' list must contain only names of numerical variables.") # Check if specified variables exist in the DataFrame missing_vars = [var for var in numerical_variables if var not in df.columns] if missing_vars: raise ValueError(f"explore_num(): The following variables were not found in the DataFrame: {', '.join(missing_vars)}") # Main Function # # initialize variables # # (1) each method appends to: result result = [] # (2) method 'outliers_zscore' only, returns these to the user outliers_z_dict = {} outliers_z_df = pd.DataFrame() # (3) method 'outliers_iqr' only, returns these to the user outliers_iqr_dict = {} outliers_iqr_df = pd.DataFrame() # (4) method 'outliers_mahalanobis', returns these to the user outliers_mahalanobis_df = pd.DataFrame() # (5) method 'distribution_analysis' only, returns this df to the user distribution_df = pd.DataFrame(columns=numerical_variables) # (6) method 'correlation_analysis' only, returns df to the user correlation_dfs = [] if method.lower() in ['correlation_analysis', 'all']: # calculate correlations per method pearson_df = df[numerical_variables].corr(method='pearson') spearman_df = df[numerical_variables].corr(method='spearman') kendall_df = df[numerical_variables].corr(method='kendall') # define dictionary for for-loop correlation_analysis = { 'pearson': pearson_df, 'spearman': spearman_df, 'kendall': kendall_df } # construct console output per method for correlation_method, correlation_df in correlation_analysis.items(): result.append(f"\n<______CORRELATIONS ({correlation_method.upper()})______>\n") result.append(f"Overview of {correlation_method.title()} Correlation Coefficients*\n") result.append(correlation_df.to_string()) # save dataframes for extended functionality (method='correlation_analysis') correlation_dfs = [pearson_df, spearman_df, kendall_df] if method.lower() in ['distribution_analysis', 'all']: # appends # # (1) title of method section result.append("\n<______DISTRIBUTION ANALYSIS______>\n") # (2) subtitle result.append("✎ Overview of Results*") # define # # define dist stats for dictionary stats_functions = ['min', 'max', 'mean', 'median', 'mode', 'variance', 'std_dev', 'skewness', 'kurtosis', 'shapiro_p', 'anderson_stat'] # initialize dictionary to be used in the creation of distribution_df stats_dict = {stat: [] for stat in stats_functions} # main operation: descriptive stats, skewness, kurtosis, normality testing for variable_name in numerical_variables: # modify data for this analysis: no NAs data = df[variable_name].copy() data = data.dropna() # calculate descriptive stats var_min, var_max = data.min(), data.max() mean = data.mean() median = data.median() mode = data.mode().tolist() variance = data.var() std_dev = data.std() # calculate skewness, kurtosis and tests for normality skewness = skew(data) kurt = kurtosis(data) shapiro_stat, shapiro_p = shapiro(data) anderson_stat = anderson(data) # interpretation tips # # skewness and kurtosis skewness_tip = " ☻ Tip: Symmetric if ~0, left-skewed if <0, right-skewed if >0" kurt_tip = " ☻ Tip: Mesokurtic if ~0, Platykurtic if <0, Leptokurtic if >0)" # shapiro-wilk interpretation and conclusion based on p shapiro_tip = " • H0: Data is normally distributed.\n • H1: Data is not normally distributed." shapiro_conclusion = f"✘ Conclusion: ['{variable_name}'] is likely not normally distributed." if shapiro_p < 0.05 else "\n✔ Conclusion: ['{variable_name}'] is likely normally distributed." # anderson-darling anderson_tip = " ☻ Tip: Compare the statistic to critical values. Data is likely not normally distributed if the statistic > critical value." # construct console output result.append(f"\n< Distribution Analysis Summary for: ['{variable_name}'] >\n") result.append(f"➡ Min: {var_min:.2f}\n➡ Max: {var_max:.2f}\n➡ Mean: {mean:.2f}\n➡ Median: {median:.2f}\n➡ Mode(s): {mode}") result.append(f"➡ Variance: {variance:.2f}\n➡ Standard Deviation: {std_dev:.2f}") result.append(f"\n➡ Skewness: {skewness:.2f}\n{skewness_tip}\n\n➡ Kurtosis: {kurt:.2f}\n{kurt_tip}") # this output is migrated to 'assumptions' method if user-defined method is 'all' if method.lower() == 'distribution_analysis': result.append(f"\n★ Shapiro-Wilk Test for Normality:\n{shapiro_tip}\n ➡ p-value = {shapiro_p:.4f}\n {shapiro_conclusion}\n") result.append(f"\n★ Anderson-Darling Test for Normality:\n ➡ statistic = {anderson_stat.statistic:.4f}\n ➡ significance levels = {anderson_stat.significance_level}\n ➡ critical values = {anderson_stat.critical_values}\n{anderson_tip}\n") # save calculation results to stats_dict stats_dict['min'].append(var_min) stats_dict['max'].append(var_max) stats_dict['mean'].append(mean) stats_dict['median'].append(median) stats_dict['mode'].append(mode[0] if len(mode) != 0 else pd.NA) # handle special case of multiple modes stats_dict['variance'].append(variance) stats_dict['std_dev'].append(std_dev) stats_dict['skewness'].append(skewness) stats_dict['kurtosis'].append(kurt) stats_dict['shapiro_p'].append(shapiro_p) stats_dict['anderson_stat'].append(anderson_stat.statistic) # construct df from stats_dict: distribution_df distribution_dfs = [] for stat, values in stats_dict.items(): temp_df = pd.DataFrame({stat: values}, index=numerical_variables) distribution_dfs.append(temp_df) distribution_df = pd.concat(distribution_dfs, axis=1) distribution_df.columns = stats_functions # ensure the columns are named correctly distribution_df.index.name = 'Variable/Statistic' # appends (continued) # # method='distribution_analysis' info if method is all if method.lower() == 'all': result.append("\n✎ * NOTE: If method='distribution_analysis', aside from the overview above, the function RETURNS:") result.append("■ 1 - Dataframe: where index are your variables, columns are all the calculated statistic (wide format for readability)") result.append("☻ HOW TO: df = explore_num(yourdf, yourlist, method='distribution_analysis')") if method.lower() in ['outliers_iqr', 'all']: # appends # # (1) title of method section result.append("\n<______OUTLIERS - IQR METHOD______>\n") # (2) suitability tip result.append("☻ Tip: The IQR method is robust against extreme values, ideal for identifying outliers\nin skewed distributions by focusing on the data's middle 50%.\n") # (3) subtitle result.append("✎ Overview of Results*\n") # main operation: quantile definitions, iqr and outlier classification for variable_name in numerical_variables: # calculate quantile 1, quantile 3 and inter quartile range for respective column quantile1 = df[variable_name].quantile(0.25) quantile3 = df[variable_name].quantile(0.75) iqr = quantile3 - quantile1 # determine lower and upper bounds lower_bound = quantile1 - 1.5 * iqr upper_bound = quantile3 + 1.5 * iqr # outlier classification outlier_rows = df[ (df[variable_name] < lower_bound) | (df[variable_name] > upper_bound) ] # save results: dictionary and df (objects) outliers_iqr_dict[variable_name] = outlier_rows[variable_name].tolist() outliers_iqr_df = pd.concat([outliers_iqr_df, outlier_rows], ignore_index=False) # conditional output string format and stats calculations title = f"< Results for ['{variable_name}'] >\n" result.append(title) outlier_count = len(outliers_iqr_dict[variable_name]) if outlier_count == 0: stats = f"➡ Number of outliers: {outlier_count}\n➡ Min: -\n➡ Max: -\n➡ Mean: -" row_indices = "➡ Location of outliers in your df (indices): -\n" result.append(stats) result.append(row_indices) else: outlier_min = min(outliers_iqr_dict[variable_name]) outlier_max = max(outliers_iqr_dict[variable_name]) outlier_mean = sum(outliers_iqr_dict[variable_name]) / len(outliers_iqr_dict[variable_name]) stats = f"➡ Number of outliers: {outlier_count}\n➡ Min: {outlier_min}\n➡ Max: {outlier_max}\n➡ Mean: {outlier_mean}" row_indices = f"➡ Location of outliers in your df (indices):\n{outlier_rows.index.tolist()}\n" result.append(stats) result.append(row_indices) # appends (continued) # # (6-9) method='outliers_iqr' info if method.lower() == 'all': result.append("\n✎ * NOTE: If method='outliers_iqr', aside from the overview above, the function RETURNS:") result.append("■ 1 - Dictionary: key=variable name, value=list of outlier values for that row") result.append("■ 2 - Dataframe: Rows from the original df that were classified as outliers. (preserved index)") result.append("☻ HOW TO: dict, df = explore_num(yourdf, yourlist, method='outliers_iqr')") if method.lower() in ['outliers_zscore', 'all']: # definitions # data = df.copy() # appends # # (1) title of method section result.append("\n<______OUTLIERS - Z-SCORE METHOD______>\n") # (2) suitability tip result.append("☻ Tip: The Z-Score method excels at identifying outliers in data with a distribution\nclose to normal, highlighting values far from the mean.\n") # (3) subtitle result.append("✎ Overview of Results*\n") # main operation: z-score calculation per variable, and outlier classification. for variable_name in numerical_variables: # zscore column name z_col = variable_name + '_zscore' # calculate z-score for col data[z_col] = ( (data[variable_name] - data[variable_name].mean()) / data[variable_name].std() ) # outlier classification outlier_rows = data[ data[z_col].abs() > threshold_z ] # save results: dictionary and df (objects) outliers_z_dict[variable_name] = outlier_rows[variable_name].tolist() outliers_z_df = pd.concat([outliers_z_df, outlier_rows], ignore_index=False) # conditional output string format and stats calculations title = f"< Results for ['{variable_name}'] >\n" result.append(title) outlier_count = len(outliers_z_dict[variable_name]) if outlier_count == 0: stats = f"➡ Number of outliers: {outlier_count}\n➡ Min: -\n➡ Max: -\n➡ Mean: -" row_indices = "➡ Location of outliers in your df (indices): -\n" result.append(stats) result.append(row_indices) else: outlier_min = min(outliers_z_dict[variable_name]) outlier_max = max(outliers_z_dict[variable_name]) outlier_mean = sum(outliers_z_dict[variable_name]) / len(outliers_z_dict[variable_name]) stats = f"➡ Number of outliers: {outlier_count}\n➡ Min: {outlier_min}\n➡ Max: {outlier_max}\n➡ Mean: {outlier_mean}" row_indices = f"➡ Location of outliers in your df (indices):\n{outlier_rows.index.tolist()}\n" result.append(stats) result.append(row_indices) # appends (continued) # # (6-9) method='outliers_zscore' info if method.lower() == 'all': result.append("\n✎ * NOTE: If method='outliers_zscore', aside from the overview above, the function RETURNS:") result.append("■ 1 - Dictionary: key=variable name, value=list of outlier values for that row") result.append("■ 2 - Dataframe: Rows from the original df that were classified as outliers. (preserved index)") result.append("☻ HOW TO: dict, df = explore_num(yourdf, yourlist, method='outliers_zscore')") if method.lower() in ['outliers_mahalanobis', 'all']: # definitions # # use non-na df: data data = df.copy() data = data[numerical_variables].dropna() try: # calculate the mean and inverse of the covariance matrix mean_vector = data.mean().values inv_cov_matrix = inv(np.cov(data, rowvar=False)) # apply the utility function to calculate Mahalanobis distance for each observation data['mahalanobis'] = data.apply(lambda row: calculate_mahalanobis(row.values, mean_vector, inv_cov_matrix), axis=1) # determine outliers based on the chi-square distribution p_value_threshold = 0.05 critical_value = chi2.ppf((1 - p_value_threshold), df=len(numerical_variables)) # classify outliers based on mahalanobis distance relative to critical value outliers_mahalanobis_df = data[data['mahalanobis'] > critical_value] # clean up df data.drop(columns=['mahalanobis'], inplace=True) # construct console output result.append("\n<______OUTLIERS - MAHALANOBIS METHOD*______>\n") result.append(f"Identified outliers based on Mahalanobis distance exceeding the critical value ({critical_value:.2f}) from the chi-square distribution (p-val < {p_value_threshold}.\n") result.append(outliers_mahalanobis_df.to_string()) # appends (continued) # # (6-9) method='outliers_mahalanobis' info if method.lower() == 'all': result.append("\n✎ * NOTE: If method='outliers_mahalanobis', aside from the overview above, the function RETURNS:") result.append("■ 1 - Dataframe: Rows from the original df that were classified as outliers. (preserved index)") result.append("☻ HOW TO: df = explore_num(yourdf, yourlist, method='outliers_mahalanobis')") except np.linalg.LinAlgError as error: result.append(f"Error calculating Mahalanobis distance: {error}") if method.lower() in ['multicollinearity', 'all']: # use non-na df: data data = df.copy() data = data[numerical_variables].dropna() vifs = calculate_vif(data, numerical_variables) result.append("\n<______MULTICOLLINEARITY CHECK - VIF______>\n") result.append(f"Variance Inflation Factors:\n{vifs.to_string()}\n") result.append("☻ Tip: VIF > 10 indicates potential multicollinearity concerns.") # Combine all results combined_result = "\n".join(result) if output.lower() == 'print': # print by default/unconditionally print(combined_result) # extended functionality of output: return (method-specific) if method.lower() == 'outliers_zscore': return outliers_z_dict, outliers_z_df if method.lower() == 'outliers_iqr': return outliers_iqr_dict, outliers_iqr_df if method.lower() == 'outliers_mahalanobis': return outliers_mahalanobis_df if method.lower() == 'distribution_analysis': return distribution_df if method.lower() == 'correlation_analysis': return correlation_dfs elif output.lower() == 'return': # normal functionality of output: return if method.lower() in ['all', 'multicollinearity']: return combined_result # extended functionality of output: return (method-specific) if method.lower() == 'outliers_zscore': return outliers_z_dict, outliers_z_df if method.lower() == 'outliers_iqr': return outliers_iqr_dict, outliers_iqr_df if method.lower() == 'outliers_mahalanobis': return outliers_mahalanobis_df if method.lower() == 'distribution_analysis': return distribution_df if method.lower() == 'correlation_analysis': return correlation_dfs else: raise ValueError("Invalid output method. Choose 'print' or 'return'.")