emotion_clf_pipeline package

Submodules

emotion_clf_pipeline.api module

Emotion Classification API.

A RESTful API service that analyzes YouTube video content for emotional sentiment. The service transcribes video audio, processes the text through an emotion classification pipeline, and returns structured emotion predictions with timestamps.

Key Features:
  • YouTube video transcription using AssemblyAI

  • Multi-dimensional emotion analysis (emotion, sub-emotion, intensity)

  • Time-stamped transcript segmentation

  • CORS-enabled for web frontend integration

  • Feedback collection for training data improvement

  • Comprehensive monitoring with Prometheus metrics

class emotion_clf_pipeline.api.FeedbackItem(*args, **kwargs)[source]

Bases: BaseModel

Represents a single corrected emotion prediction for training data.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

emotion: str
end_time: str
intensity: str
start_time: str
sub_emotion: str
text: str
class emotion_clf_pipeline.api.FeedbackRequest(*args, **kwargs)[source]

Bases: BaseModel

Request payload for submitting emotion classification feedback.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

feedbackData: List[FeedbackItem]
videoTitle: str
class emotion_clf_pipeline.api.FeedbackResponse(*args, **kwargs)[source]

Bases: BaseModel

Response for feedback submission.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

filename: str
message: str
record_count: int
success: bool
class emotion_clf_pipeline.api.PredictionRequest(*args, **kwargs)[source]

Bases: BaseModel

Request payload for emotion prediction endpoint.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

method: str = 'local'
url: str
class emotion_clf_pipeline.api.PredictionResponse(*args, **kwargs)[source]

Bases: BaseModel

Complete emotion analysis response for a YouTube video.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

title: str
transcript: List[TranscriptItem]
videoId: str
class emotion_clf_pipeline.api.TranscriptItem(*args, **kwargs)[source]

Bases: BaseModel

Represents a single analyzed segment from video transcript.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

emotion: str
end_time: str
intensity: str
sentence: str
start_time: str
sub_emotion: str
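
To make the data contract concrete, here is a hedged request/response sketch. The /predict route, host, and port are assumptions for illustration only; the field names come directly from PredictionRequest, PredictionResponse, and TranscriptItem above.

    import requests

    # Request body mirrors PredictionRequest: a YouTube URL plus the
    # prediction method ("local" is the documented default).
    payload = {"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "method": "local"}

    # The "/predict" route and host are assumptions, not documented here.
    resp = requests.post("http://localhost:8000/predict", json=payload, timeout=600)
    data = resp.json()

    # Response mirrors PredictionResponse: title, videoId, and a list of
    # TranscriptItem segments carrying the emotion annotations.
    print(data["title"], data["videoId"])
    for item in data["transcript"]:
        print(item["start_time"], item["end_time"], item["sentence"],
              item["emotion"], item["sub_emotion"], item["intensity"])
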
emotion_clf_pipeline.api.convert_azure_result_to_api_format(azure_result)[source]

Convert Azure prediction result format to the expected API format.

Parameters:

azure_result (Dict[str, Any]) – Result from predict_emotions_azure function

Returns:

List of predictions in the expected API format

Return type:

List[Dict[str, Any]]

emotion_clf_pipeline.api.create_baseline_stats_from_training_data()[source]

Create baseline statistics file for drift detection from training data.

This creates the baseline_stats.pkl file that the monitoring system expects for drift detection to work properly.

emotion_clf_pipeline.api.create_feedback_csv(feedback_data)[source]

Create CSV content from feedback data.

Parameters:

feedback_data (List[FeedbackItem])

Return type:

str

emotion_clf_pipeline.api.format_time_seconds(time_input)[source]

Convert various time formats to HH:MM:SS format.

Parameters:

time_input – Time in seconds (float/int), or already formatted string

Returns:

Formatted time string in HH:MM:SS format

Return type:

str
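
A minimal sketch of the documented behavior (numeric seconds in, zero-padded HH:MM:SS out, already-formatted strings passed through); the real implementation may handle more edge cases:

    def format_time_seconds_sketch(time_input):
        # Pass through values that already look like formatted time strings.
        if isinstance(time_input, str) and ":" in time_input:
            return time_input
        total = int(float(time_input))
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"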

async emotion_clf_pipeline.api.get_live_monitoring_summary()

Get comprehensive live monitoring summary with real-time calculations.

emotion_clf_pipeline.api.get_metrics()

Local monitoring metrics endpoint.

Exposes comprehensive metrics summary for monitoring the API’s performance and usage. Returns JSON format for easy consumption by monitoring tools.

async emotion_clf_pipeline.api.get_monitoring_file(file_name)

Serve monitoring files for the frontend dashboard with enhanced data processing, including trend analysis and predictions for monitoring data.

Parameters:

file_name (str)

emotion_clf_pipeline.api.get_next_training_filename()[source]

Generate the next available training data filename by checking existing files in the local data/raw/train directory.

Return type:

str

emotion_clf_pipeline.api.handle_prediction(request)

Analyze YouTube video content for emotional sentiment. Supports both local (on-premise) and Azure (cloud) prediction methods.

Parameters:

request (PredictionRequest)

Return type:

PredictionResponse

emotion_clf_pipeline.api.handle_refresh()

Triggers a manual refresh of the baseline model from Azure ML.

This endpoint allows for a zero-downtime model update by pulling the latest model tagged as ‘emotion-clf-baseline’ from the registry and loading it into the running API instance.

TODO: Secure this endpoint.

Return type:

Dict[str, Any]

emotion_clf_pipeline.api.health_check()

Docker container health check endpoint.

Returns 200 OK when the API is ready to serve requests. Used by Docker Compose healthcheck configuration.

Return type:

Dict[str, str]

emotion_clf_pipeline.api.load_training_metrics_to_monitoring()[source]

Load training metrics from saved results and update monitoring system.

This ensures model performance metrics are available in Prometheus even after API restarts.

emotion_clf_pipeline.api.read_root()

API health check and information endpoint.

Return type:

Dict[str, str]

emotion_clf_pipeline.api.save_feedback(request)

Save user feedback on emotion predictions as training data.

Parameters:

request (FeedbackRequest)

Return type:

FeedbackResponse

emotion_clf_pipeline.api.save_feedback_to_azure(filename, csv_content)[source]

Save feedback CSV as a new version of the emotion-raw-train Azure ML data asset. Creates a new version as URI_FOLDER to match the existing data asset type.

Parameters:
  • filename (str)

  • csv_content (str)

Return type:

bool

async emotion_clf_pipeline.api.startup_event()

On application startup, sync the latest baseline model from Azure ML. This ensures the API is always using the production-ready model.

emotion_clf_pipeline.cli module

Enhanced Command-Line Interface for the Emotion Classification Pipeline.

This script provides a unified CLI for both local and Azure ML execution of:
  • Data preprocessing pipeline

  • Model training pipeline

  • Prediction from YouTube URLs

  • Pipeline status monitoring

Supports seamless switching between local and Azure ML execution modes while maintaining backward compatibility with existing usage patterns.
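
The exact invocation syntax is not reproduced in this reference. Assuming the package is executed as a module and the subcommand names mirror the cmd_* handlers listed below (predict, train, preprocess, pipeline, status, ...), usage might look like:

    python -m emotion_clf_pipeline.cli predict "https://youtube.com/watch?v=..."
    python -m emotion_clf_pipeline.cli pipeline --mode azure

The --mode flag is likewise an assumption, reflecting the local/Azure switching this module describes.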

emotion_clf_pipeline.cli.add_endpoint_cleanup_args(parser)[source]

Add arguments for endpoint cleanup command.

emotion_clf_pipeline.cli.add_endpoint_deploy_args(parser)[source]

Add arguments for Azure ML endpoint deployment command.

emotion_clf_pipeline.cli.add_endpoint_details_args(parser)[source]

Add arguments for getting endpoint details command.

emotion_clf_pipeline.cli.add_endpoint_test_args(parser)[source]

Add arguments for endpoint testing command.

emotion_clf_pipeline.cli.add_pipeline_args(parser)[source]

Add arguments for the complete training pipeline.

emotion_clf_pipeline.cli.add_predict_args(parser)[source]

Add prediction-specific arguments.

emotion_clf_pipeline.cli.add_preprocess_args(parser)[source]

Add preprocessing-specific arguments.

emotion_clf_pipeline.cli.add_schedule_create_args(parser)[source]

Add arguments for creating pipeline schedules.

emotion_clf_pipeline.cli.add_schedule_pipeline_args(parser)[source]

Add pipeline-specific arguments for scheduling (avoiding conflicts).

emotion_clf_pipeline.cli.add_train_args(parser)[source]

Add training-specific arguments.

emotion_clf_pipeline.cli.add_upload_model_args(parser)[source]

Add arguments for model upload command.

emotion_clf_pipeline.cli.cmd_data(args)[source]

Handle data preprocessing commands.

emotion_clf_pipeline.cli.cmd_endpoint(args)[source]

Handle endpoint management commands.

emotion_clf_pipeline.cli.cmd_endpoint_cleanup(args)[source]

Delete the Azure ML endpoint and all associated deployments.

emotion_clf_pipeline.cli.cmd_endpoint_deploy(args)[source]

Deploy model to Azure ML Kubernetes endpoint.

emotion_clf_pipeline.cli.cmd_endpoint_details(args)[source]

Get detailed information about the Azure ML endpoint.

emotion_clf_pipeline.cli.cmd_endpoint_test(args)[source]

Test the deployed Azure ML endpoint with sample data.

emotion_clf_pipeline.cli.cmd_pipeline(args)[source]

Execute the appropriate pipeline based on mode.

emotion_clf_pipeline.cli.cmd_predict(args)[source]

Handle predict command with support for both local and Azure inference.

emotion_clf_pipeline.cli.cmd_preprocess(args)[source]

Handle preprocess command.

emotion_clf_pipeline.cli.cmd_schedule_create(args)[source]

Handle schedule create command.

emotion_clf_pipeline.cli.cmd_schedule_delete(args)[source]

Handle schedule delete command.

emotion_clf_pipeline.cli.cmd_schedule_details(args)[source]

Handle schedule details command.

emotion_clf_pipeline.cli.cmd_schedule_disable(args)[source]

Handle schedule disable command.

emotion_clf_pipeline.cli.cmd_schedule_enable(args)[source]

Handle schedule enable command.

emotion_clf_pipeline.cli.cmd_schedule_list(args)[source]

Handle schedule list command.

emotion_clf_pipeline.cli.cmd_schedule_setup_defaults(args)[source]

Handle setup default schedules command.

emotion_clf_pipeline.cli.cmd_status(args)[source]

Check the status of an Azure ML job.

emotion_clf_pipeline.cli.cmd_train(args)[source]

Handle train command.

emotion_clf_pipeline.cli.cmd_upload_model(args)[source]

Upload model artifacts to Azure ML with enhanced packaging.

emotion_clf_pipeline.cli.display_prediction_summary(result)[source]

Display a summary of the prediction results.

Parameters:

result (dict)

emotion_clf_pipeline.cli.get_azure_config()[source]

Get Azure configuration from environment variables.

emotion_clf_pipeline.cli.main()[source]

Main function to parse arguments and execute commands.

emotion_clf_pipeline.cli.retry(tries=3, delay=5, backoff=2)[source]

A simple retry decorator for functions that might fail due to transient issues.

Parameters:
  • tries (int) – The maximum number of attempts.

  • delay (int) – The initial delay between retries in seconds.

  • backoff (int) – The factor by which the delay should increase for each retry.
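
A minimal sketch of such a decorator under the documented semantics (the delay grows by the backoff factor after each failed attempt); the real implementation may differ in logging and in which exceptions it retries:

    import functools
    import time

    def retry_sketch(tries=3, delay=5, backoff=2):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                wait = delay
                for attempt in range(1, tries + 1):
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        if attempt == tries:
                            raise          # attempts exhausted
                        time.sleep(wait)
                        wait *= backoff    # exponential backoff
            return wrapper
        return decorator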

emotion_clf_pipeline.cli.run_pipeline_azure(*args, **kwargs)[source]

Run the complete pipeline on Azure ML.

emotion_clf_pipeline.cli.run_pipeline_local(args)[source]

Run the complete pipeline locally (preprocess + train).

emotion_clf_pipeline.cli.run_preprocess_azure(args)[source]

Run data preprocessing on Azure ML.

emotion_clf_pipeline.cli.run_preprocess_local(args)[source]

Run data preprocessing locally.

emotion_clf_pipeline.cli.run_train_azure(args)[source]

Run model training on Azure ML.

emotion_clf_pipeline.cli.run_train_local(args)[source]

Run model training locally.

emotion_clf_pipeline.data module

class emotion_clf_pipeline.data.DataPreparation(output_columns, tokenizer, max_length=128, batch_size=16, feature_config=None, encoders_save_dir=None, encoders_load_dir=None)[source]

Bases: object

A class to handle data preparation for emotion classification tasks.

This class handles:
  • Label encoding for target variables

  • Dataset creation

  • Dataloader setup

Parameters:
  • output_columns (list) – List of output column names to encode

  • tokenizer – Tokenizer instance used for text encoding

  • max_length (int) – Maximum sequence length for tokenization

  • batch_size (int) – Batch size for dataloaders

  • feature_config (dict, optional) – Configuration for feature extraction

  • encoders_save_dir (str, optional) – Directory in which fitted label encoders are saved

  • encoders_load_dir (str, optional) – Directory from which existing label encoders are loaded

__init__(output_columns, tokenizer, max_length=128, batch_size=16, feature_config=None, encoders_save_dir=None, encoders_load_dir=None)[source]
get_num_classes()[source]

Get the number of classes for each output column.

prepare_data(train_df, test_df=None, validation_split=0.2, apply_augmentation=False, balance_strategy='equal', samples_per_class=None, augmentation_ratio=2)[source]

Prepare data for training emotion classification models.

Parameters:
  • train_df (pd.DataFrame) – Training dataframe

  • test_df (pd.DataFrame, optional) – Test dataframe. Defaults to None.

  • validation_split (float, optional) – Fraction of training data to use for validation. Defaults to 0.2.

  • apply_augmentation (bool, optional) – Whether to apply data augmentation. Defaults to False.

  • balance_strategy (str, optional) – Strategy for balancing if augmentation is applied. Options: ‘equal’, ‘majority’, ‘target’. Defaults to ‘equal’.

  • samples_per_class (int, optional) – Number of samples per class for balancing. Defaults to None.

  • augmentation_ratio (int, optional) – Maximum ratio of augmented to original samples. Defaults to 2.

Returns:

(train_dataset, val_dataset, test_dataset, train_dataloader, val_dataloader, test_dataloader, class_weights_tensor)

Return type:

tuple
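
A hedged usage sketch. The tokenizer construction and the sample dataframe are assumptions for illustration (real column names and labels come from the dataset), while the output columns match the three documented tasks:

    import pandas as pd
    from transformers import AutoTokenizer
    from emotion_clf_pipeline.data import DataPreparation

    # Hypothetical training dataframe for illustration only.
    train_df = pd.DataFrame({
        "text": ["I love this!", "This is awful."],
        "emotion": ["happiness", "disgust"],
        "sub_emotion": ["joy", "disgust"],
        "intensity": ["mild", "strong"],
    })

    tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-v3-xsmall")
    prep = DataPreparation(
        output_columns=["emotion", "sub_emotion", "intensity"],
        tokenizer=tokenizer,
        max_length=128,
        batch_size=16,
    )
    (train_ds, val_ds, test_ds,
     train_dl, val_dl, test_dl,
     class_weights) = prep.prepare_data(train_df, validation_split=0.2)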

class emotion_clf_pipeline.data.DatasetLoader[source]

Bases: object

A class to handle loading and preprocessing of emotion classification datasets.

This class handles:
  • Loading training and test data from CSV files

  • Cleaning and preprocessing the data

  • Mapping emotions to standardized categories

  • Visualizing data distributions

emotion_mapping

Dictionary mapping sub-emotions to standardized emotions

Type:

dict

train_df

Processed training data

Type:

pd.DataFrame

test_df

Processed test data

Type:

pd.DataFrame

__init__()[source]
load_test_data(test_file='./../../data/test_data-0001.csv')[source]

Load and preprocess test data from a CSV file.

Parameters:

test_file (str) – Path to the test data CSV file

Returns:

Processed test data

Return type:

pd.DataFrame

load_training_data(data_dir='./../../data/raw/all groups')[source]

Load and preprocess training data from multiple CSV files.

Parameters:

data_dir (str) – Directory containing training data CSV files

Returns:

Processed training data

Return type:

pd.DataFrame

plot_distributions()[source]

Plot distributions of emotions, sub-emotions, and intensities for both training and test sets.

class emotion_clf_pipeline.data.EmotionDataset(*args, **kwargs)[source]

Bases: Dataset

Custom Dataset for emotion classification.

__init__(texts, tokenizer, features, labels=None, feature_extractor=None, max_length=128, output_tasks=None)[source]

Initialize the dataset.

Parameters:
  • texts (list) – List of text samples

  • tokenizer – BERT tokenizer

  • features (np.ndarray) – Pre-extracted features

  • labels (list, optional) – List of label tuples (emotion, sub_emotion, intensity). None for prediction.

  • feature_extractor (FeatureExtractor, optional) – Feature extractor instance. Not strictly needed if features are pre-computed.

  • max_length (int) – Maximum sequence length for BERT

  • output_tasks (list, optional) – List of tasks to output. Used only if labels are provided.

emotion_clf_pipeline.data.log_class_distributions(df, output_tasks, df_name)[source]

Logs the class distribution for specified tasks in a dataframe.

Parameters:
  • df (pd.DataFrame) – Dataframe whose class distributions are logged

  • output_tasks (list) – Tasks whose label distributions to report

  • df_name (str) – Name of the dataframe used in log messages

emotion_clf_pipeline.data.main()[source]

Main function to run the data processing pipeline.

emotion_clf_pipeline.data.parse_args()[source]

Parse command-line arguments.

emotion_clf_pipeline.model module

Emotion classification model components.

Provides multi-task DEBERTA-based emotion classification with sub-emotion mapping and intensity prediction. Supports both local and Azure ML model synchronization.

class emotion_clf_pipeline.model.CustomPredictor(model, tokenizer, device, encoders_dir='models/encoders', feature_config=None)[source]

Bases: object

Multi-task emotion prediction engine.

Handles emotion classification inference by combining the trained model with feature engineering and post-processing. Maps sub-emotions to main emotions for consistent predictions.

__init__(model, tokenizer, device, encoders_dir='models/encoders', feature_config=None)[source]

Initialize emotion predictor with model and supporting components.

Parameters:
  • model (nn.Module) – Trained emotion classification model

  • tokenizer – Tokenizer for text preprocessing

  • device (torch.device) – Target device for inference

  • encoders_dir (str) – Directory containing label encoder files

  • feature_config (dict, optional) – Feature extraction configuration

post_process(df)[source]

Refine predictions by aligning sub-emotions with main emotions.

Uses probability distributions to select sub-emotions that are consistent with predicted main emotions, improving classification coherence.

Parameters:

df (pd.DataFrame) – Predictions with sub_emotion_logits column

Returns:

Refined predictions with emotion_pred_post_processed

Return type:

pd.DataFrame

predict(texts, batch_size=16)[source]

Generate emotion predictions for text inputs.

Processes texts through feature extraction, model inference, and post-processing to produce final emotion classifications.

Parameters:
  • texts (list) – List of text strings to classify

  • batch_size (int) – Batch size for inference. Defaults to 16.

Returns:

Predictions with mapped emotions and confidence scores

Return type:

pd.DataFrame

class emotion_clf_pipeline.model.DEBERTAClassifier(*args, **kwargs)[source]

Bases: Module

Multi-task DEBERTA-based emotion classifier.

Performs simultaneous classification for:
  • Main emotions (7 categories)

  • Sub-emotions (28 categories)

  • Emotion intensity (3 levels)

Combines DEBERTA embeddings with engineered features through projection layers.

__init__(model_name, feature_dim, num_classes, hidden_dim=256, dropout=0.1)[source]

Initialize the multi-task emotion classifier.

Parameters:
  • model_name (str) – Pretrained DEBERTA model identifier

  • feature_dim (int) – Dimension of engineered features

  • num_classes (dict) – Class counts for each task (emotion, sub_emotion, intensity)

  • hidden_dim (int) – Hidden layer dimension. Defaults to 256.

  • dropout (float) – Dropout probability. Defaults to 0.1.

forward(input_ids, attention_mask, features)[source]

Compute multi-task emotion predictions.

Parameters:
  • input_ids (torch.Tensor) – Tokenized input text

  • attention_mask (torch.Tensor) – Attention mask for input

  • features (torch.Tensor) – Engineered features

Returns:

Logits for each classification task

Return type:

dict
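
For instance, with the class counts listed above (7/28/3), the default backbone from ModelLoader, and the 121-dimensional feature vector used by process_text_chunks in the predict module, a construction-and-forward sketch might look like the following (dict keys for num_classes are assumed to match the task names):

    import torch
    from emotion_clf_pipeline.model import DEBERTAClassifier

    model = DEBERTAClassifier(
        model_name="microsoft/deberta-v3-xsmall",
        feature_dim=121,
        num_classes={"emotion": 7, "sub_emotion": 28, "intensity": 3},
        hidden_dim=256,
        dropout=0.1,
    )

    # forward() returns a dict with one logits tensor per task.
    input_ids = torch.ones(1, 128, dtype=torch.long)
    attention_mask = torch.ones(1, 128, dtype=torch.long)
    features = torch.zeros(1, 121)
    logits = model(input_ids, attention_mask, features)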

class emotion_clf_pipeline.model.EmotionPredictor[source]

Bases: object

High-level interface for emotion classification.

Provides a simple API for predicting emotions from text with automatic model loading, Azure ML synchronization, and feature configuration. Handles single texts or batches transparently.

__init__()[source]

Initialize predictor with lazy model loading.

ensure_best_baseline()[source]

Ensure we have the best available baseline model from Azure ML.

This is an alias for ensure_best_baseline_model() for backward compatibility. Checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found.

Returns:

True if a better model was downloaded and loaded, False otherwise

Return type:

bool

ensure_best_baseline_model()[source]

Ensure we have the best available baseline model from Azure ML.

This method checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found. It forces a reload of the prediction model to use the updated baseline.

Returns:

True if a better model was downloaded and loaded, False otherwise

Return type:

bool

predict(texts, feature_config=None, reload_model=False)[source]

Predict emotions for single text or batch of texts.

Automatically handles model loading, feature extraction, and result formatting. Returns structured predictions with emotion, sub-emotion, and intensity classifications.

Parameters:
  • texts (str or list) – Text(s) to classify

  • feature_config (dict, optional) – Feature extraction settings. Defaults to tfidf=True, emolex=True, others=False.

  • reload_model (bool) – Force model reload. Defaults to False.

Returns:

Prediction dict for single text, list for batch

Return type:

dict or list
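
A short usage sketch based on the documented contract (single string in, dict out; list in, list of dicts out):

    from emotion_clf_pipeline.model import EmotionPredictor

    predictor = EmotionPredictor()  # model is loaded lazily on first use

    # Single text -> one prediction dict (emotion, sub-emotion, intensity).
    result = predictor.predict("I can't believe we finally won the match!")

    # Batch -> list of prediction dicts, one per input text.
    results = predictor.predict(["Great news!", "This is terrible."])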

class emotion_clf_pipeline.model.ModelLoader(model_name='microsoft/deberta-v3-xsmall', device=None)[source]

Bases: object

Handles DEBERTA model and tokenizer loading with device management.

Supports loading pretrained models, applying custom weights, and creating predictor instances. Provides automatic device selection (GPU/CPU).

__init__(model_name='microsoft/deberta-v3-xsmall', device=None)[source]

Initialize model loader with tokenizer.

Parameters:
  • model_name (str) – Pretrained model identifier. Defaults to ‘microsoft/deberta-v3-xsmall’.

  • device (torch.device, optional) – Target device. Auto-detects if None.

create_predictor(model, encoders_dir='models/encoders', feature_config=None)[source]

Create predictor instance for emotion classification.

Parameters:
  • model (nn.Module) – Trained emotion classification model

  • encoders_dir (str) – Directory containing label encoder files

  • feature_config (dict, optional) – Feature extraction configuration

Returns:

Ready-to-use predictor instance

Return type:

CustomPredictor

ensure_best_baseline_model()[source]

Ensure we have the best available baseline model from Azure ML.

This method checks Azure ML for models with better F1 scores than the current local baseline and downloads them if found. It forces a reload of the prediction model to use the updated baseline.

Returns:

True if a better model was downloaded and loaded, False otherwise

Return type:

bool

load_baseline_model(weights_dir='models/weights', sync_azure=True)[source]

Load stable production model with optional Azure ML sync.

Parameters:
  • weights_dir (str) – Directory containing model weights

  • sync_azure (bool) – Whether to sync with Azure ML on startup

load_dynamic_model(weights_dir='models/weights', sync_azure=True)[source]

Load latest trained model with optional Azure ML sync.

Parameters:
  • weights_dir (str) – Directory containing model weights

  • sync_azure (bool) – Whether to sync with Azure ML on startup

load_model(feature_dim, num_classes, weights_path=None, hidden_dim=256, dropout=0.1)[source]

Create and optionally load pretrained model weights.

Parameters:
  • feature_dim (int) – Dimension of engineered features

  • num_classes (dict) – Class counts for each classification task

  • weights_path (str, optional) – Path to saved model weights

  • hidden_dim (int) – Hidden layer dimension. Defaults to 256.

  • dropout (float) – Dropout probability. Defaults to 0.1.

Returns:

Loaded model ready for inference or training

Return type:

DEBERTAClassifier

promote_dynamic_to_baseline(weights_dir='models/weights', sync_azure=True)[source]

Copies dynamic weights to baseline location, optionally syncing with Azure ML.

Parameters:
  • weights_dir (str) – Directory containing model weights

  • sync_azure (bool) – Whether to sync with Azure ML on startup

emotion_clf_pipeline.predict module

A complete end-to-end pipeline for extracting and analyzing emotional content from YouTube videos. This module orchestrates the entire workflow from audio extraction to emotion classification, providing a streamlined interface for sentiment analysis research and applications.

The pipeline supports multiple transcription services with automatic fallback mechanisms to ensure robustness in production environments.

Usage:

python predict.py "https://youtube.com/watch?v=…" --transcription_method whisper

class emotion_clf_pipeline.predict.AzureEndpointPredictor(api_key, endpoint_url, encoders_dir='models/encoders')[source]

Bases: object

A class to interact with an Azure endpoint for emotion classification. It handles API requests, decodes predictions, and post-processes sub-emotions.

__init__(api_key, endpoint_url, encoders_dir='models/encoders')[source]

Initialize with API key, endpoint URL, and encoder directory. Automatically converts private network URLs to public NGROK URLs.

decode_and_postprocess(raw_predictions)[source]

Decode raw predictions and post-process sub-emotion to ensure consistency.

get_prediction(text)[source]

Send a request to the Azure endpoint and return the raw response.

predict(text)[source]

Full workflow: get prediction, decode, and post-process. Handles double-encoded JSON from the API.

Parameters:

text (str)

Return type:

dict

predict_batch(texts)[source]

Predict emotions for multiple texts (sequential calls).

Parameters:

texts (List[str]) – List of input texts

Returns:

List of prediction results

Return type:

List[Dict[str, Any]]

emotion_clf_pipeline.predict.extract_audio_transcript(video_url)[source]

Extract transcript using speech-to-text from stt.py.

Parameters:

video_url (str)

Return type:

Dict[str, Any]

emotion_clf_pipeline.predict.extract_transcript(video_url)[source]

Extract transcript from YouTube video using subtitles (fallback to STT).

Parameters:

video_url (str)

Return type:

Dict[str, Any]

emotion_clf_pipeline.predict.get_azure_config(endpoint_url=None, api_key=None, use_ngrok=None, server_ip=None)[source]

Get Azure endpoint configuration with fallback priorities.

Priority order:
  1. Explicit parameters passed to the function

  2. Environment variables from the .env file

  3. Raise an error if required values are missing

Parameters:
  • endpoint_url (str | None) – Azure ML endpoint URL (override .env)

  • api_key (str | None) – Azure ML API key (override .env)

  • use_ngrok (bool | None) – Use NGROK tunnel (override .env)

  • server_ip (str | None) – Server IP for NGROK (override .env)

Returns:

Dictionary with Azure configuration

Raises:

ValueError – If required configuration is missing

Return type:

Dict[str, Any]

emotion_clf_pipeline.predict.get_video_title(youtube_url)[source]

Extract video title from YouTube URL for meaningful file naming.

Video titles serve as natural identifiers for organizing processing results and enable easy correlation between source content and analysis outputs.

Parameters:

youtube_url (str) – Valid YouTube video URL

Returns:

Video title or “Unknown Title” if extraction fails

Return type:

str

Note

Gracefully handles network errors and invalid URLs to prevent pipeline interruption during batch processing scenarios.

emotion_clf_pipeline.predict.predict_emotion(texts, feature_config=None, reload_model=False)[source]

Apply emotion classification to text using trained models.

This function serves as the core intelligence of the pipeline, transforming raw text into structured emotional insights. It supports both single text analysis and batch processing for efficiency.

Parameters:
  • texts – Single text string or list of texts for analysis

  • feature_config – Optional configuration for feature extraction methods

  • reload_model – Force model reinitialization (useful for memory management)

Returns:

Emotion predictions with confidence scores.

Single dict for one text, list of dicts for multiple texts. Returns None if prediction fails.

Return type:

dict or list

Performance:

Logs processing latency for performance monitoring and optimization.

emotion_clf_pipeline.predict.predict_emotions_azure(video_url, endpoint_url=None, api_key=None, use_stt=False, chunk_size=200, use_ngrok=None, server_ip=None)[source]

Predict emotions using Azure ML endpoint with auto-loaded configuration.

Parameters:
  • video_url (str) – URL or path to video/audio file

  • endpoint_url (str | None) – Azure ML endpoint URL (overrides .env if provided)

  • api_key (str | None) – Azure ML API key (overrides .env if provided)

  • use_stt (bool) – Whether to use speech-to-text for audio

  • chunk_size (int) – Text chunk size for processing

  • use_ngrok (bool | None) – Whether to use NGROK tunnel (overrides .env if provided)

  • server_ip (str | None) – Server IP for NGROK (overrides .env if provided)

Returns:

Dictionary containing predictions and metadata

Return type:

Dict[str, Any]

emotion_clf_pipeline.predict.predict_emotions_local(video_url, model_path='models/weights/baseline_weights.pt', config_path='models/weights/model_config.json', use_stt=False, chunk_size=200)[source]

Predict emotions using local model inference.

Parameters:
  • video_url (str) – URL or path to video/audio file

  • model_path (str) – Path to model weights

  • config_path (str) – Path to model configuration

  • use_stt (bool) – Whether to use speech-to-text for audio

  • chunk_size (int) – Text chunk size for processing

Returns:

Dictionary containing predictions and metadata

Return type:

Dict[str, Any]
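
For example (the paths shown are the signature defaults; the URL is a placeholder):

    from emotion_clf_pipeline.predict import predict_emotions_local

    result = predict_emotions_local(
        "https://youtube.com/watch?v=...",
        model_path="models/weights/baseline_weights.pt",
        config_path="models/weights/model_config.json",
        use_stt=False,
        chunk_size=200,
    )
    # result is a dict of predictions and metadata, as documented above.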

emotion_clf_pipeline.predict.process_text_chunks(text, model, feature_extractor, chunk_size=200, expected_feature_dim=121)[source]

Process text in chunks for local model inference.

Parameters:
  • text (str) – Input text to process

  • model – Loaded model

  • feature_extractor (FeatureExtractor) – Feature extractor instance

  • chunk_size (int) – Size of text chunks

  • expected_feature_dim (int)

Returns:

List of predictions for each chunk

Return type:

List[Dict[str, Any]]

emotion_clf_pipeline.predict.process_youtube_url_and_predict(youtube_url, transcription_method)[source]

Execute the complete emotion analysis pipeline for a YouTube video.

This is the main orchestration function that coordinates all pipeline stages:

  1. Audio extraction from YouTube (with title metadata)

  2. Speech-to-text transcription (with fallback mechanisms)

  3. Emotion classification (with temporal alignment)

  4. Results persistence (structured Excel output)

The function maintains data lineage throughout the process, ensuring that timestamps from transcription are preserved and aligned with emotion predictions for temporal analysis capabilities.

Parameters:
  • youtube_url (str) – Valid YouTube video URL for processing

  • transcription_method (str) – “assemblyAI” or “whisper” for speech recognition

Returns:

Structured emotion analysis results, where each dictionary contains temporal and emotional metadata:
  • start_time/end_time: Temporal boundaries of the segment

  • text: Transcribed speech content

  • emotion/sub_emotion: Classified emotional states

  • intensity: Emotional intensity measurement

Return type:

list[dict]

Returns empty list if essential processing steps fail.

Note

Creates necessary output directories automatically. All intermediate and final results are persisted to disk for reproducibility and further analysis.

emotion_clf_pipeline.predict.speech_to_text(transcription_method, audio_file, output_file)[source]

Convert audio to text using configurable transcription services.

Implements a robust transcription strategy with automatic fallback:
  • Primary: AssemblyAI (cloud-based, high accuracy)

  • Fallback: Whisper (local processing, privacy-preserving)

This dual-service approach ensures pipeline reliability even when external services are unavailable or API limits are reached.

Parameters:
  • transcription_method – “assemblyAI” or “whisper” for primary service

  • audio_file – Path to input audio file

  • output_file – Path where transcript will be saved

Raises:

ValueError – If transcription_method is not recognized

Note

AssemblyAI failures trigger automatic Whisper fallback. All transcription attempts are logged for debugging purposes.

emotion_clf_pipeline.predict.time_str_to_seconds(time_str)[source]

Convert time strings to seconds for numerical operations.

Handles multiple time formats commonly found in transcription outputs:
  • HH:MM:SS or HH:MM:SS.mmm (hours, minutes, seconds with optional milliseconds)

  • MM:SS or MM:SS.mmm (minutes, seconds with optional milliseconds)

  • Numeric values (already in seconds)

This conversion is essential for temporal analysis and synchronization between audio timestamps and emotion predictions.

Parameters:

time_str – Time in string format or numeric value

Returns:

Time converted to seconds, or 0.0 if parsing fails

Note

Returns 0.0 for invalid inputs rather than raising exceptions to maintain pipeline robustness during batch processing.

Return type:

float
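
A minimal sketch of the documented parsing rules (HH:MM:SS[.mmm], MM:SS[.mmm], bare numbers, and 0.0 on failure):

    def time_str_to_seconds_sketch(time_str):
        try:
            if isinstance(time_str, (int, float)):
                return float(time_str)       # already numeric seconds
            parts = [float(p) for p in str(time_str).split(":")]
            if len(parts) == 3:              # HH:MM:SS(.mmm)
                return parts[0] * 3600 + parts[1] * 60 + parts[2]
            if len(parts) == 2:              # MM:SS(.mmm)
                return parts[0] * 60 + parts[1]
            return float(parts[0])
        except (ValueError, TypeError):
            return 0.0                       # robust default per the docs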

emotion_clf_pipeline.predict.transcribe_youtube_url(video_url, use_stt=False)[source]

Main transcription function that chooses between subtitle and STT methods.

Parameters:
  • video_url (str) – YouTube video URL

  • use_stt (bool) – If True, force use of speech-to-text. If False, try subtitles first.

Returns:

Dictionary containing transcript data and metadata

Return type:

Dict[str, Any]

emotion_clf_pipeline.stt module

class emotion_clf_pipeline.stt.SpeechToTextTranscriber(api_key)[source]

Bases: object

Parameters:

api_key (str)

__init__(api_key)[source]

Initialize the transcriber with an API key.

Parameters:

api_key (str) – AssemblyAI API key

process(audio_file, output_file='transcript.xlsx')[source]

Process an audio file and save the transcript.

Parameters:
  • audio_file (str) – Path to the input audio file

  • output_file (str) – Path for the output transcript file

Return type:

None

save_transcript(transcript, output_file)[source]

Save the transcript to a file (CSV/Excel) with sentences and timestamps.

Parameters:
  • transcript (assemblyai.Transcript) – AssemblyAI transcript object

  • output_file (str) – Path to save the output file

Return type:

None

transcribe_audio(file_path, config=None)[source]

Transcribe the audio file using AssemblyAI.

Parameters:
  • file_path (str) – Path to the audio file

  • config (assemblyai.TranscriptionConfig | None) – Optional transcription configuration

Returns:

AssemblyAI transcript object

Return type:

assemblyai.Transcript

class emotion_clf_pipeline.stt.WhisperTranscriber(model_size='base', force_cpu=False)[source]

Bases: object

Parameters:
  • model_size (str)

  • force_cpu (bool)

__init__(model_size='base', force_cpu=False)[source]

Initialize the WhisperTranscriber with a specific model size.

Parameters:
  • model_size (str) – Size of the model to use (“tiny”, “base”, “small”, “medium”, “large”)

  • force_cpu (bool) – If True, CPU will be used even if CUDA is available

extract_sentences(result)[source]

Extract sentences with timestamps from Whisper transcription result.

Parameters:

result (Dict) – Whisper transcription result

Returns:

List of dictionaries containing sentences and their timestamps

Return type:

List[Dict]

static format_timestamp(seconds)[source]

Format time in seconds to HH:MM:SS format.

Parameters:

seconds (float) – Time in seconds

Returns:

Formatted time string in HH:MM:SS format

Return type:

str

process(audio_file, output_file='transcript.xlsx', language=None)[source]

Process the audio file and generate a transcript.

Parameters:
  • audio_file (str) – Path to the input audio file

  • output_file (str) – Path for the output transcript file

  • language (str | None) – Optional language code for transcription

Return type:

None

static save_transcript(transcript_data, output_file)[source]

Save the transcript to a file (CSV/Excel).

Parameters:
  • transcript_data (List[Dict]) – List of dictionaries containing transcription data

  • output_file (str) – Path to save the output file

Return type:

None

transcribe_audio(file_path, language=None)[source]

Transcribe the audio file using Whisper.

Parameters:
  • file_path (str) – Path to the audio file

  • language (str | None) – Optional language code (e.g., “en” for English)

Returns:

Dictionary containing transcription results

Return type:

Dict

emotion_clf_pipeline.stt.check_cuda_status()[source]

Check and print detailed CUDA status information. This function helps diagnose CUDA-related issues.

Returns:

True if CUDA is available and properly configured

Return type:

bool

emotion_clf_pipeline.stt.sanitize_filename(filename, max_length=200)[source]

Sanitize a filename to be safe for all operating systems.

Removes or replaces characters that are invalid on Windows, macOS, or Linux. Also handles edge cases like reserved names and excessive length.

Parameters:
  • filename (str) – The original filename string

  • max_length (int) – Maximum allowed filename length (default: 200)

Returns:

A sanitized filename safe for cross-platform use

Return type:

str

Note

Ensures compatibility with Windows (most restrictive), macOS, and Linux filesystem naming conventions while preserving readability.
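
A hedged sketch of the approach described above; the exact character set, reserved-name list, and truncation strategy in the real implementation may differ:

    import re

    def sanitize_filename_sketch(filename, max_length=200):
        # Replace characters that are invalid on Windows, the most
        # restrictive of the target operating systems.
        cleaned = re.sub(r'[<>:"/\\|?*]', "_", filename).strip(". ")
        # Sidestep Windows reserved device names such as CON or NUL.
        if cleaned.upper() in {"CON", "PRN", "AUX", "NUL"}:
            cleaned = "_" + cleaned
        return cleaned[:max_length] or "untitled"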

emotion_clf_pipeline.stt.save_youtube_audio(url, destination)[source]

Download the audio from a YouTube video and save it as an MP3 file.

Parameters:
  • url (str) – The URL of the YouTube video.

  • destination (str) – The directory where the audio file should be saved.

Returns:

The path to the downloaded audio file.

Return type:

str

emotion_clf_pipeline.stt.save_youtube_video(url, destination)[source]

Download a YouTube video and save it as an MP4 file.

This function downloads the highest quality progressive video stream available, falling back to adaptive streams if necessary. Progressive streams contain both video and audio in a single file.

Parameters:
  • url (str) – The YouTube video URL

  • destination (str) – The destination folder for the video file

Returns:

(video_file_path, title) - Path to saved video file and video title

Return type:

tuple

Raises:

Exception – If video download fails or no suitable streams are found

Note

Prioritizes progressive MP4 streams for best compatibility. Falls back to highest resolution adaptive stream if progressive unavailable.

emotion_clf_pipeline.train module

DeBERTa-based Multi-Task Emotion Classification Trainer.

This module implements a production-ready training framework for emotion classification models supporting multiple prediction tasks: emotion, sub-emotion, and intensity levels.

Key Features:
  • Multi-task learning with weighted loss functions

  • Automatic model checkpointing and validation-based selection

  • Azure ML integration for model versioning and deployment

  • Comprehensive evaluation metrics and visualization

  • Flexible feature engineering pipeline integration

The trainer handles end-to-end model lifecycle from training through evaluation and deployment, with built-in support for class imbalance and feature fusion.

class emotion_clf_pipeline.train.AzureMLLogger[source]

Bases: object

Comprehensive logging class for Azure ML integration.

Handles both MLflow and Azure ML native logging to ensure metrics and artifacts appear correctly in Azure ML job overview.

__init__()[source]

Initialize the Azure ML logger with environment detection.

complete_run(status='COMPLETED')[source]

Complete the Azure ML run.

Parameters:

status (str)

create_evaluation_plots(test_preds, test_labels, test_metrics, evaluation_dir, output_tasks)[source]

Create comprehensive evaluation plots for Azure ML visualization.

Parameters:
  • test_preds – Dictionary of test predictions per task

  • test_labels – Dictionary of test labels per task

  • test_metrics – Dictionary of test metrics per task

  • evaluation_dir – Directory to save plots

  • output_tasks – List of output tasks

end_logging()[source]

End logging session.

log_artifact(local_path, artifact_path=None)[source]

Log artifacts (files/images) to both Azure ML and MLflow.

Parameters:
  • local_path (str) – Path to local file

  • artifact_path (str) – Optional subdirectory in artifacts

log_evaluation_artifacts(evaluation_dir)[source]

Log all evaluation artifacts to Azure ML for visualization.

Parameters:

evaluation_dir – Directory containing evaluation artifacts

log_image(image_path, name=None)[source]

Log image specifically for Azure ML visualization.

Parameters:
  • image_path (str) – Path to image file

  • name (str) – Display name for the image

log_metric(key, value, step=None)[source]

Log metrics to both Azure ML and MLflow.

Parameters:
  • key (str) – Metric name

  • value (float) – Metric value

  • step (int) – Step/epoch number

log_param(key, value)[source]

Log parameters to both Azure ML and MLflow.

Parameters:

key (str)

log_table(name, data)[source]

Log table data to Azure ML.

Parameters:
  • name (str) – Table name

  • data – Table data to log

start_logging(run_name=None)[source]

Start logging session.

Parameters:

run_name (str)

class emotion_clf_pipeline.train.AzureMLManager(weights_dir='models/weights')[source]

Bases: object

Unified Azure ML manager for emotion classification pipeline.

Handles all Azure ML operations, including:
  • Model weight synchronization (download/upload)

  • Model promotion and versioning

  • Status reporting and configuration validation

  • Backup and recovery operations

__init__(weights_dir='models/weights')[source]

Initialize Azure ML manager.

Parameters:

weights_dir – Directory path for local model weights storage

create_backup(timestamp=None)[source]

Create timestamped backup of existing model weights.

Parameters:

timestamp – Optional timestamp string, defaults to current time

download_models(dry_run=False)[source]

Download models from Azure ML if they don’t exist locally.

Parameters:

dry_run – If True, only show what would be downloaded

Returns:

(baseline_downloaded, dynamic_downloaded)

Return type:

tuple

get_status_info()[source]

Get comprehensive status information.

Returns:

Combined configuration and model status information

Return type:

dict

handle_post_training_sync(f1_score, auto_upload=False, auto_promote_threshold=0.85)[source]

Handle sync operations after training completion.

Parameters:
  • f1_score – F1 score from training

  • auto_upload – Whether to automatically upload dynamic model

  • auto_promote_threshold – F1 threshold for auto-promotion

Returns:

Results of sync operations

Return type:

dict

print_status_report(save_to_file=None)[source]

Generate and display comprehensive status report.

Parameters:

save_to_file – Optional file path to save status as JSON

promote_dynamic_to_baseline(dry_run=False)[source]

Promote dynamic model to baseline (locally and in Azure ML).

Parameters:

dry_run – If True, only show what would be promoted

Returns:

True if promotion successful, False otherwise

Return type:

bool

sync_on_startup()[source]

Perform automatic sync operations on startup.

upload_dynamic_model(f1_score, dry_run=False)[source]

Upload dynamic model to Azure ML with F1 score metadata.

Parameters:
  • f1_score – F1 score to tag the model with

  • dry_run – If True, only show what would be uploaded

Returns:

True if upload successful, False otherwise

Return type:

bool

validate_operation(operation, f1_score=None)[source]

Validate that the requested operation can be performed.

Parameters:
  • operation – Operation to validate (‘upload’, ‘promote’, etc.)

  • f1_score – F1 score for upload operations

Returns:

True if operation is valid, False otherwise

Return type:

bool

class emotion_clf_pipeline.train.CustomTrainer(model, train_dataloader, val_dataloader, test_dataloader, device, test_set_df, class_weights_tensor, encoders_dir, output_tasks=None, learning_rate=2e-05, weight_decay=0.01, epochs=1, feature_config=None)[source]

Bases: object

Production-ready trainer for multi-task emotion classification using DeBERTa.

Manages the complete training lifecycle including data loading, model training, validation, checkpointing, and evaluation. Supports flexible task configuration and automatic model promotion based on performance thresholds.

Key Capabilities:
  • Multi-task learning with weighted loss aggregation

  • Automatic best model selection via validation metrics

  • Feature engineering pipeline integration

  • Azure ML model versioning and deployment

  • Class imbalance handling through weighted loss functions

Thread Safety: Not thread-safe. Use separate instances for concurrent training.

__init__(model, train_dataloader, val_dataloader, test_dataloader, device, test_set_df, class_weights_tensor, encoders_dir, output_tasks=None, learning_rate=2e-05, weight_decay=0.01, epochs=1, feature_config=None)[source]

Initialize the emotion classification trainer.

Sets up training infrastructure, loads encoders, validates model dimensions, and configures feature engineering pipeline. Automatically determines feature dimensions from training data.

Parameters:
  • model – DeBERTa classifier instance with multi-task heads

  • train_dataloader – PyTorch DataLoader for training data

  • val_dataloader – PyTorch DataLoader for validation data

  • test_dataloader – PyTorch DataLoader for test data

  • device – PyTorch device (cuda/cpu) for model execution

  • test_set_df – Pandas DataFrame containing original test data with text

  • class_weights_tensor – Tensor or dict of class weights for imbalanced data

  • encoders_dir – Directory path containing label encoder pickle files

  • output_tasks – List of prediction tasks: [‘emotion’, ‘sub_emotion’, ‘intensity’]

  • learning_rate – AdamW optimizer learning rate (default: 2e-5)

  • weight_decay – L2 regularization coefficient (default: 0.01)

  • epochs – Number of training epochs (default: 1)

  • feature_config – Dict specifying which engineered features to use

Raises:
  • FileNotFoundError – If encoder files are missing from encoders_dir

  • ValueError – If model dimensions don’t match encoder classes

Side Effects:
  • Loads and validates label encoders

  • Configures task-specific loss weights

  • Logs initialization status and warnings

static calculate_metrics(preds, labels, task_name='')[source]

Compute comprehensive classification metrics for model evaluation.

Calculates accuracy, F1-score, precision, and recall using weighted averaging to handle class imbalance. Generates detailed classification report with per-class statistics.

Parameters:
  • preds – Model predictions as numeric class indices

  • labels – Ground truth labels as numeric class indices

  • task_name – Descriptive name for logging context

Returns:

Metrics dictionary containing:
  • acc: Accuracy score (0-1)

  • f1: Weighted F1-score (0-1)

  • prec: Weighted precision (0-1)

  • rec: Weighted recall (0-1)

  • report: Detailed classification report string

Return type:

dict

Handles edge cases like empty datasets and length mismatches gracefully by returning zero metrics with appropriate warnings.

evaluate(dataloader, criterion_dict, is_test=False)[source]

Evaluate model performance on validation or test data.

Runs inference on provided dataset without gradient computation, collecting predictions and computing loss for all active tasks.

Parameters:
  • dataloader – PyTorch DataLoader containing evaluation data

  • criterion_dict – Task-specific loss functions for loss computation

  • is_test – Boolean flag for logging context (test vs validation)

Returns:

(avg_eval_loss, all_preds, all_labels) where:
  • avg_eval_loss: Mean loss across all evaluation batches

  • all_preds: Dict mapping task names to prediction lists

  • all_labels: Dict mapping task names to ground truth lists

Return type:

tuple

Side Effects:
  • Sets model to evaluation mode (disables dropout/batch norm)

  • Logs evaluation progress via tqdm progress bar

evaluate_final_model(model_path, evaluation_output_dir)[source]

Perform comprehensive evaluation of a trained model on test data.

Loads a trained model from disk, runs inference on the test dataset, and generates detailed evaluation reports including per-sample predictions, accuracy metrics, and exported results for analysis.

Parameters:
  • model_path – File path to saved model state dict (.pt file)

  • evaluation_output_dir – Directory for saving evaluation artifacts

Returns:

Comprehensive results with columns:
  • text: Original input text samples

  • true_{task}: Ground truth labels for each task

  • pred_{task}: Model predictions for each task

  • {task}_correct: Boolean correctness per task

  • all_correct: Boolean indicating all tasks correct (if multi-task)

Return type:

pd.DataFrame

Side Effects:
  • Loads model weights and sets to evaluation mode

  • Creates evaluation output directory if it doesn’t exist

  • Saves detailed evaluation report as CSV file

  • Logs progress and any warnings encountered

plot_evaluation_results(results_df, output_dir)[source]

Generate comprehensive plots for the evaluation results.

Parameters:
  • results_df – DataFrame containing evaluation results

  • output_dir – Directory for saving plot artifacts

Side Effects:
  • Creates plots for per-task accuracy, confusion matrix, and sample predictions

  • Saves plots as image files in the specified directory

static print_metrics(metrics_dict, split, loss=None)[source]

Display formatted training metrics in a readable table format.

Renders metrics for all tasks in a visually appealing table with color-coded headers and consistent decimal formatting. Supports different contexts (train/validation/test) with appropriate styling.

Parameters:
  • metrics_dict – Dict mapping task names to metric dictionaries

  • split – Context string (‘Train’, ‘Val’, ‘Test’) for header styling

  • loss – Optional loss value to display above metrics table

Side Effects:
  • Prints colored headers and formatted tables to console

  • Uses tabulate library for professional table formatting

  • Applies context-appropriate terminal colors

static promote_dynamic_to_baseline(weights_dir='models/weights')[source]

Promote current dynamic model to baseline status for production use.

Copies dynamic_weights.pt to baseline_weights.pt, effectively making the current best-performing model the new production baseline. This operation is typically performed after validating model performance meets promotion criteria.

Parameters:

weights_dir (str) – Directory containing model weight files

Returns:

True if promotion successful, False if dynamic model missing

Return type:

bool

Side Effects:
  • Creates or overwrites baseline_weights.pt file

  • Logs promotion status and any errors encountered

setup_training()[source]

Initialize training components for multi-task learning.

Configures loss functions, optimizer, and learning rate scheduler for all active prediction tasks. Sets up class-weighted losses for imbalanced datasets and linear warmup scheduling.

Returns:

(criterion_dict, optimizer, scheduler) where:
  • criterion_dict: Task-specific CrossEntropyLoss functions

  • optimizer: AdamW optimizer with L2 regularization

  • scheduler: Linear warmup learning rate scheduler

Return type:

tuple

Side Effects:
  • Moves class weights to appropriate device

  • Logs successful setup completion

should_promote_to_baseline(dynamic_f1, baseline_f1, threshold=0.01)[source]

Determine whether dynamic model performance justifies baseline promotion.

Compares dynamic model F1 score against current baseline with a configurable improvement threshold to prevent frequent updates from marginal improvements. Implements a simple but effective promotion strategy based on statistical significance.

Parameters:
  • dynamic_f1 – F1 score of the newly trained dynamic model

  • baseline_f1 – F1 score of the current production baseline model

  • threshold – Minimum improvement required for promotion (default: 0.01)

Returns:

True if dynamic model should replace baseline, False otherwise

Return type:

bool

Note

Uses emotion task F1 as the primary promotion criterion. In multi-task scenarios, consider weighted combinations of task performances.
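
The promotion rule reduces to a single comparison; a sketch under the documented semantics (threshold as the minimum improvement required):

    def should_promote_sketch(dynamic_f1, baseline_f1, threshold=0.01):
        # Promote only when the gain clears the configured margin,
        # preventing churn from marginal improvements.
        return (dynamic_f1 - baseline_f1) >= threshold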

train_and_evaluate(trained_model_output_dir, metrics_output_file, weights_dir_base='models/weights')[source]

Execute complete training pipeline with validation-based model selection.

Orchestrates the full training workflow including epoch iteration, validation evaluation, best model tracking, and artifact persistence. Integrates with MLflow for experiment tracking and Azure ML for model deployment.

Parameters:
  • trained_model_output_dir – Directory path for saving the best model

  • metrics_output_file – JSON file path for training metrics storage

  • weights_dir_base – Base directory for temporary model checkpoints

Returns:

Best validation F1 scores for each task from optimal epoch

Return type:

dict

Side Effects:
  • Creates temporary directories for model checkpoints

  • Logs training progress and metrics to MLflow

  • Saves model configuration and state dict files

  • Attempts Azure ML model upload with auto-promotion

  • Cleans up temporary checkpoint files after completion

train_epoch(criterion_dict, optimizer, scheduler)[source]

Execute one complete training epoch across all batches.

Performs forward pass, loss computation, backpropagation, and optimizer updates for all configured tasks. Collects predictions and ground truth labels for comprehensive metric calculation.

Parameters:
  • criterion_dict – Task-specific loss functions (from setup_training)

  • optimizer – AdamW optimizer instance

  • scheduler – Learning rate scheduler instance

Returns:

(avg_train_loss, train_metrics_epoch) where:
  • avg_train_loss: Mean loss across all batches

  • train_metrics_epoch: Dict of metrics per task

Return type:

tuple

Side Effects:
  • Updates model parameters via backpropagation

  • Advances learning rate scheduler

  • Logs training progress via tqdm progress bar

emotion_clf_pipeline.train.main()[source]

Main function for training the model.

emotion_clf_pipeline.train.parse_arguments()[source]

Parse command line arguments for training configuration.

Returns:

Parsed arguments containing training parameters

Return type:

argparse.Namespace

emotion_clf_pipeline.transcript module

class emotion_clf_pipeline.transcript.Transcript[source]

Bases: object

__init__()[source]
download_youtube_audio()[source]
process()[source]
seconds_to_hms(seconds)[source]
transcribe_audio_with_assemblyai(audio_file_path)[source]
whisper_model(audio_file_path)[source]

Module contents