from __future__ import annotations
import argparse
import json
import logging
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from dotenv import load_dotenv
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import AutoTokenizer
# Support both package-relative imports (module usage) and plain imports (script usage)
try:
from .azure_pipeline import register_processed_data_assets_from_paths
from .features import FeatureExtractor
except ImportError:
from azure_pipeline import register_processed_data_assets_from_paths
from features import FeatureExtractor
logger = logging.getLogger(__name__)
def log_class_distributions(df: pd.DataFrame, output_tasks: list[str], df_name: str):
"""Logs the class distribution for specified tasks in a dataframe."""
logger.info(f"{df_name.capitalize()} data class distributions:")
logger.info(f"Available {df_name} columns: {list(df.columns)}")
for col in output_tasks:
if col in df.columns:
dist = df[col].value_counts(normalize=True).to_dict()
logger.info(f" {col}: {json.dumps(dist, indent=2)}")
else:
logger.warning(f" {col}: COLUMN NOT FOUND in {df_name} data")
class DatasetLoader:
"""
A class to handle loading and preprocessing of emotion classification datasets.
This class handles:
- Loading training and test data from CSV files
- Cleaning and preprocessing the data
- Visualizing data distributions
Attributes:
train_df (pd.DataFrame): Processed training data
test_df (pd.DataFrame): Processed test data
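Example:
A minimal usage sketch (paths are illustrative, not required locations):

loader = DatasetLoader()
train_df = loader.load_training_data(data_dir="data/raw/all groups")
test_df = loader.load_test_data(test_file="data/raw/test/test_data-0001.csv")
loader.plot_distributions()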
"""
def __init__(self):
# Initialize empty training and test dataframes.
self.train_df = None
self.test_df = None
def load_training_data(self, data_dir="./../../data/raw/all groups"):
"""
Load and preprocess training data from multiple CSV files.
Args:
data_dir (str): Directory containing training data CSV files
Returns:
pd.DataFrame: Processed training data
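Example:
A minimal sketch (the directory is illustrative). Each CSV is expected to
contain the columns start_time, end_time, text, emotion, sub-emotion and
intensity; files without a .csv extension are skipped:

loader = DatasetLoader()
train_df = loader.load_training_data(data_dir="data/raw/all groups")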
"""
# Load the dataset (contains train_data-0001.csv, train_data-0002.csv, etc.)
self.train_df = pd.DataFrame()
# Loop over all files in the data directory
for i_file in os.listdir(data_dir):
# If the file is not a CSV, skip it
if not i_file.endswith(".csv"):
logger.warning(f"Skipping non-CSV file: {i_file}")
continue
# Read the current CSV file and select specific columns
try:
df_ = pd.read_csv(os.path.join(data_dir, i_file))[
[
"start_time",
"end_time",
"text",
"emotion",
"sub-emotion",
"intensity",
]
]
except Exception as e:
logger.error(f"Error reading {i_file}: {e}")
continue
# Handle column name variations (sub-emotion vs sub_emotion)
if "sub-emotion" in df_.columns:
df_ = df_.rename(columns={"sub-emotion": "sub_emotion"})
# Concatenate the current file's data with the main DataFrame
self.train_df = pd.concat([self.train_df, df_])
# Drop null and duplicate rows
self.train_df = self.train_df.dropna()
self.train_df = self.train_df.drop_duplicates()
# Reset index of the combined DataFrame
self.train_df = self.train_df.reset_index(drop=True)
return self.train_df
def load_test_data(self, test_file="./../../data/test_data-0001.csv"):
"""
Load and preprocess test data from a CSV file.
Args:
test_file (str): Path to the test data CSV file
Returns:
pd.DataFrame: Processed test data
"""
# Read the test data CSV file
try:
self.test_df = pd.read_csv(test_file)[
[
"start_time",
"end_time",
"text",
"emotion",
"sub-emotion",
"intensity",
]
]
except Exception as e:
logger.error(f"Error reading test file {test_file}: {e}")
return None
# Handle column name variations (sub-emotion vs sub_emotion)
if "sub-emotion" in self.test_df.columns:
self.test_df = self.test_df.rename(columns={"sub-emotion": "sub_emotion"})
# Drop null and duplicate rows
self.test_df = self.test_df.dropna()
self.test_df = self.test_df.drop_duplicates()
# Reset index of the test DataFrame
self.test_df = self.test_df.reset_index(drop=True)
return self.test_df
def plot_distributions(self):
"""Plot distributions of emotions, sub-emotions, and intensities \
for both training and test sets."""
# Distribution of emotions in the training set
fig, axes = plt.subplots(1, 3, figsize=(20, 6))
for i, col in enumerate(["emotion", "sub_emotion", "intensity"]):
sns.countplot(data=self.train_df, x=col, palette="Set2", ax=axes[i])
axes[i].set_title(f"'{col.capitalize()}' Distribution in Train/Val Set")
axes[i].set_xlabel(col.capitalize())
axes[i].set_ylabel("Count")
axes[i].tick_params(axis="x", rotation=45)
plt.tight_layout()
plt.show()
# Distribution of emotions in the test set
fig, axes = plt.subplots(1, 3, figsize=(20, 6))
for i, col in enumerate(["emotion", "sub_emotion", "intensity"]):
sns.countplot(data=self.test_df, x=col, palette="Set2", ax=axes[i])
axes[i].set_title(f"'{col.capitalize()}' Distribution in Test Set")
axes[i].set_xlabel(col.capitalize())
axes[i].set_ylabel("Count")
axes[i].tick_params(axis="x", rotation=45)
plt.tight_layout()
plt.show()
class DataPreparation:
"""
A class to handle data preparation for emotion classification tasks.
This class handles:
- Label encoding for target variables
- Dataset creation
- Dataloader setup
Args:
output_columns (list): List of output column names to encode
tokenizer: Pre-loaded HuggingFace tokenizer used to encode the texts
max_length (int): Maximum sequence length for tokenization
batch_size (int): Batch size for dataloaders
feature_config (dict, optional): Configuration for feature extraction
encoders_save_dir (str, optional): Directory in which fitted label encoders are saved
encoders_load_dir (str, optional): Directory from which pre-fitted label encoders are loaded
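Example:
A minimal construction sketch (the tokenizer checkpoint and directory are
illustrative values, not fixed defaults of this class):

tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-v3-xsmall")
prep = DataPreparation(
output_columns=["emotion", "sub_emotion", "intensity"],
tokenizer=tokenizer,
max_length=128,
batch_size=16,
encoders_save_dir="models/encoders",
)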
"""
def __init__(
self,
output_columns,
tokenizer,
max_length=128,
batch_size=16,
feature_config=None,
encoders_save_dir=None,
encoders_load_dir=None,
):
self.output_columns = output_columns
self.tokenizer = tokenizer
self.max_length = max_length
self.batch_size = batch_size
# Initialize label encoders
self.label_encoders = {col: LabelEncoder() for col in output_columns}
# Determine project root directory to construct lexicon path
_current_file_path_dp = os.path.abspath(__file__)
# Assuming data.py is in src/emotion_clf_pipeline/
_project_root_dir_dp = os.path.dirname(
os.path.dirname(os.path.dirname(_current_file_path_dp))
)
# Fix for Docker container: if we're in /app, use /app as project root
if _project_root_dir_dp == "/" and os.path.exists("/app/models"):
_project_root_dir_dp = "/app"
emolex_lexicon_path = os.path.join(
_project_root_dir_dp,
"models",
"features",
"EmoLex",
"NRC-Emotion-Lexicon-Wordlevel-v0.92.txt",
)
# Use provided encoders_save_dir or default
self.encoders_output_dir = (
encoders_save_dir
if encoders_save_dir
else os.path.join(_project_root_dir_dp, "models", "encoders")
)
# Store encoders_load_dir
self.encoders_input_dir = encoders_load_dir
# Attempt to load encoders if encoders_input_dir is provided
self.encoders_loaded = self._load_encoders()
# Initialize feature extractor with configuration and lexicon path
self.feature_extractor = FeatureExtractor(
feature_config=feature_config, lexicon_path=emolex_lexicon_path
)
def _load_encoders(self):
"""Load label encoders from disk if encoders_input_dir is set."""
if not self.encoders_input_dir:
logger.info(
"Encoder input directory not provided."
" Will fit new encoders if training."
)
return False
loaded_all = True
for col in self.output_columns:
encoder_path = os.path.join(self.encoders_input_dir, f"{col}_encoder.pkl")
if os.path.exists(encoder_path):
try:
with open(encoder_path, "rb") as f:
# Load trusted sklearn encoder from controlled environment
self.label_encoders[col] = pickle.load(f) # nosec B301
logger.info(f"Loaded encoder for {col} from {encoder_path}")
except Exception as e:
logger.error(
f"Error loading encoder for {col} from {encoder_path}: {e}."
" A new encoder will be used."
)
self.label_encoders[col] = LabelEncoder()
loaded_all = False
else:
logger.warning(
f"Encoder file not found for {col} at {encoder_path}. "
"A new encoder will be used and fitted if train data is provided."
)
self.label_encoders[col] = LabelEncoder()
loaded_all = False
if loaded_all:
logger.info("All encoders loaded successfully.")
else:
logger.warning(
"One or more encoders failed to load or were not found. "
"New encoders will be fitted for these if training data is provided."
)
return loaded_all
# def apply_data_augmentation(
# self,
# train_df,
# balance_strategy="equal",
# samples_per_class=None,
# augmentation_ratio=2,
# random_state=42,
# ):
# """
# Apply text augmentation to balance the training data.
# Args:
# train_df (pd.DataFrame): Training dataframe
# balance_strategy (str, optional): Strategy for balancing. Options:
# 'equal', 'majority', 'target'. Defaults to 'equal'.
# samples_per_class (int, optional): Number of samples per class for
# 'equal' or 'target' strategy. Defaults to None.
# augmentation_ratio (int, optional): Maximum ratio of augmented to
# original samples. Defaults to 2.
# random_state (int, optional): Random seed. Defaults to 42.
# Returns:
# pd.DataFrame: Balanced training dataframe
# """
# logger.info(f"Applying data augmentation with strategy: {balance_strategy}")
# original_class_dist = train_df["emotion"].value_counts()
# logger.info("Original class distribution:")
# for emotion, count in original_class_dist.items():
# logger.info(f" {emotion}: {count}")
# # Create an instance of TextAugmentor
# augmentor = TextAugmentor(random_state=random_state)
# # Apply the appropriate balancing strategy
# if balance_strategy == "equal":
# # Generate exactly equal samples per class
# if samples_per_class is None:
# # If not specified, use the average count
# samples_per_class = int(
# len(train_df) / len(train_df["emotion"].unique())
# )
# balanced_df = augmentor.generate_equal_samples(
# train_df,
# text_column="text",
# emotion_column="emotion",
# samples_per_class=samples_per_class,
# random_state=random_state,
# )
# elif balance_strategy == "majority":
# # Balance up to the majority class
# balanced_df = augmentor.balance_dataset(
# train_df,
# text_column="text",
# emotion_column="emotion",
# target_count=None, # Use majority class count
# augmentation_ratio=augmentation_ratio,
# random_state=random_state,
# )
# elif balance_strategy == "target":
# # Balance to a target count
# if samples_per_class is None:
# # If not specified, use the median count
# samples_per_class = int(train_df["emotion"].value_counts().median())
# balanced_df = augmentor.balance_dataset(
# train_df,
# text_column="text",
# emotion_column="emotion",
# target_count=samples_per_class,
# augmentation_ratio=augmentation_ratio,
# random_state=random_state,
# )
# else:
# raise ValueError(f"Unknown balance strategy: {balance_strategy}")
# # Apply additional sub-emotion balancing if needed
# if "sub_emotion" in self.output_columns:
# logger.info("After emotion balancing, checking sub-emotion distribution:")
# sub_emotion_dist = balanced_df["sub_emotion"].value_counts()
# logger.info(f"Sub-emotion classes: {len(sub_emotion_dist)}")
# logger.info(
# f"Min class size: {sub_emotion_dist.min()}, "
# f"Max class size: {sub_emotion_dist.max()}"
# )
# # If sub-emotion is highly imbalanced, apply additional balancing
# imbalance_ratio = sub_emotion_dist.max() / sub_emotion_dist.min()
# if imbalance_ratio > 5: # If max/min ratio is greater than 5
# logger.info(
# f"Sub-emotion imbalance ratio: {imbalance_ratio:.1f}, "
# "applying additional balancing"
# )
# # Apply augmentation for sub-emotions with extreme imbalance
# sub_balanced_df = augmentor.balance_dataset(
# balanced_df,
# text_column="text",
# emotion_column="sub_emotion",
# target_count=max(
# 50, sub_emotion_dist.median() // 2
# ), # Target at least 50 samples or half median
# augmentation_ratio=1, # Keep augmentation minimal
# random_state=random_state,
# )
# balanced_df = sub_balanced_df
# return balanced_df
def prepare_data(
self,
train_df,
test_df=None,
validation_split=0.2,
apply_augmentation=False,
balance_strategy="equal",
samples_per_class=None,
augmentation_ratio=2,
):
"""
Prepare data for training emotion classification models.
Args:
train_df (pd.DataFrame): Training dataframe
test_df (pd.DataFrame, optional): Test dataframe. Defaults to None.
validation_split (float, optional): Fraction of training data to use
for validation. Defaults to 0.2.
apply_augmentation (bool, optional): Whether to apply data
augmentation. Defaults to False.
balance_strategy (str, optional): Strategy for balancing if
augmentation is applied. Options: 'equal', 'majority', 'target'.
Defaults to 'equal'.
samples_per_class (int, optional): Number of samples per class for
balancing. Defaults to None.
augmentation_ratio (int, optional): Maximum ratio of augmented to
original samples. Defaults to 2.
Returns:
tuple: (train_dataloader, val_dataloader, test_dataloader).
train_dataloader and val_dataloader are None in evaluation-only mode
or when no validation split is used; test_dataloader is None if
test_df is not provided.
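Example:
A minimal sketch (the dataframes and tokenizer are assumed to be prepared
elsewhere; column names follow this module's defaults):

prep = DataPreparation(output_columns=["emotion"], tokenizer=tokenizer)
train_dl, val_dl, test_dl = prep.prepare_data(train_df, test_df=test_df)
batch = next(iter(train_dl))
# batch keys: input_ids, attention_mask, features, emotion_label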
"""
# Handle evaluation-only scenario (train_df is None)
is_evaluation_only = train_df is None
if is_evaluation_only:
logger.info(
"Evaluation-only mode: train_df is None, "
"skipping training data preparation"
)
# Ensure encoders are loaded for evaluation
if not self.encoders_loaded:
logger.error(
"Cannot perform evaluation without pre-loaded encoders "
"when train_df is None"
)
raise ValueError(
"Label encoders must be pre-loaded for evaluation-only mode"
)
logger.info("Using pre-loaded label encoders for evaluation")
else:
# Standard training/validation preparation
# Create output directory for encoders if needed
if not self.encoders_loaded and self.encoders_output_dir:
os.makedirs(self.encoders_output_dir, exist_ok=True)
logger.info(
f"Ensured encoder output dir exists: {self.encoders_output_dir}"
)
# Fit label encoders on training data ONLY IF NOT LOADED
if not self.encoders_loaded:
logger.info(
"Fitting new label encoders as they were not loaded or load failed."
)
for col in self.output_columns:
if col in train_df.columns:
# Ensure the column is treated as string for consistent fitting
self.label_encoders[col].fit(train_df[col].astype(str))
logger.info(f"Fitted encoder for column: {col}")
else:
logger.warning(
f"Column {col} not found in train_df for fitting encoder."
)
# Save label encoders if they were just fitted
# and a save directory is provided
if self.encoders_output_dir:
self._save_encoders()
else:
logger.info("Using pre-loaded label encoders.")
# Transform training data labels
for col in self.output_columns:
if col in train_df.columns:
try:
# Ensure the column is treated as string for
# consistent transformation
train_df[f"{col}_encoded"] = self.label_encoders[col].transform(
train_df[col].astype(str)
)
except ValueError as e:
logger.error(
f"Error transforming column {col} in training data: {e}"
)
known_classes = (
list(self.label_encoders[col].classes_)
if hasattr(self.label_encoders[col], "classes_")
else "Encoder not fitted or classes_ not available"
)
logger.error(f"Classes known to encoder for {col}: {known_classes}")
raise e
else:
logger.warning(
f"Column {col} (for encoding) not found in train_df."
)
# Handle training data preparation (skip if evaluation-only)
train_dataset = None
val_dataset = None
train_dataloader = None
val_dataloader = None
if not is_evaluation_only:
# Split into train and validation sets
if validation_split == 0.0:
train_indices = list(range(len(train_df)))
val_indices = []
logger.info(
"validation_split is 0.0, using all train_df for train_indices."
)
elif validation_split > 0 and validation_split < 1:
stratify_on = None
if self.output_columns and self.output_columns[0] in train_df:
# Stratify on the primary output column. Note that train_test_split
# raises a ValueError if any class has fewer than two samples when
# stratification is requested.
stratify_on = train_df[self.output_columns[0]]
train_indices, val_indices = train_test_split(
range(len(train_df)),
test_size=validation_split,
random_state=42,
stratify=stratify_on,
)
else:
# If validation_split is not 0.0 and not in (0.0, 1.0)
# This case should ideally not be hit with current CLI usage
# (0.0 or 0.1).
raise ValueError(
f"Unsupported validation_split value: {validation_split}. "
" Must be 0.0 or in (0.0, 1.0)."
)
# Fit TF-IDF vectorizer on training texts
logger.info("Fitting TF-IDF vectorizer...")
self.feature_extractor.fit_tfidf(train_df["text"].values)
# Extract features for all texts
logger.info("Extracting features for training data...")
train_features = []
for text in tqdm(
train_df["text"],
desc="Processing training texts",
ncols=120,
colour="green",
):
train_features.append(self.feature_extractor.extract_all_features(text))
train_features = np.array(train_features)
# Create train and validation datasets
train_dataset = EmotionDataset(
texts=train_df["text"].values[train_indices],
labels=train_df[
[f"{col}_encoded" for col in self.output_columns]
].values[train_indices],
features=train_features[train_indices],
tokenizer=self.tokenizer,
max_length=self.max_length,
output_tasks=self.output_columns,
)
val_dataset = None
if val_indices:
val_dataset = EmotionDataset(
texts=train_df["text"].values[val_indices],
labels=train_df[
[f"{col}_encoded" for col in self.output_columns]
].values[val_indices],
features=train_features[val_indices],
tokenizer=self.tokenizer,
max_length=self.max_length,
output_tasks=self.output_columns,
)
# Create dataloaders
train_dataloader = DataLoader(
train_dataset, batch_size=self.batch_size, shuffle=True
)
val_dataloader = (
DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
if val_dataset
else None
)
# Create test dataloader if test data is provided
test_dataloader = None
if test_df is not None:
# For evaluation-only mode, ensure feature extractor is fitted
if is_evaluation_only:
logger.info(
"Evaluation-only mode: Assuming feature extractor is pre-fitted"
)
# If not fitted, this will raise an error which is expected behavior
# Test labels are encoded below (after feature extraction), with error handling.
# Extract features for test texts
logger.info("Extracting features for test data...")
test_features = []
for text in tqdm(
test_df["text"],
desc="Processing test texts",
ncols=120,
colour="blue",
):
test_features.append(self.feature_extractor.extract_all_features(text))
test_features = np.array(test_features)
# Transform test labels
for col in self.output_columns:
if col in test_df.columns:
try:
# Ensure the column is treated as string for consistent
# transformation
test_df[f"{col}_encoded"] = self.label_encoders[col].transform(
test_df[col].astype(str)
)
except ValueError as e:
logger.error(
f"Error transforming column {col} in test data: {e}"
)
raise e
else:
logger.warning(f"Column {col} (for encoding) not found in test_df.")
# Get labels
if all(f"{col}_encoded" in test_df.columns for col in self.output_columns):
# If all output columns have encoded labels, use them
test_labels = test_df[
[f"{col}_encoded" for col in self.output_columns]
].values
else:
# If not all output columns have encoded labels, set to None
test_labels = None
test_dataset = EmotionDataset(
texts=test_df["text"].values,
labels=test_labels,
features=test_features,
tokenizer=self.tokenizer,
feature_extractor=self.feature_extractor,
max_length=self.max_length,
output_tasks=self.output_columns,
)
test_dataloader = DataLoader(test_dataset, batch_size=self.batch_size)
# Store copies of the processed dataframes (including the encoded label
# columns) as attributes for downstream saving and inspection.
if train_df is not None:
self.train_df_processed = train_df.copy()
else:
self.train_df_processed = None
if test_df is not None:
self.test_df_processed = test_df.copy()
self.test_df_split = test_df.copy()
else:
self.test_df_processed = None
self.test_df_split = None
# Apply data augmentation if requested
if apply_augmentation:
# NOTE: augmentation is currently a no-op. Re-enable the commented-out
# apply_data_augmentation method above and rebuild the datasets and
# dataloaders afterwards for this flag to take effect.
pass
return train_dataloader, val_dataloader, test_dataloader
def _save_encoders(self):
"""Save label encoders to disk."""
if not self.encoders_output_dir:
logger.warning(
"Encoders output directory not set. Skipping saving encoders."
)
return
os.makedirs(self.encoders_output_dir, exist_ok=True)
for col, encoder in self.label_encoders.items():
encoder_path = os.path.join(self.encoders_output_dir, f"{col}_encoder.pkl")
with open(encoder_path, "wb") as f:
pickle.dump(encoder, f)
def get_num_classes(self):
"""Get the number of classes for each output column."""
num_classes = {}
for col in self.output_columns:
if hasattr(self.label_encoders[col], "classes_"):
num_classes[col] = len(self.label_encoders[col].classes_)
else:
# Encoder was never fitted or loaded; report 0 classes so callers
# can detect the problem.
num_classes[col] = 0
return num_classes
class EmotionDataset(Dataset):
"""Custom Dataset for emotion classification."""
def __init__(
self,
texts,
tokenizer,
features,
labels=None,
feature_extractor=None,
max_length=128,
output_tasks=None,
):
"""
Initialize the dataset.
Args:
texts (list): List of text samples
tokenizer: HuggingFace tokenizer used to encode the texts
features (np.ndarray): Pre-extracted features
labels (list, optional): List of label tuples (emotion, sub_emotion,
intensity). None for prediction.
feature_extractor (FeatureExtractor, optional): Unused; accepted for
backward compatibility because features are pre-computed.
max_length (int): Maximum sequence length for tokenization
output_tasks (list, optional): List of tasks to output. Used only if
labels are provided.
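Example:
A minimal sketch of the per-item structure (texts, features and labels
are assumed to be prepared beforehand, e.g. by DataPreparation):

dataset = EmotionDataset(
texts=texts,
tokenizer=tokenizer,
features=features,
labels=labels,
output_tasks=["emotion"],
)
item = dataset[0]
# item keys: input_ids, attention_mask, features, emotion_label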
"""
self.texts = texts
self.tokenizer = tokenizer
self.features = features # Should be pre-calculated and correctly dimensioned
self.labels = labels
# self.feature_extractor = feature_extractor
self.max_length = max_length
self.output_tasks = output_tasks
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
encoding = self.tokenizer(
text,
max_length=self.max_length,
padding="max_length",
truncation=True,
return_tensors="pt",
)
item = {
"input_ids": encoding["input_ids"].flatten(),
"attention_mask": encoding["attention_mask"].flatten(),
"features": torch.tensor(self.features[idx], dtype=torch.float32),
}
# Add labels if they are available (i.e., during training/evaluation)
if self.labels is not None and self.output_tasks is not None:
current_labels = self.labels[idx]
for i, task in enumerate(self.output_tasks):
item[f"{task}_label"] = torch.tensor(
current_labels[i], dtype=torch.long
)
return item
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Data processing and feature engineering pipeline"
)
parser.add_argument(
"--raw-train-path",
type=str,
default="data/raw/train",
help="Path to raw training data (directory or CSV file)",
)
parser.add_argument(
"--raw-test-path",
type=str,
default="data/raw/test/test_data-0001.csv",
help="Path to raw test data CSV file",
)
parser.add_argument(
"--output-dir",
type=str,
default="data/processed",
help="Output directory for processed data",
)
parser.add_argument(
"--encoders-dir",
type=str,
default="models/encoders",
help="Directory to save label encoders",
)
parser.add_argument(
"--model-name-tokenizer",
type=str,
default="microsoft/deberta-v3-xsmall",
help="HuggingFace model name for tokenizer",
)
parser.add_argument(
"--max-length",
type=int,
default=256,
help="Maximum sequence length for tokenization",
)
parser.add_argument(
"--output-tasks",
type=str,
default="emotion,sub-emotion,intensity",
help="Comma-separated list of output tasks",
)
parser.add_argument(
"--register-data-assets",
action="store_true",
help="Register the processed data as assets in Azure ML",
)
args = parser.parse_args()
return args
def main():
"""Main function to run the data processing pipeline."""
load_dotenv()
args = parse_args()
# Setup logging (create the output directory first so the log file can be written)
os.makedirs(args.output_dir, exist_ok=True)
log_file = os.path.join(args.output_dir, "data_processing.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
filename=log_file,
)
logger.info("=== Starting Data Processing Pipeline ===")
# Parse output tasks
output_tasks = [task.strip() for task in args.output_tasks.split(",")]
# Update output_tasks to use underscore instead of hyphen
# for consistency with data columns
output_tasks = [task.replace("sub-emotion", "sub_emotion") for task in output_tasks]
# Set paths from arguments
RAW_TRAIN_PATH = args.raw_train_path
RAW_TEST_FILE = args.raw_test_path
PROCESSED_DATA_DIR = args.output_dir
ENCODERS_DIR = args.encoders_dir
# Processing parameters
MODEL_NAME = args.model_name_tokenizer
MAX_LENGTH = args.max_length
BATCH_SIZE = 16
# Feature configuration
FEATURE_CONFIG = {
"pos": False,
"textblob": False,
"vader": False,
"tfidf": True,
"emolex": True,
}
# Intensity mapping for standardization
INTENSITY_MAPPING = {
"mild": "mild",
"neutral": "mild",
"moderate": "moderate",
"intense": "strong",
"overwhelming": "strong",
}
# Create output directories
os.makedirs(PROCESSED_DATA_DIR, exist_ok=True)
os.makedirs(ENCODERS_DIR, exist_ok=True)
try:
# ====================================================================
# STEP 1: Load Raw Data
# ====================================================================
logger.info("Step 1: Loading raw data...")
dataset_loader = DatasetLoader()
# Load training data
if os.path.isdir(RAW_TRAIN_PATH):
logger.info(f"Loading training data from directory: {RAW_TRAIN_PATH}")
train_df = dataset_loader.load_training_data(data_dir=RAW_TRAIN_PATH)
elif os.path.isfile(RAW_TRAIN_PATH):
logger.info(f"Loading training data from file: {RAW_TRAIN_PATH}")
train_df = pd.read_csv(RAW_TRAIN_PATH)
# Normalize column naming to match the directory loader
if "sub-emotion" in train_df.columns:
train_df = train_df.rename(columns={"sub-emotion": "sub_emotion"})
else:
logger.error(f"Training data path not found: {RAW_TRAIN_PATH}")
raise FileNotFoundError(f"Training data path not found: {RAW_TRAIN_PATH}")
# Load test data
if os.path.exists(RAW_TEST_FILE):
if os.path.isdir(RAW_TEST_FILE):
logger.info(f"Loading test data from directory: {RAW_TEST_FILE}")
# If test data is a directory, load all CSV files in it
test_files = []
for file in os.listdir(RAW_TEST_FILE):
if file.endswith(".csv"):
test_files.append(os.path.join(RAW_TEST_FILE, file))
if test_files:
# Load and combine all test CSV files
test_dfs = []
for test_file in test_files:
logger.info(f"Loading test file: {test_file}")
df = dataset_loader.load_test_data(test_file=test_file)
if df is not None:
test_dfs.append(df)
if test_dfs:
test_df = pd.concat(test_dfs, ignore_index=True)
logger.info(f"Combined {len(test_dfs)} test files")
else:
logger.error("No valid test CSV files found in directory")
test_df = None
else:
logger.error(
f"No CSV files found in test directory: {RAW_TEST_FILE}"
)
test_df = None
else:
logger.info(f"Loading test data from file: {RAW_TEST_FILE}")
test_df = dataset_loader.load_test_data(test_file=RAW_TEST_FILE)
else:
logger.error(f"Test data path not found: {RAW_TEST_FILE}")
test_df = None
# Check if we have valid data before proceeding
if test_df is None or len(test_df) == 0:
logger.error("No valid test data loaded")
raise ValueError("No valid test data loaded")
logger.info(f"Loaded {len(train_df)} training samples")
logger.info(f"Loaded {len(test_df)} test samples")
# ====================================================================
# STEP 2: Data Cleaning and Preprocessing
# ====================================================================
logger.info("Step 2: Applying data cleaning and preprocessing...")
# Clean data by removing rows with NaN in critical columns
critical_columns = ["text", "emotion", "sub_emotion", "intensity"]
# Only check columns that exist in the dataframes
train_critical = [col for col in critical_columns if col in train_df.columns]
test_critical = [col for col in critical_columns if col in test_df.columns]
initial_train_len = len(train_df)
initial_test_len = len(test_df)
train_df = train_df.dropna(subset=train_critical)
test_df = test_df.dropna(subset=test_critical)
train_df = train_df.drop_duplicates()
test_df = test_df.drop_duplicates()
train_removed = initial_train_len - len(train_df)
test_removed = initial_test_len - len(test_df)
logger.info(
f"After cleaning: {len(train_df)} training samples "
f"({train_removed} removed)"
)
logger.info(
f"After cleaning: {len(test_df)} test samples " f"({test_removed} removed)"
)
# Apply intensity mapping
train_df["intensity"] = (
train_df["intensity"].map(INTENSITY_MAPPING).fillna("mild")
)
test_df["intensity"] = (
test_df["intensity"].map(INTENSITY_MAPPING).fillna("mild")
)
# Display class distributions after cleaning
logger.info("Displaying class distributions after cleaning...")
log_class_distributions(train_df, output_tasks, "training")
log_class_distributions(test_df, output_tasks, "test")
# ====================================================================
# STEP 3: Initialize Tokenizer and Data Preparation
# ====================================================================
logger.info("Step 3: Initializing tokenizer and data preparation...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
data_prep = DataPreparation(
output_columns=output_tasks,
tokenizer=tokenizer,
max_length=MAX_LENGTH,
batch_size=BATCH_SIZE,
feature_config=FEATURE_CONFIG,
encoders_save_dir=ENCODERS_DIR,
)
# ====================================================================
# STEP 4: Process Data and Extract Features
# ====================================================================
logger.info("Step 4: Processing data and extracting features...")
# Prepare data (this will fit encoders, extract features, and create datasets)
train_dataloader, val_dataloader, test_dataloader = data_prep.prepare_data(
train_df=train_df.copy(), test_df=test_df.copy(), validation_split=0.1
)
logger.info(f"Encoders saved to: {ENCODERS_DIR}")
# ====================================================================
# STEP 5: Save Processed Data
# ====================================================================
logger.info("Step 5: Saving processed data...")
# Save processed training data
if (
hasattr(data_prep, "train_df_processed")
and data_prep.train_df_processed is not None
):
train_output_path = os.path.join(PROCESSED_DATA_DIR, "train.csv")
data_prep.train_df_processed.to_csv(train_output_path, index=False)
logger.info(f"Processed training data saved to: {train_output_path}")
logger.info(
f"Processed training data shape: "
f"{data_prep.train_df_processed.shape}"
)
else:
logger.warning(
"Processed training DataFrame not found in DataPreparation object"
)
# Save processed test data
if (
hasattr(data_prep, "test_df_processed")
and data_prep.test_df_processed is not None
):
test_output_path = os.path.join(PROCESSED_DATA_DIR, "test.csv")
data_prep.test_df_processed.to_csv(test_output_path, index=False)
logger.info(f"Processed test data saved to: {test_output_path}")
logger.info(
f"Processed test data shape: {data_prep.test_df_processed.shape}"
)
else:
logger.warning(
"Processed test DataFrame not found in DataPreparation object"
)
# ====================================================================
# STEP 6: Validation and Summary
# ====================================================================
logger.info("Step 6: Validation and summary...")
# Get encoder information
num_classes = data_prep.get_num_classes()
logger.info("Label encoder information:")
for col, count in num_classes.items():
logger.info(f" {col}: {count} classes")
if hasattr(data_prep.label_encoders[col], "classes_"):
classes = list(data_prep.label_encoders[col].classes_)
logger.info(f" Classes: {classes}")
# Log feature dimensions
feature_dim = data_prep.feature_extractor.get_feature_dim()
logger.info(f"Feature dimension: {feature_dim}")
# Log dataset sizes
logger.info("Dataset summary:")
logger.info(f" Training samples: {len(train_dataloader.dataset)}")
val_samples = len(val_dataloader.dataset) if val_dataloader else 0
logger.info(f" Validation samples: {val_samples}")
test_samples = len(test_dataloader.dataset) if test_dataloader else 0
logger.info(f" Test samples: {test_samples}")
logger.info("=== Data Processing Pipeline Completed Successfully ===")
# ====================================================================
# STEP 7: Register Data Assets in Azure ML (if requested)
# ====================================================================
if args.register_data_assets:
logger.info("Step 7: Registering processed data as assets in Azure ML...")
try:
register_processed_data_assets_from_paths(
train_csv_path=os.path.join(PROCESSED_DATA_DIR, "train.csv"),
test_csv_path=os.path.join(PROCESSED_DATA_DIR, "test.csv"),
)
logger.info("Data asset registration process completed.")
except ImportError:
logger.error(
"Could not import Azure registration function. "
"Make sure azure-ai-ml is installed."
)
except Exception as e:
logger.error(f"Failed to register data assets: {e}")
except Exception as e:
logger.error(f"Data processing pipeline failed: {str(e)}")
logger.error("Full error traceback:", exc_info=True)
raise
if __name__ == "__main__":
main()