emotion_clf_pipeline package
Submodules
emotion_clf_pipeline.api module
Emotion Classification API.
A RESTful API service that analyzes YouTube video content for emotional sentiment. The service transcribes video audio, processes the text through an emotion classification pipeline, and returns structured emotion predictions with timestamps.
- Key Features:
YouTube video transcription using AssemblyAI
Multi-dimensional emotion analysis (emotion, sub-emotion, intensity)
Time-stamped transcript segmentation
CORS-enabled for web frontend integration
Feedback collection for training data improvement
Comprehensive monitoring with Prometheus metrics
- class emotion_clf_pipeline.api.FeedbackItem(*args, **kwargs)[source]
Bases:
BaseModel
Represents a single corrected emotion prediction for training data.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class emotion_clf_pipeline.api.FeedbackRequest(*args, **kwargs)[source]
Bases:
BaseModel
Request payload for submitting emotion classification feedback.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- feedbackData: List[FeedbackItem]
- class emotion_clf_pipeline.api.FeedbackResponse(*args, **kwargs)[source]
Bases:
BaseModel
Response for feedback submission.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class emotion_clf_pipeline.api.PredictionRequest(*args, **kwargs)[source]
Bases:
BaseModel
Request payload for emotion prediction endpoint.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class emotion_clf_pipeline.api.PredictionResponse(*args, **kwargs)[source]
Bases:
BaseModel
Complete emotion analysis response for a YouTube video.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- transcript: List[TranscriptItem]
- class emotion_clf_pipeline.api.TranscriptItem(*args, **kwargs)[source]
Bases:
BaseModel
Represents a single analyzed segment from video transcript.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- emotion_clf_pipeline.api.convert_azure_result_to_api_format(azure_result)[source]
Convert Azure prediction result format to the expected API format.
- emotion_clf_pipeline.api.create_baseline_stats_from_training_data()[source]
Create baseline statistics file for drift detection from training data.
This creates the baseline_stats.pkl file that the monitoring system expects for drift detection to work properly.
- emotion_clf_pipeline.api.create_feedback_csv(feedback_data)[source]
Create CSV content from feedback data.
- Parameters:
feedback_data (List[FeedbackItem])
- Return type:
- emotion_clf_pipeline.api.format_time_seconds(time_input)[source]
Convert various time formats to HH:MM:SS format.
- Parameters:
time_input – Time in seconds (float/int), or already formatted string
- Returns:
MM:SS format
- Return type:
Formatted time string in HH
- async emotion_clf_pipeline.api.get_live_monitoring_summary()
Get comprehensive live monitoring summary with real-time calculations
- emotion_clf_pipeline.api.get_metrics()
Local monitoring metrics endpoint.
Exposes comprehensive metrics summary for monitoring the API’s performance and usage. Returns JSON format for easy consumption by monitoring tools.
- async emotion_clf_pipeline.api.get_monitoring_file(file_name)
Serve monitoring files for the frontend dashboard with enhanced data processing
- Parameters:
file_name (str)
- async emotion_clf_pipeline.api.get_monitoring_trends()
Get trend analysis and predictions for monitoring data
- emotion_clf_pipeline.api.get_next_training_filename()[source]
Generate the next available training data filename by checking existing files in the local data/raw/train directory.
- Return type:
- emotion_clf_pipeline.api.handle_prediction(request)
Analyze YouTube video content for emotional sentiment. Supports both local (on-premise) and Azure (cloud) prediction methods.
- Parameters:
request (PredictionRequest)
- Return type:
- emotion_clf_pipeline.api.handle_refresh()
Triggers a manual refresh of the baseline model from Azure ML.
This endpoint allows for a zero-downtime model update by pulling the latest model tagged as ‘emotion-clf-baseline’ from the registry and loading it into the running API instance.
TODO: Secure this endpoint.
- emotion_clf_pipeline.api.health_check()
Docker container health check endpoint.
Returns 200 OK when the API is ready to serve requests. Used by Docker Compose healthcheck configuration.
- emotion_clf_pipeline.api.load_training_metrics_to_monitoring()[source]
Load training metrics from saved results and update monitoring system.
This ensures model performance metrics are available in Prometheus even after API restarts.
- emotion_clf_pipeline.api.read_root()
API health check and information endpoint.
- emotion_clf_pipeline.api.save_feedback(request)
Save user feedback on emotion predictions as training data.
- Parameters:
request (FeedbackRequest)
- Return type:
- emotion_clf_pipeline.api.save_feedback_to_azure(filename, csv_content)[source]
Save feedback CSV as a new version of the emotion-raw-train Azure ML data asset. Creates a new version as URI_FOLDER to match the existing data asset type.
- async emotion_clf_pipeline.api.startup_event()
On application startup, sync the latest baseline model from Azure ML. This ensures the API is always using the production-ready model.
emotion_clf_pipeline.cli module
Enhanced Command-Line Interface for the Emotion Classification Pipeline.
This script provides a unified CLI for both local and Azure ML execution of: - Data preprocessing pipeline - Model training pipeline - Prediction from YouTube URLs - Pipeline status monitoring
Supports seamless switching between local and Azure ML execution modes while maintaining backward compatibility with existing usage patterns.
- emotion_clf_pipeline.cli.add_endpoint_cleanup_args(parser)[source]
Add arguments for endpoint cleanup command.
- emotion_clf_pipeline.cli.add_endpoint_deploy_args(parser)[source]
Add arguments for Azure ML endpoint deployment command.
- emotion_clf_pipeline.cli.add_endpoint_details_args(parser)[source]
Add arguments for getting endpoint details command.
- emotion_clf_pipeline.cli.add_endpoint_test_args(parser)[source]
Add arguments for endpoint testing command.
- emotion_clf_pipeline.cli.add_pipeline_args(parser)[source]
Add arguments for the complete training pipeline.
- emotion_clf_pipeline.cli.add_schedule_create_args(parser)[source]
Add arguments for creating pipeline schedules.
- emotion_clf_pipeline.cli.add_schedule_pipeline_args(parser)[source]
Add pipeline-specific arguments for scheduling (avoiding conflicts).
- emotion_clf_pipeline.cli.add_upload_model_args(parser)[source]
Add arguments for model upload command.
- emotion_clf_pipeline.cli.cmd_endpoint_cleanup(args)[source]
Delete the Azure ML endpoint and all associated deployments.
- emotion_clf_pipeline.cli.cmd_endpoint_deploy(args)[source]
Deploy model to Azure ML Kubernetes endpoint.
- emotion_clf_pipeline.cli.cmd_endpoint_details(args)[source]
Get detailed information about the Azure ML endpoint.
- emotion_clf_pipeline.cli.cmd_endpoint_test(args)[source]
Test the deployed Azure ML endpoint with sample data.
- emotion_clf_pipeline.cli.cmd_pipeline(args)[source]
Execute the appropriate pipeline based on mode.
- emotion_clf_pipeline.cli.cmd_predict(args)[source]
Handle predict command with support for both local and Azure inference.
- emotion_clf_pipeline.cli.cmd_schedule_setup_defaults(args)[source]
Handle setup default schedules command.
- emotion_clf_pipeline.cli.cmd_upload_model(args)[source]
Upload model artifacts to Azure ML with enhanced packaging.
- emotion_clf_pipeline.cli.display_prediction_summary(result)[source]
Display a summary of the prediction results.
- Parameters:
result (dict)
- emotion_clf_pipeline.cli.get_azure_config()[source]
Get Azure configuration from environment variables.
- emotion_clf_pipeline.cli.retry(tries=3, delay=5, backoff=2)[source]
A simple retry decorator for functions that might fail due to transient issues.
emotion_clf_pipeline.data module
- class emotion_clf_pipeline.data.DataPreparation(output_columns, tokenizer, max_length=128, batch_size=16, feature_config=None, encoders_save_dir=None, encoders_load_dir=None)[source]
Bases:
object
A class to handle data preparation for emotion classification tasks.
This class handles: - Label encoding for target variables - Dataset creation - Dataloader setup
- Parameters:
output_columns (list) – List of output column names to encode
model_name (str) – Name of the pretrained model to use for tokenization
max_length (int) – Maximum sequence length for tokenization
batch_size (int) – Batch size for dataloaders
feature_config (dict, optional) – Configuration for feature extraction
- __init__(output_columns, tokenizer, max_length=128, batch_size=16, feature_config=None, encoders_save_dir=None, encoders_load_dir=None)[source]
- prepare_data(train_df, test_df=None, validation_split=0.2, apply_augmentation=False, balance_strategy='equal', samples_per_class=None, augmentation_ratio=2)[source]
Prepare data for training emotion classification models.
- Parameters:
train_df (pd.DataFrame) – Training dataframe
test_df (pd.DataFrame, optional) – Test dataframe. Defaults to None.
validation_split (float, optional) – Fraction of training data to use
0.2. (for validation. Defaults to)
apply_augmentation (bool, optional) – Whether to apply data
False. (augmentation. Defaults to)
balance_strategy (str, optional) – Strategy for balancing if
Options (augmentation is applied.) – ‘equal’, ‘majority’, ‘target’.
'equal'. (Defaults to)
samples_per_class (int, optional) – Number of samples per class for
None. (balancing. Defaults to)
augmentation_ratio (int, optional) – Maximum ratio of augmented to
2. (original samples. Defaults to)
- Returns:
(train_dataset, val_dataset, test_dataset, train_dataloader, val_dataloader, test_dataloader, class_weights_tensor)
- Return type:
- class emotion_clf_pipeline.data.DatasetLoader[source]
Bases:
object
A class to handle loading and preprocessing of emotion classification datasets.
This class handles: - Loading training and test data from CSV files - Cleaning and preprocessing the data - Mapping emotions to standardized categories - Visualizing data distributions
- train_df
Processed training data
- Type:
pd.DataFrame
- test_df
Processed test data
- Type:
pd.DataFrame
- load_test_data(test_file='./../../data/test_data-0001.csv')[source]
Load and preprocess test data from a CSV file.
- Parameters:
test_file (str) – Path to the test data CSV file
- Returns:
Processed test data
- Return type:
pd.DataFrame
- class emotion_clf_pipeline.data.EmotionDataset(*args, **kwargs)[source]
Bases:
Dataset
Custom Dataset for emotion classification.
- __init__(texts, tokenizer, features, labels=None, feature_extractor=None, max_length=128, output_tasks=None)[source]
Initialize the dataset.
- Parameters:
texts (list) – List of text samples
tokenizer – BERT tokenizer
features (np.ndarray) – Pre-extracted features
labels (list, optional) – List of label tuples (emotion, sub_emotion,
prediction. (intensity). None for)
feature_extractor (FeatureExtractor, optional) – Feature extractor
pre-computed. (instance. Not strictly needed if features are)
max_length (int) – Maximum sequence length for BERT
output_tasks (list, optional) – List of tasks to output. Used only if
provided. (labels are)
- emotion_clf_pipeline.data.log_class_distributions(df, output_tasks, df_name)[source]
Logs the class distribution for specified tasks in a dataframe.
- Parameters:
df (pandas.DataFrame)
df_name (str)
emotion_clf_pipeline.model module
Emotion classification model components.
Provides multi-task DEBERTA-based emotion classification with sub-emotion mapping and intensity prediction. Supports both local and Azure ML model synchronization.
- class emotion_clf_pipeline.model.CustomPredictor(model, tokenizer, device, encoders_dir='models/encoders', feature_config=None)[source]
Bases:
object
Multi-task emotion prediction engine.
Handles emotion classification inference by combining the trained model with feature engineering and post-processing. Maps sub-emotions to main emotions for consistent predictions.
- __init__(model, tokenizer, device, encoders_dir='models/encoders', feature_config=None)[source]
Initialize emotion predictor with model and supporting components.
- post_process(df)[source]
Refine predictions by aligning sub-emotions with main emotions.
Uses probability distributions to select sub-emotions that are consistent with predicted main emotions, improving classification coherence.
- Parameters:
df (pd.DataFrame) – Predictions with sub_emotion_logits column
- Returns:
Refined predictions with emotion_pred_post_processed
- Return type:
pd.DataFrame
- class emotion_clf_pipeline.model.DEBERTAClassifier(*args, **kwargs)[source]
Bases:
Module
Multi-task DEBERTA-based emotion classifier.
Performs simultaneous classification for: - Main emotions (7 categories) - Sub-emotions (28 categories) - Emotion intensity (3 levels)
Combines DEBERTA embeddings with engineered features through projection layers.
- __init__(model_name, feature_dim, num_classes, hidden_dim=256, dropout=0.1)[source]
Initialize the multi-task emotion classifier.
- Parameters:
model_name (str) – Pretrained DEBERTA model identifier
feature_dim (int) – Dimension of engineered features
num_classes (dict) – Class counts for each task (emotion, sub_emotion, intensity)
hidden_dim (int) – Hidden layer dimension. Defaults to 256.
dropout (float) – Dropout probability. Defaults to 0.1.
- forward(input_ids, attention_mask, features)[source]
Compute multi-task emotion predictions.
- Parameters:
input_ids (torch.Tensor) – Tokenized input text
attention_mask (torch.Tensor) – Attention mask for input
features (torch.Tensor) – Engineered features
- Returns:
Logits for each classification task
- Return type:
- class emotion_clf_pipeline.model.EmotionPredictor[source]
Bases:
object
High-level interface for emotion classification.
Provides a simple API for predicting emotions from text with automatic model loading, Azure ML synchronization, and feature configuration. Handles single texts or batches transparently.
- ensure_best_baseline()[source]
Ensure we have the best available baseline model from Azure ML.
This is an alias for ensure_best_baseline_model() for backward compatibility. Checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found.
- Returns:
True if a better model was downloaded and loaded, False otherwise
- Return type:
- ensure_best_baseline_model()[source]
Ensure we have the best available baseline model from Azure ML.
This method checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found. It forces a reload of the prediction model to use the updated baseline.
- Returns:
True if a better model was downloaded and loaded, False otherwise
- Return type:
- predict(texts, feature_config=None, reload_model=False)[source]
Predict emotions for single text or batch of texts.
Automatically handles model loading, feature extraction, and result formatting. Returns structured predictions with emotion, sub-emotion, and intensity classifications.
- Parameters:
- Returns:
Prediction dict for single text, list for batch
- Return type:
- class emotion_clf_pipeline.model.ModelLoader(model_name='microsoft/deberta-v3-xsmall', device=None)[source]
Bases:
object
Handles DEBERTA model and tokenizer loading with device management.
Supports loading pretrained models, applying custom weights, and creating predictor instances. Provides automatic device selection (GPU/CPU).
- __init__(model_name='microsoft/deberta-v3-xsmall', device=None)[source]
Initialize model loader with tokenizer.
- Parameters:
model_name (str) – Pretrained model identifier. Defaults to ‘microsoft/deberta-v3-xsmall’.
device (torch.device, optional) – Target device. Auto-detects if None.
- create_predictor(model, encoders_dir='models/encoders', feature_config=None)[source]
Create predictor instance for emotion classification.
- Parameters:
- Returns:
Ready-to-use predictor instance
- Return type:
- ensure_best_baseline_model()[source]
Ensure we have the best available baseline model from Azure ML.
This method checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found. It forces a reload of the prediction model to use the updated baseline.
- Returns:
True if a better model was downloaded and loaded, False otherwise
- Return type:
- load_baseline_model(weights_dir='models/weights', sync_azure=True)[source]
Load stable production model with optional Azure ML sync.
- load_dynamic_model(weights_dir='models/weights', sync_azure=True)[source]
Load latest trained model with optional Azure ML sync.
- load_model(feature_dim, num_classes, weights_path=None, hidden_dim=256, dropout=0.1)[source]
Create and optionally load pretrained model weights.
- Parameters:
- Returns:
Loaded model ready for inference or training
- Return type:
- Raises:
FileNotFoundError – If weights_path doesn’t exist
RuntimeError – If weight loading fails
emotion_clf_pipeline.predict module
A complete end-to-end pipeline for extracting and analyzing emotional content from YouTube videos. This module orchestrates the entire workflow from audio extraction to emotion classification, providing a streamlined interface for sentiment analysis research and applications.
The pipeline supports multiple transcription services with automatic fallback mechanisms to ensure robustness in production environments.
- Usage:
python predict.py “https://youtube.com/watch?v=…” –transcription_method whisper
- class emotion_clf_pipeline.predict.AzureEndpointPredictor(api_key, endpoint_url, encoders_dir='models/encoders')[source]
Bases:
object
A class to interact with an Azure endpoint for emotion classification. It handles API requests, decodes predictions, and post-processes sub-emotions.
- __init__(api_key, endpoint_url, encoders_dir='models/encoders')[source]
Initialize with API key, endpoint URL, and encoder directory. Automatically converts private network URLs to public NGROK URLs.
- decode_and_postprocess(raw_predictions)[source]
Decode raw predictions and post-process sub-emotion to ensure consistency.
- emotion_clf_pipeline.predict.extract_audio_transcript(video_url)[source]
Extract transcript using speech-to-text from stt.py.
- emotion_clf_pipeline.predict.extract_transcript(video_url)[source]
Extract transcript from YouTube video using subtitles (fallback to STT).
- emotion_clf_pipeline.predict.get_azure_config(endpoint_url=None, api_key=None, use_ngrok=None, server_ip=None)[source]
Get Azure endpoint configuration with fallback priorities.
Priority order: 1. Explicit parameters passed to function 2. Environment variables from .env file 3. Raise error if required values missing
- Parameters:
- Returns:
Dictionary with Azure configuration
- Raises:
ValueError – If required configuration is missing
- Return type:
- emotion_clf_pipeline.predict.get_video_title(youtube_url)[source]
Extract video title from YouTube URL for meaningful file naming.
Video titles serve as natural identifiers for organizing processing results and enable easy correlation between source content and analysis outputs.
- Parameters:
youtube_url (str) – Valid YouTube video URL
- Returns:
Video title or “Unknown Title” if extraction fails
- Return type:
Note
Gracefully handles network errors and invalid URLs to prevent pipeline interruption during batch processing scenarios.
- emotion_clf_pipeline.predict.predict_emotion(texts, feature_config=None, reload_model=False)[source]
Apply emotion classification to text using trained models.
This function serves as the core intelligence of the pipeline, transforming raw text into structured emotional insights. It supports both single text analysis and batch processing for efficiency.
- Parameters:
texts – Single text string or list of texts for analysis
feature_config – Optional configuration for feature extraction methods
reload_model – Force model reinitialization (useful for memory management)
- Returns:
- Emotion predictions with confidence scores.
Single dict for one text, list of dicts for multiple texts. Returns None if prediction fails.
- Return type:
- Performance:
Logs processing latency for performance monitoring and optimization.
- emotion_clf_pipeline.predict.predict_emotions_azure(video_url, endpoint_url=None, api_key=None, use_stt=False, chunk_size=200, use_ngrok=None, server_ip=None)[source]
Predict emotions using Azure ML endpoint with auto-loaded configuration.
- Parameters:
video_url (str) – URL or path to video/audio file
endpoint_url (str | None) – Azure ML endpoint URL (overrides .env if provided)
api_key (str | None) – Azure ML API key (overrides .env if provided)
use_stt (bool) – Whether to use speech-to-text for audio
chunk_size (int) – Text chunk size for processing
use_ngrok (bool | None) – Whether to use NGROK tunnel (overrides .env if provided)
server_ip (str | None) – Server IP for NGROK (overrides .env if provided)
- Returns:
Dictionary containing predictions and metadata
- Return type:
- emotion_clf_pipeline.predict.predict_emotions_local(video_url, model_path='models/weights/baseline_weights.pt', config_path='models/weights/model_config.json', use_stt=False, chunk_size=200)[source]
Predict emotions using local model inference.
- Parameters:
- Returns:
Dictionary containing predictions and metadata
- Return type:
- emotion_clf_pipeline.predict.process_text_chunks(text, model, feature_extractor, chunk_size=200, expected_feature_dim=121)[source]
Process text in chunks for local model inference.
- emotion_clf_pipeline.predict.process_youtube_url_and_predict(youtube_url, transcription_method)[source]
- Execute the complete emotion analysis pipeline for a YouTube video.
This is the main orchestration function that coordinates all pipeline stages:
Audio extraction from YouTube (with title metadata)
Speech-to-text transcription (with fallback mechanisms)
Emotion classification (with temporal alignment)
Results persistence (structured Excel output)
The function maintains data lineage throughout the process, ensuring that timestamps from transcription are preserved and aligned with emotion predictions for temporal analysis capabilities.
- Parameters:
- Returns:
- Structured emotion analysis results where each dictionary
contains temporal and emotional metadata: - start_time/end_time: Temporal boundaries of the segment - text: Transcribed speech content - emotion/sub_emotion: Classified emotional states - intensity: Emotional intensity measurement
- Return type:
Returns empty list if essential processing steps fail.
Note
Creates necessary output directories automatically. All intermediate and final results are persisted to disk for reproducibility and further analysis.
- emotion_clf_pipeline.predict.speech_to_text(transcription_method, audio_file, output_file)[source]
Convert audio to text using configurable transcription services.
Implements a robust transcription strategy with automatic fallback: - Primary: AssemblyAI (cloud-based, high accuracy) - Fallback: Whisper (local processing, privacy-preserving)
This dual-service approach ensures pipeline reliability even when external services are unavailable or API limits are reached.
- Parameters:
transcription_method – “assemblyAI” or “whisper” for primary service
audio_file – Path to input audio file
output_file – Path where transcript will be saved
- Raises:
ValueError – If transcription_method is not recognized
Note
AssemblyAI failures trigger automatic Whisper fallback. All transcription attempts are logged for debugging purposes.
- emotion_clf_pipeline.predict.time_str_to_seconds(time_str)[source]
Convert time strings to seconds for numerical operations.
Handles multiple time formats commonly found in transcription outputs: - HH:MM:SS or HH:MM:SS.mmm (hours, minutes, seconds with optional milliseconds) - MM:SS or MM:SS.mmm (minutes, seconds with optional milliseconds) - Numeric values (already in seconds)
This conversion is essential for temporal analysis and synchronization between audio timestamps and emotion predictions.
- Parameters:
time_str – Time in string format or numeric value
- Returns:
- Time converted to seconds, or 0.0 if parsing fails
Note:
Returns 0.0 for invalid inputs rather than raising exceptions to maintain pipeline robustness during batch processing.
- Return type:
emotion_clf_pipeline.stt module
- class emotion_clf_pipeline.stt.SpeechToTextTranscriber(api_key)[source]
Bases:
object
- Parameters:
api_key (str)
- __init__(api_key)[source]
Initialize the transcriber with an API key.
- Parameters:
api_key (str) – AssemblyAI API key
- process(audio_file, output_file='transcript.xlsx')[source]
Process an audio file and save the transcript.
- save_transcript(transcript, output_file)[source]
Save the transcript to a file (CSV/Excel) with sentences and timestamps.
- Parameters:
transcript (assemblyai.Transcript) – AssemblyAI transcript object
output_file (str) – Path to save the output file
- Return type:
None
- transcribe_audio(file_path, config=None)[source]
Transcribe the audio file using AssemblyAI.
- Parameters:
file_path (str) – Path to the audio file
config (assemblyai.TranscriptionConfig | None) – Optional transcription configuration
- Returns:
AssemblyAI transcript object
- Return type:
assemblyai.Transcript
- class emotion_clf_pipeline.stt.WhisperTranscriber(model_size='base', force_cpu=False)[source]
Bases:
object
- __init__(model_size='base', force_cpu=False)[source]
Initialize the WhisperTranscriber with a specific model size.
- extract_sentences(result)[source]
Extract sentences with timestamps from Whisper transcription result.
- static format_timestamp(seconds)[source]
Format time in seconds to HH:MM:SS format.
- Parameters:
seconds (float) – Time in seconds
- Returns:
MM:SS format
- Return type:
Formatted time string in HH
- process(audio_file, output_file='transcript.xlsx', language=None)[source]
Process the audio file and generate a transcript.
- emotion_clf_pipeline.stt.check_cuda_status()[source]
Check and print detailed CUDA status information. This function helps diagnose CUDA-related issues.
- Returns:
True if CUDA is available and properly configured
- Return type:
- emotion_clf_pipeline.stt.sanitize_filename(filename, max_length=200)[source]
Sanitize a filename to be safe for all operating systems.
Removes or replaces characters that are invalid on Windows, macOS, or Linux. Also handles edge cases like reserved names and excessive length.
- Parameters:
- Returns:
A sanitized filename safe for cross-platform use
- Return type:
Note
Ensures compatibility with Windows (most restrictive), macOS, and Linux filesystem naming conventions while preserving readability.
- emotion_clf_pipeline.stt.save_youtube_audio(url, destination)[source]
Download the audio from a YouTube video and save it as an MP3 file.
- emotion_clf_pipeline.stt.save_youtube_video(url, destination)[source]
Download a YouTube video and save it as an MP4 file.
This function downloads the highest quality progressive video stream available, falling back to adaptive streams if necessary. Progressive streams contain both video and audio in a single file.
- Parameters:
- Returns:
(video_file_path, title) - Path to saved video file and video title
- Return type:
- Raises:
Exception – If video download fails or no suitable streams are found
Note
Prioritizes progressive MP4 streams for best compatibility. Falls back to highest resolution adaptive stream if progressive unavailable.
emotion_clf_pipeline.train module
DeBERTa-based Multi-Task Emotion Classification Trainer.
This module implements a production-ready training framework for emotion classification models supporting multiple prediction tasks: emotion, sub-emotion, and intensity levels.
Key Features: - Multi-task learning with weighted loss functions - Automatic model checkpointing and validation-based selection - Azure ML integration for model versioning and deployment - Comprehensive evaluation metrics and visualization - Flexible feature engineering pipeline integration
The trainer handles end-to-end model lifecycle from training through evaluation and deployment, with built-in support for class imbalance and feature fusion.
- class emotion_clf_pipeline.train.AzureMLLogger[source]
Bases:
object
Comprehensive logging class for Azure ML integration.
Handles both MLflow and Azure ML native logging to ensure metrics and artifacts appear correctly in Azure ML job overview.
- create_evaluation_plots(test_preds, test_labels, test_metrics, evaluation_dir, output_tasks)[source]
Create comprehensive evaluation plots for Azure ML visualization.
- Parameters:
test_preds – Dictionary of test predictions per task
test_labels – Dictionary of test labels per task
test_metrics – Dictionary of test metrics per task
evaluation_dir – Directory to save plots
output_tasks – List of output tasks
- log_artifact(local_path, artifact_path=None)[source]
Log artifacts (files/images) to both Azure ML and MLflow.
- log_evaluation_artifacts(evaluation_dir)[source]
Log all evaluation artifacts to Azure ML for visualization.
- Parameters:
evaluation_dir – Directory containing evaluation artifacts
- class emotion_clf_pipeline.train.AzureMLManager(weights_dir='models/weights')[source]
Bases:
object
Unified Azure ML manager for emotion classification pipeline.
Handles all Azure ML operations including: - Model weight synchronization (download/upload) - Model promotion and versioning - Status reporting and configuration validation - Backup and recovery operations
- __init__(weights_dir='models/weights')[source]
Initialize Azure ML manager.
- Parameters:
weights_dir – Directory path for local model weights storage
- create_backup(timestamp=None)[source]
Create timestamped backup of existing model weights.
- Parameters:
timestamp – Optional timestamp string, defaults to current time
- download_models(dry_run=False)[source]
Download models from Azure ML if they don’t exist locally.
- Parameters:
dry_run – If True, only show what would be downloaded
- Returns:
(baseline_downloaded, dynamic_downloaded)
- Return type:
- get_status_info()[source]
Get comprehensive status information.
- Returns:
Combined configuration and model status information
- Return type:
- handle_post_training_sync(f1_score, auto_upload=False, auto_promote_threshold=0.85)[source]
Handle sync operations after training completion.
- Parameters:
f1_score – F1 score from training
auto_upload – Whether to automatically upload dynamic model
auto_promote_threshold – F1 threshold for auto-promotion
- Returns:
Results of sync operations
- Return type:
- print_status_report(save_to_file=None)[source]
Generate and display comprehensive status report.
- Parameters:
save_to_file – Optional file path to save status as JSON
- promote_dynamic_to_baseline(dry_run=False)[source]
Promote dynamic model to baseline (locally and in Azure ML).
- Parameters:
dry_run – If True, only show what would be promoted
- Returns:
True if promotion successful, False otherwise
- Return type:
- class emotion_clf_pipeline.train.CustomTrainer(model, train_dataloader, val_dataloader, test_dataloader, device, test_set_df, class_weights_tensor, encoders_dir, output_tasks=None, learning_rate=2e-05, weight_decay=0.01, epochs=1, feature_config=None)[source]
Bases:
object
Production-ready trainer for multi-task emotion classification using DeBERTa.
Manages the complete training lifecycle including data loading, model training, validation, checkpointing, and evaluation. Supports flexible task configuration and automatic model promotion based on performance thresholds.
Key Capabilities: - Multi-task learning with weighted loss aggregation - Automatic best model selection via validation metrics - Feature engineering pipeline integration - Azure ML model versioning and deployment - Class imbalance handling through weighted loss functions
Thread Safety: Not thread-safe. Use separate instances for concurrent training.
- __init__(model, train_dataloader, val_dataloader, test_dataloader, device, test_set_df, class_weights_tensor, encoders_dir, output_tasks=None, learning_rate=2e-05, weight_decay=0.01, epochs=1, feature_config=None)[source]
Initialize the emotion classification trainer.
Sets up training infrastructure, loads encoders, validates model dimensions, and configures feature engineering pipeline. Automatically determines feature dimensions from training data.
- Parameters:
model – DeBERTa classifier instance with multi-task heads
train_dataloader – PyTorch DataLoader for training data
val_dataloader – PyTorch DataLoader for validation data
test_dataloader – PyTorch DataLoader for test data
device – PyTorch device (cuda/cpu) for model execution
test_set_df – Pandas DataFrame containing original test data with text
class_weights_tensor – Tensor or dict of class weights for imbalanced data
encoders_dir – Directory path containing label encoder pickle files
output_tasks – List of pred tasks [‘emotion’, ‘sub_emotion’, ‘intensity’]
learning_rate – AdamW optimizer learning rate (default: 2e-5)
weight_decay – L2 regularization coefficient (default: 0.01)
epochs – Number of training epochs (default: 1)
feature_config – Dict specifying which engineered features to use
- Raises:
FileNotFoundError – If encoder files are missing from encoders_dir
ValueError – If model dimensions don’t match encoder classes
- Side Effects:
Loads and validates label encoders
Configures task-specific loss weights
Logs initialization status and warnings
- static calculate_metrics(preds, labels, task_name='')[source]
Compute comprehensive classification metrics for model evaluation.
Calculates accuracy, F1-score, precision, and recall using weighted averaging to handle class imbalance. Generates detailed classification report with per-class statistics.
- Parameters:
preds – Model predictions as numeric class indices
labels – Ground truth labels as numeric class indices
task_name – Descriptive name for logging context
- Returns:
- Metrics dictionary containing:
acc: Accuracy score (0-1)
f1: Weighted F1-score (0-1)
prec: Weighted precision (0-1)
rec: Weighted recall (0-1)
report: Detailed classification report string
- Return type:
Handles edge cases like empty datasets and length mismatches gracefully by returning zero metrics with appropriate warnings.
- evaluate(dataloader, criterion_dict, is_test=False)[source]
Evaluate model performance on validation or test data.
Runs inference on provided dataset without gradient computation, collecting predictions and computing loss for all active tasks.
- Parameters:
dataloader – PyTorch DataLoader containing evaluation data
criterion_dict – Task-specific loss functions for loss computation
is_test – Boolean flag for logging context (test vs validation)
- Returns:
- (avg_eval_loss, all_preds, all_labels) where:
avg_eval_loss: Mean loss across all evaluation batches
all_preds: Dict mapping task names to prediction lists
all_labels: Dict mapping task names to ground truth lists
- Return type:
- Side Effects:
Sets model to evaluation mode (disables dropout/batch norm)
Logs evaluation progress via tqdm progress bar
- evaluate_final_model(model_path, evaluation_output_dir)[source]
Perform comprehensive evaluation of a trained model on test data.
Loads a trained model from disk, runs inference on the test dataset, and generates detailed evaluation reports including per-sample predictions, accuracy metrics, and exported results for analysis.
- Parameters:
model_path – File path to saved model state dict (.pt file)
evaluation_output_dir – Directory for saving evaluation artifacts
- Returns:
- Comprehensive results with columns:
text: Original input text samples
true_{task}: Ground truth labels for each task
pred_{task}: Model predictions for each task
{task}_correct: Boolean correctness per task
all_correct: Boolean indicating all tasks correct (if multi-task)
- Return type:
pd.DataFrame
- Raises:
FileNotFoundError – If model file doesn’t exist at specified path
RuntimeError – If model loading or inference fails
- Side Effects:
Loads model weights and sets to evaluation mode
Creates evaluation output directory if it doesn’t exist
Saves detailed evaluation report as CSV file
Logs progress and any warnings encountered
- plot_evaluation_results(results_df, output_dir)[source]
Generate comprehensive plots for the evaluation results.
- Parameters:
results_df – DataFrame containing evaluation results
output_dir – Directory for saving plot artifacts
- Side Effects:
Creates plots for per-task accuracy, confusion matrix, and sample predictions
Saves plots as image files in the specified directory
- static print_metrics(metrics_dict, split, loss=None)[source]
Display formatted training metrics in a readable table format.
Renders metrics for all tasks in a visually appealing table with color-coded headers and consistent decimal formatting. Supports different contexts (train/validation/test) with appropriate styling.
- Parameters:
metrics_dict – Dict mapping task names to metric dictionaries
split – Context string (‘Train’, ‘Val’, ‘Test’) for header styling
loss – Optional loss value to display above metrics table
- Side Effects:
Prints colored headers and formatted tables to console
Uses tabulate library for professional table formatting
Applies context-appropriate terminal colors
- static promote_dynamic_to_baseline(weights_dir='models/weights')[source]
Promote current dynamic model to baseline status for production use.
Copies dynamic_weights.pt to baseline_weights.pt, effectively making the current best-performing model the new production baseline. This operation is typically performed after validating model performance meets promotion criteria.
- Parameters:
weights_dir (str) – Directory containing model weight files
- Returns:
True if promotion successful, False if dynamic model missing
- Return type:
- Side Effects:
Creates or overwrites baseline_weights.pt file
Logs promotion status and any errors encountered
- setup_training()[source]
Initialize training components for multi-task learning.
Configures loss functions, optimizer, and learning rate scheduler for all active prediction tasks. Sets up class-weighted losses for imbalanced datasets and linear warmup scheduling.
- Returns:
- (criterion_dict, optimizer, scheduler) where:
criterion_dict: Task-specific CrossEntropyLoss functions
optimizer: AdamW optimizer with L2 regularization
scheduler: Linear warmup learning rate scheduler
- Return type:
- Side Effects:
Moves class weights to appropriate device
Logs successful setup completion
- should_promote_to_baseline(dynamic_f1, baseline_f1, threshold=0.01)[source]
Determine whether dynamic model performance justifies baseline promotion.
Compares dynamic model F1 score against current baseline with a configurable improvement threshold to prevent frequent updates from marginal improvements. Implements a simple but effective promotion strategy based on statistical significance.
- Parameters:
dynamic_f1 – F1 score of the newly trained dynamic model
baseline_f1 – F1 score of the current production baseline model
threshold – Minimum improvement required for promotion (default: 0.01)
- Returns:
True if dynamic model should replace baseline, False otherwise
- Return type:
Note
Uses emotion task F1 as the primary promotion criterion. In multi-task scenarios, consider weighted combinations of task performances.
- train_and_evaluate(trained_model_output_dir, metrics_output_file, weights_dir_base='models/weights')[source]
Execute complete training pipeline with validation-based model selection.
Orchestrates the full training workflow including epoch iteration, validation evaluation, best model tracking, and artifact persistence. Integrates with MLflow for experiment tracking and Azure ML for model deployment.
- Parameters:
trained_model_output_dir – Directory path for saving the best model
metrics_output_file – JSON file path for training metrics storage
weights_dir_base – Base directory for temporary model checkpoints
- Returns:
Best validation F1 scores for each task from optimal epoch
- Return type:
- Side Effects:
Creates temporary directories for model checkpoints
Logs training progress and metrics to MLflow
Saves model configuration and state dict files
Attempts Azure ML model upload with auto-promotion
Cleans up temporary checkpoint files after completion
- train_epoch(criterion_dict, optimizer, scheduler)[source]
Execute one complete training epoch across all batches.
Performs forward pass, loss computation, backpropagation, and optimizer updates for all configured tasks. Collects predictions and ground truth labels for comprehensive metric calculation.
- Parameters:
criterion_dict – Task-specific loss functions (from setup_training)
optimizer – AdamW optimizer instance
scheduler – Learning rate scheduler instance
- Returns:
- (avg_train_loss, train_metrics_epoch) where:
avg_train_loss: Mean loss across all batches
train_metrics_epoch: Dict of metrics per task
- Return type:
- Side Effects:
Updates model parameters via backpropagation
Advances learning rate scheduler
Logs training progress via tqdm progress bar