diff --git a/notebooks/.gitignore b/notebooks/.gitignore new file mode 100644 index 0000000..c5f028a --- /dev/null +++ b/notebooks/.gitignore @@ -0,0 +1,52 @@ +# Data +assets/ +data/ + +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.egg-info/ +*.egg +dist/ +build/ +pip-wheel-metadata/ + +# Environments +.env +*.env +.venv/ +venv/ +env/ + +# Tooling caches +.mypy_cache/ +.pytype/ +.ruff_cache/ +.pytest_cache/ +.tox/ +.nox/ +.coverage +.coverage.* +coverage.xml + +# Editors/IDEs +.vscode/ +.idea/ +.history/ +.DS_Store +Thumbs.db + +# Jupyter +.ipynb_checkpoints/ + +# Cursor +.cursor/ + +# Local runs / artifacts +local_outputs/ + +# Logs +*.log +logs/ \ No newline at end of file diff --git a/notebooks/use-cases/subset-selection/data_preparation_and_config.ipynb b/notebooks/use-cases/subset-selection/data_preparation_and_config.ipynb new file mode 100644 index 0000000..2dc1877 --- /dev/null +++ b/notebooks/use-cases/subset-selection/data_preparation_and_config.ipynb @@ -0,0 +1,905 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e6bbdb27", + "metadata": {}, + "source": [ + "# Subset Selection Notebook 1: Data Preparation & Configuration\n", + "\n", + "## Overview\n", + "This notebook is the first step in the **Subset Selection Pipeline**. It focuses on setting up the environment, configuring all necessary parameters, and preparing the input data for processing.\n", + "\n", + "## Purpose in Subset Selection\n", + "Configuration and data preparation are critical foundation steps that enable efficient subset selection. This notebook:\n", + "1. Sets up all parameters for data processing, encoding, templates, and system resources\n", + "2. Loads and inspects input data to ensure it's properly formatted\n", + "3. Establishes the DataProcessor class and utility functions used throughout the pipeline\n", + "4. 
Automatically detects and configures available GPU resources for parallel processing\n", + "\n", + "## Output\n", + "- **config**: ProcessingConfig object containing all pipeline parameters\n", + "- **dataset**: Loaded and validated HuggingFace dataset\n", + "- **processor**: DataProcessor instance ready for embedding generation\n", + "- Used in Notebook 2 for embedding generation and Notebook 3 for subset selection" + ] + }, + { + "cell_type": "markdown", + "id": "38278505", + "metadata": {}, + "source": [ + "## Introduction and Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8471a58", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the necessary libraries\n", + "%pip install datasets jinja2 tqdm h5py numpy torch transformers submodlib-py==0.0.3 nbformat" + ] + }, + { + "cell_type": "markdown", + "id": "86fc8a20", + "metadata": {}, + "source": [ + "## Imports and Logging" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7e2a22fb", + "metadata": {}, + "outputs": [], + "source": [ + "# Standard Imports\n", + "from dataclasses import dataclass, field\n", + "from typing import Any, Dict, List, TypedDict, TypeVar, Union\n", + "import logging\n", + "import os\n", + "import re\n", + "\n", + "# Third Party Imports\n", + "from datasets import load_dataset, concatenate_datasets\n", + "from jinja2 import BaseLoader, Environment\n", + "from tqdm import tqdm\n", + "import torch\n", + "import numpy as np\n", + "import warnings\n", + "\n", + "# Configure logging and warnings\n", + "logging.basicConfig(\n", + " format=\"%(asctime)s - %(levelname)s - %(name)s - %(message)s\",\n", + " datefmt=\"%Y-%m-%d %H:%M:%S\",\n", + " level=logging.INFO,\n", + ")\n", + "logger = logging.getLogger(__name__)\n", + "warnings.filterwarnings(\"ignore\", category=UserWarning)" + ] + }, + { + "cell_type": "markdown", + "id": "db36b1d1", + "metadata": {}, + "source": [ + "### Core Utility Functions\n", + "\n", + "These functions provide critical infrastructure for the embedding generation and subset selection pipeline:\n", + "\n", + "1. **`get_default_num_gpus(testing_mode)`**\n", + " - **Purpose**: Auto-detects GPUs for distributed embedding generation\n", + " - **Used in**: SystemConfig initialization, parallel encoding across GPUs\n", + " - **Behavior**: Returns number of available CUDA devices; falls back to CPU in testing mode\n", + " - **Error Handling**: Raises RuntimeError if no GPU found in production mode\n", + "\n", + "2. **`retry_on_exception(func)`**\n", + " - **Purpose**: Automatic retry with cleanup for transient GPU/computation errors\n", + " - **Used in**: Embedding generation and subset selection methods (Notebooks 2 & 3)\n", + " - **Handles**: GPU OOM errors, runtime errors, value/type/index errors\n", + " - **Recovery**: Cleans GPU memory and waits before retry (configurable delay)\n", + "\n", + "3. 
**`display_gpu_info()`**\n", + " - **Purpose**: Display detailed information about available GPU devices\n", + " - **Shows**: GPU count, device names, memory (total/allocated/free), current device\n", + " - **Useful for**: Debugging, resource planning, monitoring GPU usage\n", + " - **Returns**: Dictionary with GPU information" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f8fcef90", + "metadata": {}, + "outputs": [], + "source": [ + "from functools import wraps\n", + "import gc\n", + "import time\n", + "\n", + "def get_default_num_gpus(testing_mode: bool = False) -> int:\n", + " \"\"\"\n", + " Get the default number of GPUs based on available CUDA devices.\n", + " \n", + " Args:\n", + " testing_mode (bool): If True, allows CPU usage with warnings. For testing only.\n", + " \"\"\"\n", + " if not torch.cuda.is_available():\n", + " if testing_mode:\n", + " logger.warning(\n", + " \"No CUDA devices detected. Running in testing mode with CPU. \"\n", + " \"Production use requires GPU acceleration.\"\n", + " )\n", + " return 1\n", + " raise RuntimeError(\n", + " \"No CUDA devices detected. This functionality requires at least one GPU.\"\n", + " )\n", + " return torch.cuda.device_count()\n", + "\n", + "\n", + "def retry_on_exception(func):\n", + " \"\"\"\n", + " Decorator to retry a function upon exception up to a maximum number of retries.\n", + " \"\"\"\n", + " @wraps(func)\n", + " def wrapper(self, *args, **kwargs):\n", + " last_exception = None\n", + " for attempt in range(self.config.system.max_retries):\n", + " try:\n", + " return func(self, *args, **kwargs)\n", + " except torch.cuda.OutOfMemoryError as e:\n", + " last_exception = e\n", + " logger.error(f\"GPU out of memory on attempt {attempt + 1}: {str(e)}\")\n", + " except RuntimeError as e:\n", + " last_exception = e\n", + " logger.error(f\"PyTorch runtime error on attempt {attempt + 1}: {str(e)}\")\n", + " except ValueError as e:\n", + " last_exception = e\n", + " logger.error(f\"Value error on attempt {attempt + 1}: {str(e)}\")\n", + " except TypeError as e:\n", + " last_exception = e\n", + " logger.error(f\"Type error on attempt {attempt + 1}: {str(e)}\")\n", + " except IndexError as e:\n", + " last_exception = e\n", + " logger.error(f\"Index error on attempt {attempt + 1}: {str(e)}\")\n", + "\n", + " if attempt < self.config.system.max_retries - 1:\n", + " logger.info(f\"Retrying in {self.config.system.retry_delay} seconds...\")\n", + " time.sleep(self.config.system.retry_delay)\n", + " gc.collect()\n", + " torch.cuda.empty_cache()\n", + "\n", + " raise last_exception\n", + "\n", + " return wrapper\n", + "\n", + "def display_gpu_info():\n", + " \"\"\"\n", + " Display detailed information about available GPU devices.\n", + " \n", + " Returns:\n", + " dict: Dictionary containing GPU information\n", + " \"\"\"\n", + " gpu_info = {\n", + " 'cuda_available': torch.cuda.is_available(),\n", + " 'gpu_count': 0,\n", + " 'gpus': [],\n", + " 'current_device': None\n", + " }\n", + " \n", + " if not torch.cuda.is_available():\n", + " print(\"\\n\" + \"=\"*60)\n", + " print(\"šŸ–„ļø GPU Information\")\n", + " print(\"=\"*60)\n", + " print(\"āŒ CUDA is not available\")\n", + " print(\"šŸ’” Running on CPU\")\n", + " print(\"=\"*60 + \"\\n\")\n", + " return gpu_info\n", + " \n", + " gpu_info['gpu_count'] = torch.cuda.device_count()\n", + " gpu_info['current_device'] = torch.cuda.current_device()\n", + " \n", + " print(\"\\n\" + \"=\"*60)\n", + " print(\"šŸ–„ļø GPU Information\")\n", + " print(\"=\"*60)\n", + " 
print(f\"āœ… CUDA is available\")\n", + " print(f\"šŸ“Š Number of GPUs: {gpu_info['gpu_count']}\")\n", + " print(f\"šŸŽÆ Current GPU device: {gpu_info['current_device']}\")\n", + " print(\"=\"*60)\n", + " \n", + " for i in range(gpu_info['gpu_count']):\n", + " device_props = torch.cuda.get_device_properties(i)\n", + " total_memory = device_props.total_memory / 1024**3 # Convert to GB\n", + " allocated_memory = torch.cuda.memory_allocated(i) / 1024**3\n", + " reserved_memory = torch.cuda.memory_reserved(i) / 1024**3\n", + " free_memory = total_memory - reserved_memory\n", + " \n", + " gpu_data = {\n", + " 'id': i,\n", + " 'name': device_props.name,\n", + " 'total_memory_gb': round(total_memory, 2),\n", + " 'allocated_memory_gb': round(allocated_memory, 2),\n", + " 'reserved_memory_gb': round(reserved_memory, 2),\n", + " 'free_memory_gb': round(free_memory, 2),\n", + " 'compute_capability': f\"{device_props.major}.{device_props.minor}\",\n", + " 'multi_processor_count': device_props.multi_processor_count\n", + " }\n", + " gpu_info['gpus'].append(gpu_data)\n", + " \n", + " marker = \"šŸŽÆ\" if i == gpu_info['current_device'] else \" \"\n", + " print(f\"\\n{marker} GPU {i}: {device_props.name}\")\n", + " print(f\" • Compute Capability: {device_props.major}.{device_props.minor}\")\n", + " print(f\" • Multi-processors: {device_props.multi_processor_count}\")\n", + " print(f\" • Total Memory: {total_memory:.2f} GB\")\n", + " print(f\" • Allocated: {allocated_memory:.2f} GB\")\n", + " print(f\" • Reserved: {reserved_memory:.2f} GB\")\n", + " print(f\" • Free: {free_memory:.2f} GB ({(free_memory/total_memory)*100:.1f}%)\")\n", + " \n", + " print(\"=\"*60 + \"\\n\")\n", + " return gpu_info\n", + "\n", + "gpu_info = display_gpu_info()\n", + "print(\"āœ… Utility functions defined!\")" + ] + }, + { + "cell_type": "markdown", + "id": "c6a34f45", + "metadata": {}, + "source": [ + "## Configuration Classes" + ] + }, + { + "cell_type": "markdown", + "id": "1edd20f6", + "metadata": {}, + "source": [ + "### BasicConfig Class\n", + "\n", + "Defines basic processing parameters with validation and helpful metadata.\n", + "\n", + "**Key Parameters:**\n", + "- `output_dir`: Directory where results will be saved\n", + "- `batch_size`: Number of samples processed per batch (default: 100K for efficiency)\n", + "- `num_folds`: Number of folds for cross-validation in subset selection\n", + "- `combine_files`: Whether to merge multiple input files into one dataset\n", + "- `epsilon`: Optimization parameter for submodular facility location\n", + " - Default: 160.0 (optimized for datasets >100K samples)\n", + " - For smaller datasets: use values starting from 0.1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "235619ae", + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class BasicConfig:\n", + " \"\"\"Basic configuration parameters\"\"\"\n", + " output_dir: str = \"../../assets/subset-selection/outputs\" # change this to your desired output directory\n", + " batch_size: int = 100000\n", + " num_folds: int = 50\n", + " combine_files: bool = False\n", + " epsilon: float = field(\n", + " default=160.0,\n", + " metadata={\n", + " \"advanced\": True,\n", + " \"help\": \"Epsilon parameter for the LazierThanLazyGreedy optimizer in facility location maximization. \"\n", + " \"Default of 160.0 is optimized for datasets >100k samples. 
\"\n", + " \"For smaller datasets, consider using much smaller values (starting from 0.1).\",\n", + " },\n", + " )\n", + "\n", + " def __post_init__(self):\n", + " \"\"\"Validate configuration after initialization\"\"\"\n", + " if not 0 < self.epsilon <= 160:\n", + " raise ValueError(\"epsilon must be between 0 and 160\")\n", + "\n", + " def validate_epsilon_for_dataset_size(self, dataset_size: int)->None:\n", + " \"\"\"\n", + " Validate epsilon parameter based on dataset size and provide appropriate warnings.\n", + "\n", + " Args:\n", + " dataset_size (int): Size of the dataset being processed\n", + " \"\"\"\n", + " if dataset_size < 100000:\n", + " logger.warning(\n", + " \"Subset selection is highly recommended to be used only with dataset sizes over 100k samples. \"\n", + " f\"Your dataset has {dataset_size:,} samples.\"\n", + " )\n", + " if self.epsilon > 1.0:\n", + " logger.warning(\n", + " f\"Current epsilon value ({self.epsilon}) may be too high for a dataset of this size. \"\n", + " \"For smaller datasets, consider using much smaller values (starting from 0.1) \"\n", + " \"to ensure proper subset selection.\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "08b0602d", + "metadata": {}, + "source": [ + "### EncoderConfig Class\n", + "\n", + "Configures the embedding encoder. Separates encoder settings from other parameters for modularity.\n", + "\n", + "**Key Parameters:**\n", + "- `instruction`: Prompt prefix for the encoder to guide embedding generation\n", + "- `encoder_type`: Type of encoder to use (e.g., \"arctic\")\n", + "- `encoder_model`: Specific model identifier (e.g., \"Snowflake/snowflake-arctic-embed-l-v2.0\")\n", + "- `testing_mode`: If True, enables development features (CPU fallback, model auto-download)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ddf5de25", + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class EncoderConfig:\n", + " \"\"\"Encoder-specific configuration parameters.\"\"\"\n", + " instruction: str = field(\n", + " default=\"Generate embeddings that capture the core meaning of user-assistant conversations, ensuring the embeddings can be clustered based on semantic similarity for subset selection.\",\n", + " metadata={\"advanced\": True},\n", + " )\n", + " encoder_type: str = field(default=\"arctic\", metadata={\"advanced\": True})\n", + " encoder_model: str = field(\n", + " default=\"Snowflake/snowflake-arctic-embed-l-v2.0\", metadata={\"advanced\": True}\n", + " )\n", + " testing_mode: bool = False" + ] + }, + { + "cell_type": "markdown", + "id": "cf7e96ce", + "metadata": {}, + "source": [ + "### TemplateConfig Class\n", + "\n", + "Manages text formatting templates to enable flexible formatting for different data structures.\n", + "\n", + "**Key Parameters:**\n", + "- `template_name`: Active template to use (e.g., \"conversation\")\n", + "- `templates`: Dictionary of available templates with Jinja2 syntax\n", + " - `default`: Simple text passthrough\n", + " - `conversation`: Formats multi-turn dialogues (user/assistant)\n", + " - `qa`: Question-answer format\n", + "\n", + "**Usage**: Templates convert structured data (dicts/lists) into plain text for embedding generation." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1db28618", + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class TemplateConfig:\n", + " \"\"\"Template-related configuration parameters.\"\"\"\n", + " template_name: str = field(default=\"conversation\", metadata={\"advanced\": True})\n", + " templates: Dict[str, str] = field(\n", + " default_factory=lambda: {\n", + " \"default\": \"{{ text }}\",\n", + " \"conversation\": \"{% for msg in messages if msg.role != 'system' %}{{ msg.role }}: {{ msg.content }}\\n{% endfor %}\",\n", + " \"qa\": \"Question: {{ question }}\\nAnswer: {{ answer }}\",\n", + " },\n", + " metadata={\"advanced\": True},\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "c51acd8c", + "metadata": {}, + "source": [ + "### SystemConfig Class\n", + "\n", + "Manages system-level configuration for handling resources and error recovery.\n", + "\n", + "**Key Parameters:**\n", + "- `num_gpus`: Auto-detects available GPUs (set automatically in `__post_init__`)\n", + "- `seed`: Random seed for reproducibility (default: 42)\n", + "- `max_retries`: Number of retry attempts for failed operations (default: 3)\n", + "- `retry_delay`: Seconds to wait between retries (default: 30)\n", + "- `testing_mode`: Enables testing features (CPU fallback, reduced validation)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c409d71", + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class SystemConfig:\n", + " \"\"\"System-related configuration parameters.\"\"\"\n", + " num_gpus: int = field(init=False)\n", + " seed: int = field(default=42, metadata={\"advanced\": True})\n", + " max_retries: int = field(default=3, metadata={\"advanced\": True})\n", + " retry_delay: int = field(default=30, metadata={\"advanced\": True})\n", + " testing_mode: bool = field(default=False, metadata={\"advanced\": True})\n", + "\n", + " def __post_init__(self):\n", + " \"\"\"Initialize num_gpus after other fields are set.\"\"\"\n", + " self.num_gpus = get_default_num_gpus(testing_mode=self.testing_mode)" + ] + }, + { + "cell_type": "markdown", + "id": "a28c824d", + "metadata": {}, + "source": [ + "### ProcessingConfig Class\n", + "\n", + "Main configuration class that combines all other configurations. Provides a single point of configuration with comprehensive validation.\n", + "\n", + "**Required Parameters:**\n", + "- `input_files`: List of input file paths to process (JSONL format)\n", + "- `subset_sizes`: List of target subset sizes\n", + " - Use **floats** (0-1) for percentages: `[0.1, 0.05]` = 10% and 5%\n", + " - Use **integers** for absolute counts: `[1000, 500]` = 1000 and 500 samples\n", + "\n", + "**Configuration Groups:**\n", + "- `basic`: Basic processing parameters (output dir, batch size, epsilon)\n", + "- `encoder`: Encoder-specific parameters (model, instruction, testing mode)\n", + "- `template`: Template-related parameters (template name, Jinja2 templates)\n", + "- `system`: System-related parameters (GPUs, seed, retries)\n", + "\n", + "**Validation**: Automatically validates subset sizes and parameter ranges in `__post_init__`." 
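+ , + "\n", + "\n", + "For example (hypothetical input path; the sizes follow the rules above):\n", + "\n", + "\`\`\`python\n", + "config = ProcessingConfig(\n", + "    input_files=[\"data/train.jsonl\"],  # hypothetical path\n", + "    subset_sizes=[0.1, 500],  # 10% of the dataset plus an absolute 500 samples\n", + ")\n", + "\`\`\`"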
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1bb6e8a1", + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class ProcessingConfig:\n", + "    \"\"\"Configuration for subset selection with basic and advanced parameters.\"\"\"\n", + "    # Required parameters\n", + "    input_files: List[str]\n", + "    subset_sizes: List[Union[int, float]]\n", + "\n", + "    # Configuration groups\n", + "    basic: BasicConfig = field(default_factory=BasicConfig)\n", + "    encoder: EncoderConfig = field(default_factory=EncoderConfig)\n", + "    template: TemplateConfig = field(default_factory=TemplateConfig)\n", + "    system: SystemConfig = field(default_factory=SystemConfig)\n", + "\n", + "    def __post_init__(self):\n", + "        \"\"\"Validate configuration after initialization.\"\"\"\n", + "        if not isinstance(self.subset_sizes, list):\n", + "            raise ValueError(\"subset_sizes must be a list\")\n", + "\n", + "        for size in self.subset_sizes:\n", + "            if not isinstance(size, (int, float)):\n", + "                raise ValueError(\"subset_sizes must contain only integers or floats\")\n", + "            if isinstance(size, float) and not 0 < size <= 1:\n", + "                raise ValueError(\n", + "                    \"Percentage values in subset_sizes must be between 0 (exclusive) and 1 (inclusive)\"\n", + "                )\n", + "            if isinstance(size, int) and size <= 0:\n", + "                raise ValueError(\"Absolute values in subset_sizes must be positive\")" + ] + }, + { + "cell_type": "markdown", + "id": "5fc14927", + "metadata": {}, + "source": [ + "## Data Processor Class\n", + "\n", + "Main processing class that handles the complete data preparation pipeline with proper error handling.\n", + "\n", + "**Core Responsibilities:**\n", + "- **Data Loading**: Loads and optionally combines multiple datasets from files\n", + "- **Text Formatting**: Applies Jinja2 templates to convert structured data to text\n", + "- **Subset Calculations**: Converts percentage/absolute subset sizes to actual sample counts\n", + "- **Configuration Management**: Maintains all configuration and processing state\n", + "- **Device Management**: Sets up GPU/CPU devices and random seeds" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cd4d5185", + "metadata": {}, + "outputs": [], + "source": [ + "class DataProcessor:\n", + "    \"\"\"\n", + "    Enhanced data processor with support for combined files and multiple selection methods.\n", + "    \n", + "    This class handles the complete pipeline:\n", + "    - Data loading and formatting\n", + "    - Embedding generation (to be implemented in Notebook 2)\n", + "    - Subset selection (to be implemented in Notebook 3)\n", + "    - Result export\n", + "    \"\"\"\n", + "\n", + "    def __init__(self, config: ProcessingConfig):\n", + "        \"\"\"\n", + "        Initializes the DataProcessor with the given configuration.\n", + "\n", + "        Args:\n", + "            config (ProcessingConfig): The processing configuration.\n", + "        \"\"\"\n", + "        self.config = config\n", + "        self.env = Environment(loader=BaseLoader())\n", + "        self.templates = {\n", + "            k: self.env.from_string(v) for k, v in config.template.templates.items()\n", + "        }\n", + "        self.device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", + "\n", + "        # Set random seeds\n", + "        np.random.seed(config.system.seed)\n", + "        torch.manual_seed(config.system.seed)\n", + "        if torch.cuda.is_available():\n", + "            torch.cuda.manual_seed_all(config.system.seed)\n", + "\n", + "    def format_text(self, example: Dict[str, Any], format_type: str) -> str:\n", + "        \"\"\"\n", + "        Formats the text of an example using the 
specified template.\n", + "\n", + " Args:\n", + " example (Dict[str, Any]): The data example to format.\n", + " format_type (str): The key of the template to use.\n", + "\n", + " Returns:\n", + " str: The formatted text.\n", + " \"\"\"\n", + " template = self.templates.get(format_type)\n", + " if not template:\n", + " raise ValueError(f\"Unknown format type: {format_type}\")\n", + " return template.render(**example)\n", + "\n", + " def load_and_combine_datasets(self, input_files: List[str]):\n", + " \"\"\"\n", + " Load and optionally combine multiple datasets.\n", + "\n", + " Args:\n", + " input_files (List[str]): List of input file paths.\n", + "\n", + " Returns:\n", + " Combined dataset or list of individual datasets.\n", + " \"\"\"\n", + " datasets = []\n", + "\n", + " for input_file in input_files:\n", + " file_extension = input_file.split(\".\")[-1]\n", + " if file_extension == \"jsonl\":\n", + " file_extension = \"json\"\n", + " dataset = load_dataset(\n", + " file_extension, data_files=input_file, split=\"train\", cache_dir=None\n", + " )\n", + " datasets.append(dataset)\n", + "\n", + " if self.config.basic.combine_files:\n", + " logger.info(\"Combining datasets...\")\n", + " return concatenate_datasets(datasets)\n", + "\n", + " if len(datasets) > 1:\n", + " raise ValueError(\n", + " \"Multiple datasets provided but combine_files is not enabled\"\n", + " )\n", + " return datasets[0]\n", + "\n", + " def calculate_subset_size(self, total_samples: int, size_spec: Union[int, float]) -> int:\n", + " \"\"\"\n", + " Calculate the actual subset size based on the specification.\n", + " \n", + " Args:\n", + " total_samples (int): Total number of samples in the dataset.\n", + " size_spec (Union[int, float]): Size specification (percentage if float, absolute if int).\n", + "\n", + " Returns:\n", + " int: Actual number of samples to select.\n", + " \"\"\"\n", + " if isinstance(size_spec, float):\n", + " # Handle percentage (0.1 = 10%, 0.05 = 5%)\n", + " if size_spec <= 0 or size_spec > 1:\n", + " raise ValueError(\n", + " \"Percentage values must be between 0(non-inclusive) and 1(inclusive)\"\n", + " )\n", + " return max(1, int(size_spec * total_samples))\n", + " # Treat as absolute number\n", + " return min(size_spec, total_samples)\n", + "\n", + " def get_subset_name(self, size_spec: Union[int, float], actual_size: int) -> str:\n", + " \"\"\"\n", + " Generate appropriate subset name based on selection method.\n", + "\n", + " Args:\n", + " size_spec (Union[int, float]): Original size specification.\n", + " actual_size (int): Actual number of samples selected.\n", + "\n", + " Returns:\n", + " str: Descriptive name for the subset.\n", + " \"\"\"\n", + " if isinstance(size_spec, float):\n", + " # Use :g format to automatically remove trailing zeros\n", + " # 0.1 -> \"0.1\", 0.05 -> \"0.05\", 0.10 -> \"0.1\"\n", + " return f\"percent_{size_spec:g}\"\n", + " return f\"samples_{actual_size}\"\n", + "\n", + " def get_dataset_name(self, input_file: str) -> str:\n", + " \"\"\"\n", + " Get a clean dataset name from the input file path.\n", + "\n", + " Args:\n", + " input_file (str): Input file path\n", + "\n", + " Returns:\n", + " str: Clean dataset name\n", + " \"\"\"\n", + " base_name = os.path.splitext(os.path.basename(input_file))[0]\n", + " clean_name = re.sub(r\"[^\\w\\-_]\", \"_\", base_name)\n", + " return clean_name\n", + "\n", + "print(\"āœ… DataProcessor class defined successfully with all utility functions!\")" + ] + }, + { + "cell_type": "markdown", + "id": "53b01ba3", + "metadata": {}, + 
"source": [ + "## Configuration Setup\n", + "\n", + "Create a complete configuration for the subset selection pipeline.\n", + "\n", + "**Key Configuration Decisions:**\n", + "\n", + "1. **Input Files**: Update `input_files` path to your JSONL dataset\n", + " - Expected format: One JSON object per line with `messages` field\n", + " \n", + "2. **Subset Sizes**: `[0.1, 0.05]` creates two subsets (10% and 5% of original data)\n", + " - Use floats (0-1) for percentages of the dataset\n", + " - Use integers for absolute sample counts (e.g., `[1000, 500]`)\n", + " \n", + "3. **Batch Size**: 100,000 samples per batch balances memory usage and processing speed\n", + " - Reduce if encountering OOM errors\n", + " \n", + "4. **Epsilon**: 160.0 optimized for large datasets (>100K samples)\n", + " - Controls the trade-off between quality and speed in facility location\n", + " - For smaller datasets (<100K), use values starting from 0.1\n", + " \n", + "5. **Testing Mode**: Enabled for development (allows CPU, auto-downloads models)\n", + " - Disable for production use with pre-downloaded models and GPUs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e25a7e4a", + "metadata": {}, + "outputs": [], + "source": [ + "config = ProcessingConfig(\n", + " input_files=[\"../../assets/subset-selection/combined_cut_50x.jsonl\"], # Update with your data path\n", + " subset_sizes=[0.1, 0.05], # 10% and 5% subsets\n", + " basic=BasicConfig(\n", + " output_dir=\"../../assets/subset-selection/outputs\", # Change to your desired directory (used by Notebooks 2 & 3)\n", + " batch_size=100000,\n", + " num_folds=25,\n", + " epsilon=160.0,\n", + " combine_files=False\n", + " ),\n", + " encoder=EncoderConfig(\n", + " encoder_type=\"arctic\",\n", + " encoder_model=\"Snowflake/snowflake-arctic-embed-l-v2.0\",\n", + " testing_mode=True # Enable for notebook development\n", + " ),\n", + " template=TemplateConfig(\n", + " template_name=\"conversation\",\n", + " templates={\n", + " \"default\": \"{{ text }}\",\n", + " \"conversation\": \"{% for msg in messages if msg.role != 'system' %}{{ msg.role }}: {{ msg.content }}\\n{% endfor %}\",\n", + " \"qa\": \"Question: {{ question }}\\nAnswer: {{ answer }}\",\n", + " }\n", + " ),\n", + " system=SystemConfig(\n", + " seed=42,\n", + " testing_mode=True,\n", + " max_retries=3,\n", + " retry_delay=30\n", + " )\n", + ")\n", + "\n", + "print(\"===== Configuration created successfully! 
=====\")\n", + "print(f\"Input files: {config.input_files}\")\n", + "print(f\"Subset sizes: {config.subset_sizes}\")\n", + "print(f\"Output directory: {config.basic.output_dir}\")\n", + "print(f\"Encoder type: {config.encoder.encoder_type}\")\n", + "print(f\"Template name: {config.template.template_name}\")\n", + "print(f\"Number of GPUs: {config.system.num_gpus}\")\n", + "print(f\"Testing mode: {config.system.testing_mode}\")" + ] + }, + { + "cell_type": "markdown", + "id": "b701330f", + "metadata": {}, + "source": [ + "## Data Loading and Validate input data\n", + "- Checks if data file exists\n", + "- Loads dataset using HuggingFace datasets\n", + "- Shows dataset statistics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9574d84f", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"šŸ“ Data Loading\")\n", + "print(\"=\" * 50)\n", + "\n", + "# Check if data file exists\n", + "data_file = config.input_files[0]\n", + "if not os.path.exists(data_file):\n", + " print(f\"āŒ Data file not found: {data_file}\")\n", + " print(\"Please update the data_file path in the configuration above\")\n", + " print(\"Example data file structure:\")\n", + " print(\"\"\"\n", + " [\n", + " {\n", + " \"messages\": [\n", + " {\"role\": \"user\", \"content\": \"Hello, how are you?\"},\n", + " {\"role\": \"assistant\", \"content\": \"I'm doing well, thank you!\"}\n", + " ]\n", + " },\n", + " ...\n", + " ]\n", + " \"\"\")\n", + "else:\n", + " print(f\"āœ… Found data file: {data_file}\")\n", + " \n", + " # Load the dataset\n", + " try:\n", + " dataset = load_dataset(\"json\", data_files=data_file, split=\"train\", cache_dir=None)\n", + " print(f\"āœ… Dataset loaded successfully!\")\n", + " print(f\"šŸ“Š Dataset size: {len(dataset):,} samples\")\n", + " \n", + " # Show file size\n", + " file_size = os.path.getsize(data_file) / 1024**2 # MB\n", + " print(f\"šŸ“ File size: {file_size:.1f} MB\")\n", + " \n", + " except Exception as e:\n", + " print(f\"āŒ Error loading dataset: {e}\")\n", + " dataset = None" + ] + }, + { + "cell_type": "markdown", + "id": "00e32d1b", + "metadata": {}, + "source": [ + "## Data Inspection - show data structure\n", + "It analyzes loaded data structure and quality\n", + "- Displays sample data\n", + "- Analyzes column structure\n", + "- Counts messages and roles\n", + "- validates epsilon for dataset size" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "341d7efc", + "metadata": {}, + "outputs": [], + "source": [ + "if dataset is not None:\n", + " print(\"šŸ” Data Inspection\")\n", + " print(\"=\" * 50)\n", + " \n", + " # Show sample data\n", + " print(\"šŸ“‹ Sample data:\")\n", + " for i in range(min(3, len(dataset))):\n", + " print(f\"\\nSample {i+1}:\")\n", + " sample = dataset[i]\n", + " for key, value in sample.items():\n", + " if isinstance(value, str) and len(value) > 100:\n", + " print(f\" {key}: {value[:100]}...\")\n", + " else:\n", + " print(f\" {key}: {value}\")\n", + " \n", + " # Analyze data structure\n", + " print(f\"\\nšŸ“Š Data structure analysis:\")\n", + " print(f\" Number of samples: {len(dataset):,}\")\n", + " \n", + " # Check column names\n", + " if hasattr(dataset, 'column_names'):\n", + " print(f\" Column names: {dataset.column_names}\")\n", + " \n", + " # Analyze message structure if it exists\n", + " if 'messages' in dataset.column_names:\n", + " message_lengths = []\n", + " role_counts = {'user': 0, 'assistant': 0, 'system': 0}\n", + " \n", + " for sample in dataset.select(range(min(1000, 
len(dataset)))):\n", + " if 'messages' in sample:\n", + " message_lengths.append(len(sample['messages']))\n", + " for msg in sample['messages']:\n", + " if 'role' in msg:\n", + " role_counts[msg['role']] += 1\n", + " \n", + " print(f\"Average messages per conversation: {np.mean(message_lengths):.1f}\")\n", + " print(f\"Role distribution: {role_counts}\")\n", + " \n", + " # Validate epsilon for dataset size\n", + " config.basic.validate_epsilon_for_dataset_size(len(dataset))\n", + " \n", + "else:\n", + " print(\"āŒ No dataset loaded. Please fix the data loading issue above.\")" + ] + }, + { + "cell_type": "markdown", + "id": "summary_next_steps", + "metadata": {}, + "source": [ + "### šŸŽÆ Next Steps\n", + "\n", + "**Proceed to Notebook 2: Embedding Generation**\n", + "\n", + "1. Open `embedding_generation.ipynb`\n", + "2. Run all cells sequentially\n", + "3. The notebook will:\n", + " - Import all objects from this notebook using `%run`\n", + " - Add `generate_embeddings()` method to `DataProcessor`\n", + " - Generate embeddings using Arctic encoder on GPU(s)\n", + " - Save embeddings to `{output_dir}/embeddings/embeddings.h5`" + ] + }, + { + "cell_type": "markdown", + "id": "684b41dd", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.23" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/use-cases/subset-selection/embedding_generation.ipynb b/notebooks/use-cases/subset-selection/embedding_generation.ipynb new file mode 100644 index 0000000..a8dc1a2 --- /dev/null +++ b/notebooks/use-cases/subset-selection/embedding_generation.ipynb @@ -0,0 +1,850 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "29b190f0", + "metadata": {}, + "source": [ + "# Subset Selection Notebook 2: Embedding Generation\n", + "\n", + "## Overview\n", + "This notebook is the second step in the **Subset Selection Pipeline**. It focuses on generating high-quality embeddings from the formatted text data prepared in Notebook 1.\n", + "\n", + "## Purpose in Subset Selection\n", + "Embeddings are vector representations of text that capture semantic meaning. In subset selection, we need embeddings to:\n", + "1. **Measure Similarity**: Calculate how similar different data samples are to each other\n", + "2. **Enable Facility Location**: The subset selection algorithm uses embeddings to identify diverse, representative samples\n", + "3. 
**Preserve Semantic Information**: Ensure selected subsets maintain the semantic diversity of the original dataset\n", + "\n", + "## Output\n", + "- **embeddings.h5**: Merged embedding file containing vector representations of all samples\n", + "- Used in Notebook 3 for subset selection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96f415d5", + "metadata": {}, + "outputs": [], + "source": [ + "# Import from Notebook 1 using %run magic command\n", + "# This executes the entire Notebook 1 in the current namespace\n", + "%run \"data_preparation_and_config.ipynb\"\n", + "\n", + "# Additional imports needed for embedding generation\n", + "from multiprocessing import Pool\n", + "from typing import Optional\n", + "import logging\n", + "\n", + "# Third Party Imports (not in Notebook 1)\n", + "import h5py\n", + "from transformers import AutoModel, AutoTokenizer\n", + "import torch.nn.functional as F\n", + "from dataclasses import dataclass, field\n", + "from typing import TypedDict, Union, List, Dict, Optional, Any\n", + "\n", + "# Set up logger for this notebook\n", + "logger = logging.getLogger(__name__)\n", + "\n", + "print(\"āœ… Successfully imported from Notebook 1!\")\n", + "print(f\" • config: {type(config).__name__ if 'config' in locals() else 'Not defined'}\")\n", + "print(f\" • dataset: {len(dataset) if 'dataset' in locals() and dataset else 'None'} samples\")\n", + "\n", + "print(\"\\nšŸ“¦ Notebook 2 Components Loading...\")\n", + "print(\"=\" * 60)\n" + ] + }, + { + "cell_type": "markdown", + "id": "d922c214", + "metadata": {}, + "source": [ + "### Arctic Encoder Model Configuration\n", + "Defines the configuration structure and settings for the Snowflake Arctic embedding model.\n", + "**Classes Defined:**\n", + "1. **`ModelConfig`** (TypedDict): Schema for model-specific settings\n", + "2. **`ArcticEncoderConfig`** (Dataclass): Internal configuration for the encoder instance\n", + "3. 
**`MODEL_CONFIGS`** (Dict): Pre-configured settings for supported models\n", + "\n", + "**Arctic Model Settings:**\n", + "- **Pooling Method**: CLS token (first token of sequence)\n", + "- **Normalization**: L2-normalized embeddings for cosine similarity\n", + "- **Max Length**: 4096 tokens (handles long documents)\n", + "- **Default Instruction**: \"Retrieve relevant passages:\"\n", + "- **Batch Size**: 24 samples per batch\n", + " - Optimized for GPU memory on typical setups (8-24GB VRAM)\n", + " - With 4096 max length and L-v2.0 model, fits comfortably in memory\n", + " - Reduce to 12-16 if encountering OOM errors\n", + " - Can increase to 32-48 on high-memory GPUs (A100, H100)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4fc44c8b", + "metadata": {}, + "outputs": [], + "source": [ + "# Model configuration \n", + "class ModelConfig(TypedDict):\n", + " pooling_method: str\n", + " normalize_embeddings: bool\n", + " max_length: int\n", + " default_instruction: str\n", + " batch_size: int\n", + "\n", + "@dataclass\n", + "class ArcticEncoderConfig:\n", + " \"\"\"Encoder configuration for Arctic model.\"\"\"\n", + " model_name: str\n", + " model_config: ModelConfig\n", + " device: torch.device\n", + " num_gpus: int\n", + " batch_size: int\n", + " use_default_instruction: bool\n", + " use_fp16: bool\n", + " testing_mode: bool = False\n", + "\n", + "# Model configurations \n", + "MODEL_CONFIGS: Dict[str, ModelConfig] = {\n", + " \"Snowflake/snowflake-arctic-embed-l-v2.0\": {\n", + " \"pooling_method\": \"cls\",\n", + " \"normalize_embeddings\": True,\n", + " \"max_length\": 4096,\n", + " \"default_instruction\": \"Retrieve relevant passages:\",\n", + " \"batch_size\": 24,\n", + " }\n", + "}" + ] + }, + { + "cell_type": "markdown", + "id": "f9b412ba", + "metadata": {}, + "source": [ + "### ArcticEmbedEncoder Implementation\n", + "\n", + "Implements the complete Arctic embedding encoder with the following capabilities:\n", + "\n", + "**Encoder Registry:**\n", + "- `ENCODER_REGISTRY`: Maps encoder types to classes\n", + "- `get_encoder_class()`: Factory function to get encoder by type\n", + "\n", + "**Design**: Each encoder instance runs on a single GPU for parallel processing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24fa8884", + "metadata": {}, + "outputs": [], + "source": [ + "class ArcticEmbedEncoder:\n", + " \"\"\"\n", + " Arctic embedding encoder for generating high-quality text embeddings.\n", + " \"\"\"\n", + " \n", + " def __init__(\n", + " self,\n", + " model_name: str = \"Snowflake/snowflake-arctic-embed-l-v2.0\",\n", + " device: Optional[torch.device] = None,\n", + " use_fp16: bool = False,\n", + " use_default_instruction: bool = True,\n", + " testing_mode: bool = False,\n", + " ) -> None:\n", + " \"\"\"Initializes encoder with specified model\n", + " Sets up GPU device\n", + " Creates configuration\n", + " Calls _initialize_model()\n", + " \"\"\"\n", + " if model_name not in MODEL_CONFIGS:\n", + " raise ValueError(\n", + " f\"Model {model_name} not supported. 
Supported models: {list(MODEL_CONFIGS.keys())}\"\n", + " )\n", + "\n", + " # Use the provided device or default to CUDA\n", + " self.device = device or torch.device(\n", + " \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + " )\n", + "\n", + " # Get device ID for logging\n", + " self.device_id = self.device.index if hasattr(self.device, \"index\") else 0\n", + "\n", + " # Configuration\n", + " self.cfg = ArcticEncoderConfig(\n", + " model_name=model_name,\n", + " model_config=MODEL_CONFIGS[model_name],\n", + " device=self.device,\n", + " num_gpus=1, # Only use 1 GPU per encoder instance\n", + " batch_size=MODEL_CONFIGS[model_name][\"batch_size\"],\n", + " use_default_instruction=use_default_instruction,\n", + " use_fp16=use_fp16,\n", + " testing_mode=testing_mode,\n", + " )\n", + "\n", + " self._initialize_model()\n", + "\n", + " def _initialize_model(self) -> None:\n", + " \"\"\"Loads tokenizer and model from local cache or HuggingFace\n", + " In testing mode: downloads from HuggingFace\n", + " In production: requires pre-downloaded model\n", + " Moves model to GPU and sets to evaluation mode\n", + " \"\"\"\n", + " home_dir = os.path.expanduser(\"~\")\n", + " model_path = os.path.join(\n", + " home_dir, \".cache\", \"instructlab\", \"models\", self.cfg.model_name\n", + " )\n", + "\n", + " # In testing mode, allow direct download from HuggingFace\n", + " if hasattr(self.cfg, \"testing_mode\") and self.cfg.testing_mode:\n", + " logger.warning(\n", + " f\"Model not found locally at {model_path}. \"\n", + " \"Testing mode enabled - downloading from HuggingFace...\"\n", + " )\n", + " self.tokenizer = AutoTokenizer.from_pretrained(self.cfg.model_name)\n", + " self.model = AutoModel.from_pretrained(\n", + " self.cfg.model_name,\n", + " add_pooling_layer=False,\n", + " trust_remote_code=True,\n", + " )\n", + " else:\n", + " if not os.path.exists(model_path):\n", + " raise ValueError(\n", + " f\"Model not found in available models: {self.cfg.model_name}\\n\"\n", + " \"Please run `ilab model download` and download the necessary model\"\n", + " )\n", + "\n", + " self.tokenizer = AutoTokenizer.from_pretrained(model_path)\n", + " self.model = AutoModel.from_pretrained(\n", + " model_path,\n", + " add_pooling_layer=False,\n", + " trust_remote_code=True,\n", + " local_files_only=True,\n", + " )\n", + "\n", + " if self.cfg.use_fp16:\n", + " self.model = self.model.half()\n", + "\n", + " self.model = self.model.to(self.cfg.device)\n", + " logger.info(f\"Model loaded on device: {self.cfg.device}\")\n", + "\n", + " # Set model to evaluation mode\n", + " self.model.eval()\n", + "\n", + " def _prepare_inputs(\n", + " self, texts: Union[str, List[str]], instruction: str = \"\"\n", + " ) -> List[str]:\n", + " \"\"\"Adds instruction prefix to texts\n", + " Ensures instruction is always present\n", + " Formats inputs for the model\n", + " \"\"\"\n", + " if isinstance(texts, str):\n", + " texts = [texts]\n", + "\n", + " # Ensure we always have an instruction\n", + " if not instruction and not self.cfg.use_default_instruction:\n", + " raise ValueError(\n", + " \"An instruction must be provided when use_default_instruction is False. 
\"\n", + " \"Either provide an instruction or set use_default_instruction to True.\"\n", + " )\n", + "\n", + " if (\n", + " not instruction\n", + " and self.cfg.use_default_instruction\n", + " and self.cfg.model_config[\"default_instruction\"]\n", + " ):\n", + " instruction = str(self.cfg.model_config[\"default_instruction\"])\n", + "\n", + " if not instruction: # catch if default_instruction is empty\n", + " raise ValueError(\n", + " \"No instruction available. Either provide an instruction or ensure \"\n", + " \"the model config has a valid default_instruction.\"\n", + " )\n", + "\n", + " texts = [f\"{instruction}: {text}\" for text in texts]\n", + " return texts\n", + "\n", + " @torch.no_grad()\n", + " def encode(\n", + " self,\n", + " inputs: Union[str, List[str]],\n", + " instruction: str = \"\",\n", + " return_tensors: bool = True,\n", + " show_progress: bool = True,\n", + " ) -> Union[torch.Tensor, np.ndarray]:\n", + " \"\"\"Main method to generate embeddings\n", + " Tokenizes input texts\n", + " Processes in batches\n", + " Applies CLS pooling and L2 normalization\n", + " Returns PyTorch tensors or numpy arrays\n", + " \"\"\"\n", + " input_was_string = isinstance(inputs, str)\n", + " inputs = self._prepare_inputs(inputs, instruction)\n", + "\n", + " encodings = self.tokenizer(\n", + " inputs,\n", + " max_length=self.cfg.model_config[\"max_length\"],\n", + " padding=True,\n", + " truncation=True,\n", + " return_tensors=\"pt\",\n", + " ).to(self.cfg.device)\n", + "\n", + " embeddings_list = []\n", + " for i in tqdm(\n", + " range(0, len(inputs), self.cfg.batch_size),\n", + " disable=not show_progress or len(inputs) < 256,\n", + " desc=f\"Encoding on {self.device}\",\n", + " ):\n", + " batch = {k: v[i : i + self.cfg.batch_size] for k, v in encodings.items()}\n", + " outputs = self.model(**batch)\n", + " # Take the first token embedding (CLS) and normalize it\n", + " embeddings = F.normalize(outputs.last_hidden_state[:, 0], p=2, dim=1)\n", + " embeddings_list.append(embeddings.cpu())\n", + "\n", + " embeddings = torch.cat(embeddings_list, dim=0)\n", + " if input_was_string:\n", + " embeddings = embeddings[0]\n", + "\n", + " return embeddings if return_tensors else embeddings.numpy()\n", + "\n", + "\n", + "# Encoder Registry\n", + "ENCODER_REGISTRY = {\n", + " \"arctic\": ArcticEmbedEncoder,\n", + "}\n", + "\n", + "def get_encoder_class(encoder_type: str):\n", + " \"\"\"Get the encoder class based on the encoder type.\"\"\"\n", + " try:\n", + " if encoder_type not in ENCODER_REGISTRY:\n", + " supported_encoders = list(ENCODER_REGISTRY.keys())\n", + " raise ValueError(\n", + " f\"Unsupported encoder type: '{encoder_type}'. 
\"\n", + " f\"Supported types are: {supported_encoders}\"\n", + " )\n", + " return ENCODER_REGISTRY[encoder_type]\n", + " except Exception as e:\n", + " raise ValueError(f\"Error getting encoder class: {str(e)}\") from e\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "7be02656", + "metadata": {}, + "source": [ + "### Pairwise Similarity Computation\n", + "\n", + "Defines the `compute_pairwise_dense()` function for calculating similarities between embeddings.\n", + "\n", + "**What This Function Does:**\n", + "- Computes pairwise metrics (cosine, euclidean, RBF) between two sets of vectors\n", + "- Processes in batches to avoid GPU memory overflow\n", + "- Supports multiple similarity metrics\n", + "- Applies optional scaling (min-max or additive)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4e9556da", + "metadata": {}, + "outputs": [], + "source": [ + "def compute_pairwise_dense(\n", + " tensor1: torch.Tensor,\n", + " tensor2: Optional[torch.Tensor] = None,\n", + " batch_size: int = 10000,\n", + " metric: str = \"cosine\",\n", + " device: Optional[Union[str, torch.device]] = None,\n", + " scaling: Optional[str] = None,\n", + " kw: float = 0.1,\n", + ") -> torch.Tensor:\n", + " \"\"\"\n", + " Compute pairwise metric in batches between two sets of vectors.\n", + " This function is needed for similarity computation in subset selection (Notebook 3).\n", + " - `tensor1`, `tensor2`: Input embedding tensors\n", + " - `batch_size`: Size of processing batches (default: 10K)\n", + " - `metric`: Similarity metric (\"cosine\", \"euclidean\", \"rbf\", \"dot\")\n", + " - `device`: GPU device to use\n", + " - `scaling`: Optional scaling (\"min-max\", \"additive\", None)\n", + " \"\"\"\n", + " assert batch_size > 0, \"Batch size must be positive.\"\n", + "\n", + " if not device:\n", + " device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", + "\n", + " if tensor2 is None:\n", + " tensor2 = tensor1\n", + "\n", + " tensor1, tensor2 = tensor1.to(device), tensor2.to(device)\n", + " n_samples1, n_samples2 = tensor1.size(0), tensor2.size(0)\n", + " results = torch.zeros(n_samples1, n_samples2, device=\"cpu\")\n", + "\n", + " if metric == \"cosine\":\n", + " tensor1, tensor2 = (\n", + " F.normalize(tensor1, p=2, dim=1),\n", + " F.normalize(tensor2, p=2, dim=1),\n", + " )\n", + "\n", + " def calculate_metric(a: torch.Tensor, b: torch.Tensor, metric: str, kw: float) -> torch.Tensor:\n", + " if metric in [\"cosine\", \"dot\"]:\n", + " return torch.mm(a, b.T)\n", + " if metric == \"euclidean\":\n", + " distances = torch.cdist(a, b, p=2)\n", + " similarities = 1 / (1 + distances**2)\n", + " return similarities\n", + " if metric == \"rbf\":\n", + " distance = torch.cdist(a, b)\n", + " squared_distance = distance**2\n", + " avg_dist = torch.mean(squared_distance)\n", + " torch.div(squared_distance, kw * avg_dist, out=squared_distance)\n", + " torch.exp(-squared_distance, out=squared_distance)\n", + " return squared_distance\n", + " raise ValueError(f\"Unknown metric: {metric}\")\n", + "\n", + " for i in range(0, n_samples1, batch_size):\n", + " end_i = min(i + batch_size, n_samples1)\n", + " rows = tensor1[i:end_i]\n", + "\n", + " for j in range(0, n_samples2, batch_size):\n", + " end_j = min(j + batch_size, n_samples2)\n", + " cols = tensor2[j:end_j]\n", + " batch_results = calculate_metric(rows, cols, metric, kw).cpu()\n", + " results[i:end_i, j:end_j] = batch_results\n", + "\n", + " if scaling == \"min-max\":\n", + " min_val, max_val = results.min(), 
results.max()\n", + "        if max_val != min_val:\n", + "            results = (results - min_val) / (max_val - min_val)\n", + "    elif scaling == \"additive\":\n", + "        results = (results + 1) / 2\n", + "\n", + "    return results" + ] + }, + { + "cell_type": "markdown", + "id": "47d15888", + "metadata": {}, + "source": [ + "### Multi-GPU Embedding Generation Functions\n", + "Implements parallel embedding generation across multiple GPUs.\n", + "\n", + "**Workflow:** Dataset → split into N shards (one per GPU) → encode the shards in parallel → merge the per-shard files into a single \`embeddings.h5\`. With one GPU, the whole dataset is processed as a single shard." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "77e0ade2", + "metadata": {}, + "outputs": [], + "source": [ + "def _process_dataset_shard(args):\n", + "    \"\"\"\n", + "    Processes one shard of data on a specific GPU\n", + "    - Creates encoder instance on assigned GPU\n", + "    - Applies templates to format text\n", + "    - Generates embeddings in batches\n", + "    - Saves embeddings to shard-specific HDF5 file\n", + "    - Includes progress bar for monitoring\n", + "    \"\"\"\n", + "    (\n", + "        gpu_id,\n", + "        dataset_shard,\n", + "        output_dir,\n", + "        encoder_type,\n", + "        encoder_model,\n", + "        instruction,\n", + "        template_name,\n", + "        templates,\n", + "        batch_size,\n", + "        testing_mode,\n", + "    ) = args\n", + "\n", + "    try:\n", + "        # Set the GPU for this process\n", + "        if torch.cuda.is_available():\n", + "            torch.cuda.set_device(gpu_id)\n", + "            device = f\"cuda:{gpu_id}\"\n", + "        else:\n", + "            device = \"cpu\"\n", + "        \n", + "        logger.info(f\"GPU {gpu_id} started processing {len(dataset_shard)} samples\")\n", + "\n", + "        encoder_cls = get_encoder_class(encoder_type)\n", + "        encoder = encoder_cls(\n", + "            model_name=encoder_model,\n", + "            device=torch.device(device),\n", + "            testing_mode=testing_mode,\n", + "        )\n", + "\n", + "        # Set up Jinja environment for templating\n", + "        env = Environment(loader=BaseLoader())\n", + "        templates_dict = {k: env.from_string(v) for k, v in templates.items()}\n", + "\n", + "        # Create shard-specific output directory\n", + "        shard_dir = os.path.join(output_dir, f\"shard_{gpu_id}\")\n", + "        os.makedirs(shard_dir, exist_ok=True)\n", + "\n", + "        # Process batches\n", + "        all_embeddings = []\n", + "        batch_texts = []\n", + "\n", + "        # Create progress bar\n", + "        progress_bar = tqdm(\n", + "            desc=f\"GPU {gpu_id} generating embeddings\",\n", + "            total=len(dataset_shard),\n", + "            unit=\" samples\",\n", + "            position=gpu_id,\n", + "            leave=True,\n", + "        )\n", + "\n", + "        # Process each example in the shard\n", + "        for example in dataset_shard:\n", + "            # Format the text using the template\n", + "            template = templates_dict.get(template_name)\n", + "            if not template:\n", + "                raise ValueError(f\"Unknown format type: {template_name}\")\n", + "\n", + "            text = template.render(**example)\n", + "            batch_texts.append(text)\n", + "\n", + "            # Process when batch is full or at the end\n", + "            if len(batch_texts) == batch_size or example == dataset_shard[-1]:\n", + "                # Generate embeddings for this batch\n", + "                with torch.no_grad():\n", + "                    batch_embeddings = (\n", + "                        encoder.encode(\n", + "                            inputs=batch_texts,\n", + "                            instruction=instruction,\n", + "                            return_tensors=False,  # Return numpy for easier handling\n", + "                        )\n", + "                    )\n", + "\n", + "                all_embeddings.append(batch_embeddings)\n", + "                progress_bar.update(len(batch_texts))\n", + "                batch_texts = []\n", + "\n", + "            # Clean up GPU memory\n", + "            if torch.cuda.is_available():\n", + "                torch.cuda.empty_cache()\n",
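+ "            # NOTE: flushing the CUDA cache every iteration keeps peak memory low at a small\n", + "            # throughput cost; it can be dropped when speed matters more than OOM headroom.\n",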
+ "\n", + " progress_bar.close()\n", + "\n", + " # Concatenate all batches\n", + " if not all_embeddings:\n", + " logger.warning(f\"No embeddings generated for shard on GPU {gpu_id}\")\n", + " return None\n", + "\n", + " embeddings = np.concatenate(all_embeddings, axis=0)\n", + "\n", + " # Save embeddings to file\n", + " shard_file = os.path.join(shard_dir, f\"embeddings_shard_{gpu_id}.h5\")\n", + " with h5py.File(shard_file, \"w\") as h5f:\n", + " h5f.create_dataset(\"embeddings\", data=embeddings, dtype=\"float32\")\n", + "\n", + " logger.info(f\"GPU {gpu_id} completed processing. Saved to {shard_file}\")\n", + " return shard_file\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error processing shard on GPU {gpu_id}: {str(e)}\")\n", + " raise\n", + "\n", + "\n", + "def _merge_shard_files(shard_files, merged_file):\n", + " \"\"\"\n", + " Combines all shard files into single embeddings file\n", + " - Preserves embedding dimension and data type\n", + " - Removes shard files after merging\n", + " - Creates final `embeddings.h5` file\n", + " \"\"\"\n", + " logger.info(f\"Merging {len(shard_files)} shard files into {merged_file}\")\n", + "\n", + " # Get the shape and type of embeddings from the first shard\n", + " with h5py.File(shard_files[0], \"r\") as f:\n", + " first_embeddings = f[\"embeddings\"]\n", + " embedding_dim = first_embeddings.shape[1]\n", + " dtype = first_embeddings.dtype\n", + "\n", + " # Count total samples across all shards\n", + " total_samples = 0\n", + " for shard_file in shard_files:\n", + " with h5py.File(shard_file, \"r\") as f:\n", + " total_samples += f[\"embeddings\"].shape[0]\n", + "\n", + " # Create the merged file\n", + " with h5py.File(merged_file, \"w\") as merged_f:\n", + " merged_dataset = merged_f.create_dataset(\n", + " \"embeddings\", shape=(total_samples, embedding_dim), dtype=dtype\n", + " )\n", + "\n", + " # Copy embeddings from each shard\n", + " start_idx = 0\n", + " for shard_file in shard_files:\n", + " with h5py.File(shard_file, \"r\") as shard_f:\n", + " embeddings = shard_f[\"embeddings\"][:]\n", + " end_idx = start_idx + embeddings.shape[0]\n", + " merged_dataset[start_idx:end_idx] = embeddings\n", + " start_idx = end_idx\n", + "\n", + " # Remove shard file after merging\n", + " os.remove(shard_file)\n", + " # Remove shard directory if empty\n", + " shard_dir = os.path.dirname(shard_file)\n", + " if not os.listdir(shard_dir):\n", + " os.rmdir(shard_dir)\n", + "\n", + " logger.info(\n", + " f\"Successfully merged embeddings from {len(shard_files)} GPUs with {total_samples} total samples\"\n", + " )\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "bac23ad3", + "metadata": {}, + "source": [ + "### DataProcessor with Embedding Generation\n", + "Adds the `generate_embeddings()` method to the DataProcessor class from Notebook 1.\n", + "\n", + "**Processing Modes:**\n", + "- **Testing Mode / Single GPU**: Serial processing (notebook-friendly)\n", + "- **Production / Multi-GPU**: Parallel processing with multiprocessing.Pool\n", + "**Decorated with `@retry_on_exception`:**\n", + "- Automatically retries on GPU OOM errors\n", + "- Cleans up memory between retries\n", + "- Uses retry settings from SystemConfig" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0095215f", + "metadata": {}, + "outputs": [], + "source": [ + "@retry_on_exception\n", + "def generate_embeddings(self, dataset, output_dir: str) -> str:\n", + " \"\"\"\n", + " Generates embeddings for the dataset and saves them to the output 
directory,\n", + " using multiple GPUs in parallel.\n", + "\n", + " Args:\n", + " dataset: The dataset to process.\n", + " output_dir (str): The directory where embeddings will be saved.\n", + "\n", + " Returns:\n", + " str: The path to the merged embeddings file.\n", + " \"\"\"\n", + " os.makedirs(output_dir, exist_ok=True)\n", + " merged_path = os.path.join(output_dir, \"embeddings.h5\")\n", + "\n", + " # If embeddings already exist, return early\n", + " if os.path.exists(merged_path):\n", + " logger.info(f\"Embeddings file already exists in {output_dir}, skipping\")\n", + " return merged_path\n", + "\n", + " # Get number of GPUs to use\n", + " num_gpus = min(\n", + " self.config.system.num_gpus,\n", + " torch.cuda.device_count() if torch.cuda.is_available() else 1\n", + " )\n", + " logger.info(f\"Using {num_gpus} GPUs for embedding generation\")\n", + "\n", + " # Create dataset shards - one per GPU\n", + " total_samples = len(dataset)\n", + " per_gpu_samples = (total_samples + num_gpus - 1) // num_gpus # Ceiling division\n", + "\n", + " # Prepare arguments for parallel processing\n", + " args_list = []\n", + " for gpu_id in range(num_gpus):\n", + " # Calculate start and end indices for this shard\n", + " start_idx = gpu_id * per_gpu_samples\n", + " end_idx = min(start_idx + per_gpu_samples, total_samples)\n", + "\n", + " if start_idx >= total_samples:\n", + " continue # Skip if this GPU has no data to process\n", + "\n", + " # Create arguments for this GPU\n", + " args_list.append(\n", + " (\n", + " gpu_id,\n", + " dataset.select(range(start_idx, end_idx)),\n", + " output_dir,\n", + " self.config.encoder.encoder_type,\n", + " self.config.encoder.encoder_model,\n", + " self.config.encoder.instruction,\n", + " self.config.template.template_name,\n", + " self.config.template.templates,\n", + " self.config.basic.batch_size,\n", + " self.config.encoder.testing_mode,\n", + " )\n", + " )\n", + "\n", + " # Process dataset shards\n", + " # Use serial processing in testing mode (notebook-friendly)\n", + " # Use parallel processing in production mode\n", + " if self.config.encoder.testing_mode or num_gpus == 1:\n", + " logger.info(\"Processing shards serially (testing mode or single GPU)\")\n", + " shard_files = []\n", + " for args in args_list:\n", + " result = _process_dataset_shard(args)\n", + " shard_files.append(result)\n", + " else:\n", + " logger.info(f\"Processing shards in parallel with {num_gpus} workers\")\n", + " with Pool(processes=num_gpus) as pool:\n", + " shard_files = pool.map(_process_dataset_shard, args_list)\n", + "\n", + " # Filter out None values (failed shards)\n", + " shard_files = [f for f in shard_files if f is not None]\n", + "\n", + " if not shard_files:\n", + " raise ValueError(\"No embeddings were generated from any GPU\")\n", + "\n", + " # Merge all shard files\n", + " _merge_shard_files(shard_files, merged_path)\n", + "\n", + " return merged_path\n", + "\n", + "\n", + "# Add the method to DataProcessor class\n", + "DataProcessor.generate_embeddings = generate_embeddings\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"āœ… All Notebook 2 Components Loaded Successfully!\")\n", + "print(\"=\" * 60)\n", + "print(\"šŸ“¦ Components available:\")\n", + "print(\" • Arctic Encoder classes and registry\")\n", + "print(\" • Pairwise similarity computation function\")\n", + "print(\" • Multi-GPU processing functions\")\n", + "print(\" • generate_embeddings() method added to DataProcessor\")\n", + "print(\"=\" * 60)\n", + "print(\"\\nšŸŽÆ Ready to generate 
embeddings!\\n\")" + ] + }, + { + "cell_type": "markdown", + "id": "8573e915", + "metadata": {}, + "source": [ + "### šŸš€ Execute Embedding Generation\n", + "\n", + "Run this cell to process your dataset and generate embeddings.\n", + "\n", + "**What happens:**\n", + "1. Validates dataset and configuration\n", + "2. Splits data across available GPUs\n", + "3. Generates embeddings using Arctic encoder\n", + "4. Merges results into single HDF5 file\n", + "5. Reports timing and performance metrics" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c706662", + "metadata": {}, + "outputs": [], + "source": [ + "# Execute Multi-GPU Embedding Generation\n", + "\n", + "if 'dataset' in locals() and dataset is not None:\n", + " print(\"šŸŽÆ Multi-GPU Embedding Generation\")\n", + " print(\"=\" * 50)\n", + " \n", + " # Create DataProcessor instance if it doesn't exist\n", + " if 'processor' not in locals():\n", + " processor = DataProcessor(config)\n", + " print(\"āœ… Created DataProcessor instance\")\n", + " \n", + " # Set up output directory\n", + " output_dir = os.path.join(config.basic.output_dir, \"embeddings\")\n", + " \n", + " print(f\"\\nšŸ“Š Dataset size: {len(dataset):,} samples\")\n", + " print(f\"šŸ’¾ Output directory: {output_dir}\")\n", + " print(f\"šŸŽÆ Number of GPUs: {config.system.num_gpus}\")\n", + " print(f\"šŸ“¦ Batch size: {config.basic.batch_size}\")\n", + " \n", + " # Generate embeddings using the extended DataProcessor\n", + " print(f\"\\nšŸš€ Starting multi-GPU embedding generation...\")\n", + " start_time = time.time()\n", + " \n", + " try:\n", + " embeddings_file = processor.generate_embeddings(dataset, output_dir)\n", + " \n", + " generation_time = time.time() - start_time\n", + " \n", + " print(f\"\\nāœ… Embedding generation completed!\")\n", + " print(f\"ā±ļø Total time: {generation_time / 60:.2f} minutes\")\n", + " print(f\"šŸŽÆ Speed: {len(dataset) / generation_time:.2f} samples/sec\")\n", + " print(f\"šŸ’¾ Embeddings saved to: {embeddings_file}\")\n", + " \n", + " # Show file size\n", + " file_size = os.path.getsize(embeddings_file) / 1024**2\n", + " print(f\"šŸ“ File size: {file_size:.1f} MB\")\n", + " \n", + " except Exception as e:\n", + " print(f\"āŒ Error during embedding generation: {e}\")\n", + " embeddings_file = None\n", + " \n", + "else:\n", + " print(\"āŒ Cannot generate embeddings without dataset\")\n", + " print(\"Please ensure Notebook 1 has been run and dataset was loaded successfully\")" + ] + }, + { + "cell_type": "markdown", + "id": "success_summary", + "metadata": {}, + "source": [ + "### šŸŽÆ Next Steps\n", + "\n", + "**Option 1: Continue to Notebook 3 (Recommended)**\n", + "1. Open `subset_selection.ipynb`\n", + "2. Run the notebook sequentially\n", + "3. Use the embeddings generated here for subset selection\n", + "4. 
This is the FAST path - no redundant computation!\n", + "\n", + "**Option 2: Reuse These Embeddings Later**\n", + "- The embeddings file is saved and can be loaded anytime\n", + "- Path: `{config.basic.output_dir}/embeddings/embeddings.h5`\n", + "- Use `h5py.File()` to load them in any notebook or script" + ] + }, + { + "cell_type": "markdown", + "id": "61c316f6", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.23" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/use-cases/subset-selection/subset_selection.ipynb b/notebooks/use-cases/subset-selection/subset_selection.ipynb new file mode 100644 index 0000000..8f513cc --- /dev/null +++ b/notebooks/use-cases/subset-selection/subset_selection.ipynb @@ -0,0 +1,913 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "2569cf0f", + "metadata": {}, + "source": [ + "# Subset Selection Notebook 3: Subset Selection & Results\n", + "\n", + "## Overview\n", + "This notebook is the **final step** in the Subset Selection Pipeline. It uses the embeddings generated in Notebook 2 to select diverse, representative subsets using the submodular optimization algorithm.\n", + "\n", + "## Purpose in Subset Selection\n", + "The Facility Location algorithm selects a subset of samples that:\n", + "1. **Maximizes Diversity**: Selects samples that are representative of the entire dataset\n", + "2. **Minimizes Redundancy**: Avoids selecting similar samples\n", + "3. **Optimizes Coverage**: Ensures all semantic regions of the dataset are represented\n", + "4. 
**Maintains Quality**: Preserves the distribution and characteristics of the original dataset\n",
+    "\n",
+    "## Output\n",
+    "- **Subset JSONL files**: Selected subsets in JSONL format (e.g., `combined_cut_50x_percent_0.1_subset.jsonl`)\n",
+    "- **Metadata files**: `.npz` files containing selected indices and gain scores for each subset\n",
+    "- **Statistics**: Summary of subset sizes, selection time, and quality metrics"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2575984b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Run Notebook 2 (which in turn runs Notebook 1)\n",
+    "%run \"embedding_generation.ipynb\"\n",
+    "\n",
+    "# Additional imports for subset selection\n",
+    "import math\n",
+    "import logging\n",
+    "from dataclasses import dataclass, field\n",
+    "from typing import TypedDict, Union, List, Dict, Optional, Any\n",
+    "# Import submodlib for Facility Location\n",
+    "from submodlib import FacilityLocationFunction\n",
+    "\n",
+    "# Set up logger for this notebook\n",
+    "logger = logging.getLogger(__name__)\n",
+    "\n",
+    "print(\"āœ… Successfully imported from previous notebooks!\")\n",
+    "print(f\" • config: {type(config).__name__ if 'config' in locals() else 'Not defined'}\")\n",
+    "print(f\" • dataset: {len(dataset) if 'dataset' in locals() and dataset else 'None'} samples\")\n",
+    "print(f\" • processor: {type(processor).__name__ if 'processor' in locals() else 'Not defined'}\")\n",
+    "print(f\" • DataProcessor has generate_embeddings(): {hasattr(processor, 'generate_embeddings') if 'processor' in locals() else False}\")\n",
+    "print(f\"\\nāœ… Subset selection libraries loaded!\")\n",
+    "print(f\" • submodlib: FacilityLocationFunction available\")\n",
+    "print(f\" • All utility functions from previous notebooks available\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1bd0c0f2",
+   "metadata": {},
+   "source": [
+    "### GPU-Based Fold Processing\n",
+    "\n",
+    "Implements the core subset selection algorithm using facility location.\n",
+    "\n",
+    "**Why Folds?**\n",
+    "The dataset is split into multiple folds for three key reasons:\n",
+    "1. **Memory Efficiency**: Processing smaller chunks prevents GPU out-of-memory errors\n",
+    "2. **Parallelization**: Multiple GPUs can work on different folds simultaneously\n",
+    "3. **Better Coverage**: Cross-validation-like approach ensures diverse sample selection\n",
+    "\n",
+    "**How It Works:**\n",
+    "```\n",
+    "Full Dataset (N samples)\n",
+    "↓\n",
+    "Split into K folds (e.g., 25 folds)\n",
+    "↓\n",
+    "Assign folds to GPUs\n",
+    "↓\n",
+    "For each fold:\n",
+    "- Compute pairwise similarities\n",
+    "- Run facility location algorithm\n",
+    "- Select diverse samples with high gains\n",
+    "↓\n",
+    "Merge results from all folds\n",
+    "↓\n",
+    "Select top samples by gain scores\n",
+    "```\n",
+    "\n",
+    "**Algorithm**: Uses the LazierThanLazyGreedy optimizer for efficient submodular maximization.\n",
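+    "\n",
+    "For scale: with 10,000 samples and 25 folds, each fold holds 400 samples, so a 0.1 (10%) spec selects ceil(0.1 Ɨ 400) = 40 samples per fold before the global re-ranking by gain.\n",
+    "\n",
+    "A minimal sketch of the per-fold step (illustrative only: the random `sims` matrix stands in for the cosine similarities the pipeline computes with `compute_pairwise_dense`):\n",
+    "\n",
+    "```python\n",
+    "import numpy as np\n",
+    "from submodlib import FacilityLocationFunction\n",
+    "\n",
+    "rng = np.random.default_rng(0)\n",
+    "sims = rng.random((100, 100))\n",
+    "sims = (sims + sims.T) / 2  # toy symmetric similarity matrix\n",
+    "fl = FacilityLocationFunction(n=100, sijs=sims, mode=\"dense\", separate_rep=False)\n",
+    "picks = fl.maximize(budget=10, optimizer=\"LazierThanLazyGreedy\", epsilon=0.1,\n",
+    "                    stopIfZeroGain=False, stopIfNegativeGain=False, verbose=False)\n",
+    "print(picks[:3])  # [(element_index, marginal_gain), ...]\n",
+    "```"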
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3714693b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def process_folds_with_gpu(args):\n",
+    "    \"\"\"\n",
+    "    Process folds on GPU or CPU with support for both percentage and absolute size specifications.\n",
+    "    \"\"\"\n",
+    "    (\n",
+    "        gpu_id,\n",
+    "        gpu_folds_info,\n",
+    "        embeddings,\n",
+    "        subset_sizes,\n",
+    "        total_samples,\n",
+    "        epsilon,\n",
+    "        testing_mode,\n",
+    "    ) = args\n",
+    "\n",
+    "    try:\n",
+    "        if torch.cuda.is_available():\n",
+    "            torch.cuda.set_device(gpu_id)\n",
+    "            device = f\"cuda:{gpu_id}\"\n",
+    "        else:\n",
+    "            if not testing_mode:\n",
+    "                raise RuntimeError(\"GPU processing required but CUDA is not available\")\n",
+    "            logger.warning(\n",
+    "                \"Running in CPU mode for testing. Production use requires GPU acceleration.\"\n",
+    "            )\n",
+    "            device = \"cpu\"\n",
+    "\n",
+    "        results = []\n",
+    "        for fold_idx, fold_indices in gpu_folds_info:\n",
+    "            try:\n",
+    "                logger.info(f\"Processing fold {fold_idx + 1} on GPU {gpu_id}\")\n",
+    "\n",
+    "                fold_embeddings = embeddings[fold_indices].to(device)\n",
+    "\n",
+    "                logger.info(f\"Computing similarity matrix for fold {fold_idx + 1}\")\n",
+    "                max_sim_mat = compute_pairwise_dense(\n",
+    "                    fold_embeddings,\n",
+    "                    batch_size=50000,\n",
+    "                    metric=\"cosine\",\n",
+    "                    device=device,\n",
+    "                    scaling=\"additive\",\n",
+    "                )\n",
+    "                similarity_matrix = max_sim_mat.cpu().numpy()\n",
+    "\n",
+    "                subsets = {}\n",
+    "                ds_func = FacilityLocationFunction(\n",
+    "                    n=similarity_matrix.shape[0],\n",
+    "                    sijs=similarity_matrix,\n",
+    "                    mode=\"dense\",\n",
+    "                    separate_rep=False,\n",
+    "                )\n",
+    "\n",
+    "                for size_spec in subset_sizes:\n",
+    "                    if isinstance(size_spec, float):\n",
+    "                        # Percentage-based selection\n",
+    "                        budget = max(\n",
+    "                            1, math.ceil(size_spec * similarity_matrix.shape[0])\n",
+    "                        )\n",
+    "                    else:\n",
+    "                        # Absolute number-based selection\n",
+    "                        budget = max(\n",
+    "                            1,\n",
+    "                            math.ceil(\n",
+    "                                size_spec * (similarity_matrix.shape[0] / total_samples)\n",
+    "                            ),\n",
+    "                        )\n",
+    "\n",
+    "                    logger.info(\n",
+    "                        f\"Selecting subset of size {budget} for fold {fold_idx + 1}\"\n",
+    "                    )\n",
+    "\n",
+    "                    subset_result = ds_func.maximize(\n",
+    "                        budget=budget,\n",
+    "                        optimizer=\"LazierThanLazyGreedy\",\n",
+    "                        epsilon=epsilon,\n",
+    "                        stopIfZeroGain=False,\n",
+    "                        stopIfNegativeGain=False,\n",
+    "                        verbose=False,\n",
+    "                    )\n",
+    "\n",
+    "                    subset_indices = [fold_indices[x[0]] for x in subset_result]\n",
+    "                    subset_gains = [x[1] for x in subset_result]\n",
+    "                    subsets[size_spec] = {\n",
+    "                        \"indices\": subset_indices,\n",
+    "                        \"gains\": subset_gains,\n",
+    "                    }\n",
+    "\n",
+    "                results.append((fold_idx, subsets))\n",
+    "\n",
+    "            except Exception as e:\n",
+    "                logger.error(\n",
+    "                    f\"Error processing fold {fold_idx + 1} on GPU {gpu_id}: {str(e)}\"\n",
+    "                )\n",
+    "                raise\n",
+    "            finally:\n",
+    "                # Release per-fold buffers. Deleting entries from locals() is a\n",
+    "                # no-op in CPython, so drop the references by rebinding instead.\n",
+    "                max_sim_mat = similarity_matrix = fold_embeddings = ds_func = None\n",
+    "                gc.collect()\n",
+    "                if torch.cuda.is_available():\n",
+    "                    torch.cuda.empty_cache()\n",
+    "\n",
+    "        return results\n",
+    "    except Exception as e:\n",
+    "        logger.error(f\"Error in process_folds_with_gpu on GPU {gpu_id}: {str(e)}\")\n",
+    "        raise\n",
+    "\n",
+    "print(\"āœ… process_folds_with_gpu function defined!\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "14b6bf0d",
+   "metadata": {},
+   "source": [
+    "### šŸŽÆ Two Ways to Run Subset Selection\n",
+    "\n",
+    "**āœ… RECOMMENDED: Use 
Existing Embeddings (Fast)**\n",
+    "- Function: `run_subset_selection_only()`\n",
+    "- Use when: You've already run Notebook 2 and have embeddings\n",
+    "- Benefits: Much faster (minutes vs hours), no redundant computation\n",
+    "- What it does: Loads embeddings from Notebook 2 → Runs facility location → Saves subsets\n",
+    "\n",
+    "**šŸ”„ Alternative: Full Pipeline (Slow)**\n",
+    "- Function: `subset_datasets()`\n",
+    "- Use when: Running standalone without Notebook 2, or need fresh embeddings\n",
+    "- What it does: Generates embeddings → Runs facility location → Saves subsets\n",
+    "- Note: This path generates embeddings itself; it only skips generation if `embeddings.h5` already exists in its own output directory\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "896ee2fd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_subset_selection_only(\n",
+    "    embeddings_file: str,\n",
+    "    dataset,\n",
+    "    output_dir: str,\n",
+    "    subset_sizes: List[Union[int, float]],\n",
+    "    dataset_name: str = \"combined_cut_50x\",  # Change this to match your dataset\n",
+    "    num_folds: int = 25,\n",
+    "    epsilon: float = 160.0,\n",
+    "    num_gpus: Optional[int] = None,\n",
+    "    testing_mode: bool = True,\n",
+    ") -> None:\n",
+    "    \"\"\"\n",
+    "    Run ONLY subset selection using pre-computed embeddings from Notebook 2.\n",
+    "    \n",
+    "    This function is designed for the notebook workflow where embeddings \n",
+    "    are already generated. It skips the embedding generation step entirely.\n",
+    "    \n",
+    "    Args:\n",
+    "        embeddings_file: Path to the embeddings.h5 file from Notebook 2\n",
+    "        dataset: The dataset loaded in Notebook 1\n",
+    "        output_dir: Where to save subset results\n",
+    "        subset_sizes: List of subset sizes (floats for %, ints for absolute)\n",
+    "        dataset_name: Name of the dataset (for output files)\n",
+    "        num_folds: Number of folds for facility location\n",
+    "        epsilon: Epsilon parameter for the optimizer\n",
+    "        num_gpus: Number of GPUs to use (auto-detected if None)\n",
+    "        testing_mode: Enable testing mode for CPU fallback\n",
+    "    \"\"\"\n",
+    "    # Verify embeddings file exists\n",
+    "    if not os.path.exists(embeddings_file):\n",
+    "        raise FileNotFoundError(\n",
+    "            f\"Embeddings file not found: {embeddings_file}\\n\"\n",
+    "            \"Please run Notebook 2 (embedding_generation.ipynb) first!\"\n",
+    "        )\n",
+    "    \n",
+    "    # Auto-detect GPUs if not specified\n",
+    "    if num_gpus is None:\n",
+    "        num_gpus = get_default_num_gpus(testing_mode=testing_mode)\n",
+    "    \n",
+    "    logger.info(f\"šŸ“ Using existing embeddings from: {embeddings_file}\")\n",
+    "    logger.info(f\"šŸŽÆ Dataset: {dataset_name} with {len(dataset)} samples\")\n",
+    "    logger.info(f\"šŸ“Š Subset sizes: {subset_sizes}\")\n",
+    "    logger.info(f\"šŸ”§ Folds: {num_folds}, Epsilon: {epsilon}\")\n",
+    "    \n",
+    "    # Create configuration (minimal, just for subset selection)\n",
+    "    basic_config = BasicConfig(\n",
+    "        output_dir=output_dir,\n",
+    "        num_folds=num_folds,\n",
+    "        epsilon=epsilon,\n",
+    "        combine_files=False,\n",
+    "    )\n",
+    "    \n",
+    "    # Validate epsilon for dataset size\n",
+    "    basic_config.validate_epsilon_for_dataset_size(len(dataset))\n",
+    "    \n",
+    "    # Create system config (num_gpus is auto-detected in __post_init__)\n",
+    "    system_config = SystemConfig(\n",
+    "        testing_mode=testing_mode,\n",
+    "    )\n",
+    "    \n",
+    "    # num_gpus is resolved by now (user-supplied or auto-detected), so apply it\n",
+    "    system_config.num_gpus = num_gpus\n",
+    "    \n",
+    "    # Create minimal config for subset selection only\n",
+    "    config = ProcessingConfig(\n",
+    "        input_files=[],  # Not needed for this path\n",
+    "        
subset_sizes=subset_sizes,\n",
+    "        basic=basic_config,\n",
+    "        encoder=EncoderConfig(testing_mode=testing_mode),\n",
+    "        template=TemplateConfig(),\n",
+    "        system=system_config,\n",
+    "    )\n",
+    "    \n",
+    "    # Initialize processor\n",
+    "    processor = DataProcessor(config)\n",
+    "    \n",
+    "    try:\n",
+    "        # Load embeddings\n",
+    "        logger.info(\"šŸ“„ Loading embeddings...\")\n",
+    "        with h5py.File(embeddings_file, \"r\") as f:\n",
+    "            embeddings_data = f[\"embeddings\"][:]\n",
+    "            if embeddings_data.size == 0:\n",
+    "                raise ValueError(\"Embeddings file is empty!\")\n",
+    "            embeddings = torch.tensor(embeddings_data, dtype=torch.float32)\n",
+    "    \n",
+    "        logger.info(f\"āœ… Loaded embeddings: shape {embeddings.shape}\")\n",
+    "    \n",
+    "        # Create output directory\n",
+    "        dataset_output_dir = os.path.join(output_dir, dataset_name)\n",
+    "        os.makedirs(dataset_output_dir, exist_ok=True)\n",
+    "    \n",
+    "        # Run subset selection\n",
+    "        logger.info(\"šŸŽÆ Running subset selection...\")\n",
+    "        start_time = time.time()\n",
+    "    \n",
+    "        subsets = processor.select_subsets(dataset_name, embeddings)\n",
+    "    \n",
+    "        selection_time = time.time() - start_time\n",
+    "    \n",
+    "        # Save subsets to files\n",
+    "        logger.info(\"šŸ’¾ Saving selected subsets...\")\n",
+    "        for size_spec, indices in subsets.items():\n",
+    "            subset_data = dataset.select(indices)\n",
+    "            subset_name = processor.get_subset_name(size_spec, len(indices))\n",
+    "    \n",
+    "            output_file = os.path.join(\n",
+    "                dataset_output_dir,\n",
+    "                f\"{dataset_name}_{subset_name}_subset.jsonl\",\n",
+    "            )\n",
+    "    \n",
+    "            # \"dummy.jsonl\" only supplies a .jsonl extension so the subset is written as JSONL\n",
+    "            processor._save_subset(subset_data, output_file, \"dummy.jsonl\")\n",
+    "            logger.info(f\"āœ… Saved {len(indices)} samples to {output_file}\")\n",
+    "    \n",
+    "        # Print summary\n",
+    "        print(\"\\n\" + \"=\" * 70)\n",
+    "        print(\"āœ… SUBSET SELECTION COMPLETED!\")\n",
+    "        print(\"=\" * 70)\n",
+    "        print(f\"ā±ļø Selection time: {selection_time / 60:.2f} minutes\")\n",
+    "        print(f\"šŸ“Š Created {len(subsets)} subset(s):\")\n",
+    "        for size_spec, indices in subsets.items():\n",
+    "            subset_name = processor.get_subset_name(size_spec, len(indices))\n",
+    "            print(f\" • {subset_name}: {len(indices)} samples\")\n",
+    "        print(f\"šŸ’¾ Output directory: {dataset_output_dir}\")\n",
+    "        print(\"=\" * 70)\n",
+    "    \n",
+    "    except Exception as e:\n",
+    "        logger.error(f\"Error during subset selection: {str(e)}\")\n",
+    "        raise\n",
+    "    finally:\n",
+    "        # Cleanup\n",
+    "        gc.collect()\n",
+    "        if torch.cuda.is_available():\n",
+    "            torch.cuda.empty_cache()\n",
+    "\n",
+    "print(\"āœ… run_subset_selection_only() function defined!\")\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8226b657",
+   "metadata": {},
+   "source": [
+    "### Extend DataProcessor with Subset Selection\n",
+    "\n",
+    "Adds the `select_subsets()` method to the DataProcessor class.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0fc767d9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def select_subsets(self, dataset_name: str, embeddings: torch.Tensor) -> Dict[Union[int, float], List[int]]:\n",
+    "    \"\"\"\n",
+    "    Perform diverse subset selection using facility location with multi-GPU support.\n",
+    "    \n",
+    "    This method implements a three-phase approach to select representative subsets from \n",
+    "    embeddings using submodular optimization. 
It supports both percentage-based and \n", + " absolute count specifications for subset sizes.\n", + " \n", + " **Process Overview:**\n", + " \n", + " Phase 1 - Create Folds:\n", + " - Randomly shuffles sample indices\n", + " - Splits into N folds with roughly equal sizes\n", + " - Distributes folds across available GPUs\n", + " \n", + " Phase 2 - Process Folds (Parallel/Serial):\n", + " - Testing Mode: Processes folds serially (one at a time)\n", + " - Production Mode: Processes folds in parallel using multiprocessing.Pool\n", + " - Each GPU runs facility location algorithm on its assigned folds\n", + " \n", + " Phase 3 - Aggregate Results:\n", + " - Collects selected indices and gain scores from all folds\n", + " - Sorts by gain scores (highest first)\n", + " - Selects top N samples for each target subset size\n", + " - Saves metadata to .npz files\n", + " \n", + " **Selection Modes:**\n", + " - Percentage: Float values (0.1 = 10%, 0.05 = 5% of dataset)\n", + " - Absolute: Integer values (100 = exactly 100 samples)\n", + " \n", + " Args:\n", + " dataset_name (str): Name of the dataset, used for output file naming.\n", + " embeddings (torch.Tensor): Embedding tensor of shape (num_samples, embedding_dim).\n", + " Contains the vector representations of all samples in the dataset.\n", + " \n", + " Returns:\n", + " Dict[Union[int, float], List[int]]: Dictionary mapping subset size specifications \n", + " to lists of selected sample indices. Keys are the original size_spec values \n", + " (floats for percentages, ints for absolute counts), and values are lists of \n", + " integer indices representing the selected samples.\n", + " \n", + " Output Files:\n", + " Creates metadata files in the format:\n", + " `{output_dir}/{dataset_name}_fl_{num_folds}_partitions_{subset_name}_metadata.npz`\n", + " \n", + " Each file contains:\n", + " - indices: numpy array of selected sample indices\n", + " - gains: numpy array of corresponding gain scores from facility location\n", + " \"\"\"\n", + " indices = np.arange(len(embeddings))\n", + " np.random.shuffle(indices)\n", + "\n", + " fold_size = len(embeddings) // self.config.basic.num_folds\n", + " remainder = len(embeddings) % self.config.basic.num_folds\n", + "\n", + " folds = []\n", + " start_idx = 0\n", + " for i in range(self.config.basic.num_folds):\n", + " extra = 1 if i < remainder else 0\n", + " end_idx = start_idx + fold_size + extra\n", + " folds.append(indices[start_idx:end_idx])\n", + " start_idx = end_idx\n", + "\n", + " gpu_assignments = []\n", + " folds_per_gpu = self.config.basic.num_folds // self.config.system.num_gpus\n", + " extra_folds = self.config.basic.num_folds % self.config.system.num_gpus\n", + "\n", + " start_fold = 0\n", + " for gpu_id in range(self.config.system.num_gpus):\n", + " num_folds_this_gpu = folds_per_gpu + (1 if gpu_id < extra_folds else 0)\n", + " end_fold = start_fold + num_folds_this_gpu\n", + " gpu_folds_info = [\n", + " (fold_idx, folds[fold_idx]) for fold_idx in range(start_fold, end_fold)\n", + " ]\n", + "\n", + " gpu_assignments.append(\n", + " (\n", + " gpu_id,\n", + " gpu_folds_info,\n", + " embeddings,\n", + " self.config.subset_sizes,\n", + " len(embeddings),\n", + " self.config.basic.epsilon,\n", + " self.config.system.testing_mode,\n", + " )\n", + " )\n", + " start_fold = end_fold\n", + "\n", + " # Use serial processing in testing mode (notebook-friendly)\n", + " if self.config.system.testing_mode or self.config.system.num_gpus == 1:\n", + " logger.info(\"Processing folds serially (testing mode or single 
GPU)\")\n", + " gpu_results = []\n", + " for args in gpu_assignments:\n", + " result = process_folds_with_gpu(args)\n", + " gpu_results.append(result)\n", + " else:\n", + " logger.info(f\"Processing folds in parallel with {self.config.system.num_gpus} workers\")\n", + " with Pool(processes=self.config.system.num_gpus) as pool:\n", + " gpu_results = pool.map(process_folds_with_gpu, gpu_assignments)\n", + "\n", + " all_results = []\n", + " for gpu_result in gpu_results:\n", + " all_results.extend(gpu_result)\n", + "\n", + " class SubsetData(TypedDict):\n", + " indices: List[int]\n", + " gains: List[float]\n", + "\n", + " combined_subsets: Dict[Union[int, float], SubsetData] = {\n", + " size: {\"indices\": [], \"gains\": []} for size in self.config.subset_sizes\n", + " }\n", + "\n", + " for fold_idx, result in all_results:\n", + " for size in self.config.subset_sizes:\n", + " combined_subsets[size][\"indices\"].extend(result[size][\"indices\"])\n", + " combined_subsets[size][\"gains\"].extend(result[size][\"gains\"])\n", + "\n", + " base_name = dataset_name\n", + " subsets = {}\n", + "\n", + " for size_spec in self.config.subset_sizes:\n", + " actual_size = self.calculate_subset_size(len(embeddings), size_spec)\n", + " logger.info(f\"Actual subset size: {actual_size}\")\n", + " sorted_indices_gains = sorted(\n", + " zip(\n", + " combined_subsets[size_spec][\"indices\"],\n", + " combined_subsets[size_spec][\"gains\"],\n", + " ),\n", + " key=lambda x: x[1],\n", + " reverse=True,\n", + " )[:actual_size]\n", + "\n", + " sorted_indices = [x[0] for x in sorted_indices_gains]\n", + " sorted_gains = [x[1] for x in sorted_indices_gains]\n", + "\n", + " subset_name = self.get_subset_name(size_spec, actual_size)\n", + " metadata_file = os.path.join(\n", + " self.config.basic.output_dir,\n", + " f\"{base_name}_fl_{self.config.basic.num_folds}_partitions_{subset_name}_metadata.npz\",\n", + " )\n", + "\n", + " np.savez(metadata_file, indices=sorted_indices, gains=sorted_gains)\n", + " logger.info(f\"Saved metadata to {metadata_file}\")\n", + " subsets[size_spec] = sorted_indices\n", + "\n", + " return subsets\n", + "\n", + "\n", + "# Add the method to DataProcessor class\n", + "DataProcessor.select_subsets = select_subsets\n", + "\n", + "print(\"āœ… select_subsets() method added to DataProcessor!\")" + ] + }, + { + "cell_type": "markdown", + "id": "53e89212", + "metadata": {}, + "source": [ + "### Add File Processing Helper Methods" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9e89ac29", + "metadata": {}, + "outputs": [], + "source": [ + "def _save_subset(self, subset_data, output_file: str, input_file: str):\n", + " \"\"\"\n", + " Save subset data to file in appropriate format.\n", + " \n", + " Args:\n", + " subset_data: The dataset subset to save\n", + " output_file (str): Output file path\n", + " input_file (str): Original input file path (for determining format)\n", + " \"\"\"\n", + " extension = input_file.split(\".\")[-1]\n", + " if extension in [\"json\", \"jsonl\"]:\n", + " subset_data.to_json(output_file, orient=\"records\", lines=True)\n", + " elif extension == \"csv\":\n", + " subset_data.to_csv(output_file, index=False)\n", + " elif extension == \"parquet\":\n", + " subset_data.to_parquet(output_file)\n", + " \n", + " logger.info(f\"Saved subset to {output_file}\")\n", + "\n", + "\n", + "def _process_single_dataset(\n", + " self,\n", + " dataset,\n", + " dataset_name: str,\n", + " output_dir: str,\n", + " input_file: str\n", + "):\n", + " \"\"\"\n", + " Process a 
single dataset (either combined or individual).\n", + " \n", + " This function orchestrates the complete pipeline for a single dataset:\n", + " 1. Validates epsilon parameter\n", + " 2. Generates or loads embeddings\n", + " 3. Selects subsets using Facility Location\n", + " 4. Saves selected subsets to files\n", + " \n", + " Args:\n", + " dataset: The dataset to process\n", + " dataset_name: Name of the dataset\n", + " output_dir: Output directory for results\n", + " input_file: Original input file path (for determining format)\n", + " \"\"\"\n", + " try:\n", + " # Validate epsilon based on dataset size\n", + " self.config.basic.validate_epsilon_for_dataset_size(len(dataset))\n", + "\n", + " # Create dataset-specific output directory\n", + " dataset_output_dir = os.path.join(output_dir, dataset_name)\n", + " os.makedirs(dataset_output_dir, exist_ok=True)\n", + "\n", + " logger.info(f\"Generating embeddings for {dataset_name}\")\n", + " embedding_file = self.generate_embeddings(\n", + " dataset, os.path.join(dataset_output_dir, \"embeddings\")\n", + " )\n", + "\n", + " logger.info(\"Loading embeddings for subset selection\")\n", + " with h5py.File(embedding_file, \"r\") as f:\n", + " embeddings_data = f[\"embeddings\"][:]\n", + " if embeddings_data.size == 0:\n", + " logger.warning(\n", + " f\"No embeddings generated for dataset {dataset_name}, skipping subset selection\"\n", + " )\n", + " return\n", + " embeddings = torch.tensor(embeddings_data, dtype=torch.float32)\n", + "\n", + " logger.info(\"Selecting subsets\")\n", + " subsets = self.select_subsets(dataset_name, embeddings)\n", + "\n", + " logger.info(\"Saving subsets\")\n", + " for size_spec, indices in subsets.items():\n", + " subset_data = dataset.select(indices)\n", + " subset_name = self.get_subset_name(size_spec, len(indices))\n", + "\n", + " # Create subset filename with dataset name\n", + " output_file = os.path.join(\n", + " dataset_output_dir,\n", + " f\"{dataset_name}_{subset_name}_subset.{input_file.split('.')[-1]}\",\n", + " )\n", + "\n", + " self._save_subset(subset_data, output_file, input_file)\n", + " logger.info(\n", + " f\"Saved subset with {len(indices)} samples to {output_file}\"\n", + " )\n", + "\n", + " # Clean up resources\n", + " del dataset, embeddings\n", + " gc.collect()\n", + " torch.cuda.empty_cache()\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error processing dataset {dataset_name}: {str(e)}\")\n", + " raise\n", + "\n", + "\n", + "def process_files(self, input_files: List[str], output_dir: str):\n", + " \"\"\"\n", + " Process multiple input files with support for both combined and separate processing.\n", + " \n", + " Args:\n", + " input_files (List[str]): List of input files to process\n", + " output_dir (str): Output directory for results\n", + " \"\"\"\n", + " try:\n", + " if self.config.basic.combine_files:\n", + " # Process combined datasets\n", + " logger.info(\"Processing combined datasets...\")\n", + " dataset = self.load_and_combine_datasets(input_files)\n", + " dataset_name = \"combined_dataset\"\n", + "\n", + " # Process combined dataset\n", + " self._process_single_dataset(\n", + " dataset, dataset_name, output_dir, input_files[0]\n", + " )\n", + " else:\n", + " # Process each dataset separately\n", + " logger.info(\"Processing datasets separately...\")\n", + " for input_file in input_files:\n", + " dataset = self.load_and_combine_datasets([input_file])\n", + " dataset_name = self.get_dataset_name(input_file)\n", + " logger.info(f\"Processing dataset: {dataset_name}\")\n", 
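+    "                # Runs the full per-dataset pipeline: embeddings -> facility location -> saved subset files\n",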
+ " self._process_single_dataset(\n", + " dataset, dataset_name, output_dir, input_file\n", + " )\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error processing files: {str(e)}\")\n", + " raise\n", + "\n", + "\n", + "# Add all methods to DataProcessor class\n", + "DataProcessor._save_subset = _save_subset\n", + "DataProcessor._process_single_dataset = _process_single_dataset\n", + "DataProcessor.process_files = process_files\n", + "\n", + "print(\"āœ… Additional methods added to DataProcessor!\")" + ] + }, + { + "cell_type": "markdown", + "id": "79c575d7", + "metadata": {}, + "source": [ + "### Main Wrapper Function: subset_datasets()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "654e612a", + "metadata": {}, + "outputs": [], + "source": [ + "def subset_datasets(\n", + " input_files: List[str],\n", + " subset_sizes: List[Union[int, float]],\n", + " testing_mode: bool = False,\n", + " **kwargs: Any,\n", + ") -> None:\n", + " \"\"\"\n", + " Create subsets of datasets using facility location for diverse subset selection.\n", + " \n", + " This is the main entry point from the original codebase. It creates configuration,\n", + " initializes the processor, and runs the complete pipeline.\n", + " \n", + " Args:\n", + " input_files: List of input files to process\n", + " subset_sizes: List of subset sizes (floats 0-1 for percentage, ints for absolute count)\n", + " testing_mode: If True, allows CPU usage (for testing only)\n", + " **kwargs: Additional configuration parameters (e.g., output_dir, epsilon, num_folds)\n", + " \"\"\"\n", + " # Get system's available GPU count\n", + " available_gpus = get_default_num_gpus(testing_mode=testing_mode)\n", + "\n", + " # Create configuration groups\n", + " basic_config = BasicConfig()\n", + " encoder_config = EncoderConfig(testing_mode=testing_mode)\n", + " template_config = TemplateConfig()\n", + " system_config = SystemConfig(testing_mode=testing_mode)\n", + "\n", + " # Update configuration groups from kwargs\n", + " for key, value in kwargs.items():\n", + " if hasattr(basic_config, key):\n", + " setattr(basic_config, key, value)\n", + " elif hasattr(encoder_config, key):\n", + " setattr(encoder_config, key, value)\n", + " elif hasattr(template_config, key):\n", + " setattr(template_config, key, value)\n", + " elif hasattr(system_config, key):\n", + " setattr(system_config, key, value)\n", + "\n", + " # Ensure num_gpus doesn't exceed available GPUs\n", + " if system_config.num_gpus > available_gpus:\n", + " logger.warning(\n", + " f\"Requested {system_config.num_gpus} GPUs but only {available_gpus} available. 
\"\n", + " f\"Falling back to using {available_gpus} GPUs.\"\n", + " )\n", + " system_config.num_gpus = available_gpus\n", + "\n", + " # Create configuration\n", + " config = ProcessingConfig(\n", + " input_files=input_files,\n", + " subset_sizes=subset_sizes,\n", + " basic=basic_config,\n", + " encoder=encoder_config,\n", + " template=template_config,\n", + " system=system_config,\n", + " )\n", + "\n", + " try:\n", + " logger.info(f\"Processing configuration: {config}\")\n", + " processor = DataProcessor(config)\n", + " processor.process_files(input_files, config.basic.output_dir)\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Processing failed: {str(e)}\")\n", + " raise\n", + "\n", + " finally:\n", + " # Cleanup\n", + " gc.collect()\n", + " if torch.cuda.is_available():\n", + " torch.cuda.empty_cache()\n", + "\n", + "\n", + "print(\"āœ… subset_datasets() function defined!\")" + ] + }, + { + "cell_type": "markdown", + "id": "5d88e141", + "metadata": {}, + "source": [ + "## šŸš€ Execute Subset Selection\n", + "\n", + "**Choose ONE of the two options below:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6f5d6e8", + "metadata": {}, + "outputs": [], + "source": [ + "## ==================== OPTION 1: USE EXISTING EMBEDDINGS (RECOMMENDED) ====================\n", + "##\n", + "## āœ… This is the FAST approach - it uses embeddings already generated in Notebook 2\n", + "## ⚔ Takes only minutes (vs hours for full pipeline)\n", + "## šŸ’¾ Loads embeddings from: data-preparation-and-config/output/embeddings/embeddings.h5\n", + "\n", + "## **Requirements:**\n", + "## - Notebook 2 must be run first\n", + "## - Embeddings file must exist at the specified path\n", + "##\n", + "## Uncomment the code below to run:\n", + "\n", + "_option_1_success = False\n", + "try:\n", + " run_subset_selection_only(\n", + " embeddings_file='../../assets/subset-selection/outputs/embeddings/embeddings.h5', # From Notebook 2\n", + " dataset=dataset, # Loaded from Notebook 1\n", + " output_dir=\"../../assets/subset-selection/outputs\", # change this to your desired output directory\n", + " subset_sizes=[0.1, 0.05], # 10% and 5% subsets\n", + " dataset_name='combined_cut_50x',\n", + " num_folds=25,\n", + " epsilon=0.1,\n", + " testing_mode=True,\n", + " )\n", + " _option_1_success = True\n", + "except Exception as e:\n", + " print(f\"āŒ Error: {e}\")\n", + " import traceback\n", + " traceback.print_exc()\n", + "\n", + "\n", + "## ==================== OPTION 2: FULL PIPELINE FROM SCRATCH (SLOW) ====================\n", + "##\n", + "## šŸ”„ This regenerates embeddings even if they already exist\n", + "## ā° Takes hours for large datasets\n", + "## šŸŽÆ Use this only if running notebook 3 independently without notebook 2\n", + "##\n", + "## Uncomment the code below to run:\n", + "\n", + "_option_2_success = False\n", + "# try:\n", + "# subset_datasets(\n", + "# input_files=['data/combined_cut_50x.jsonl'],\n", + "# subset_sizes=[0.1, 0.05], # 10% and 5% subsets\n", + "# testing_mode=True,\n", + "# output_dir='data/output',\n", + "# num_folds=25,\n", + "# epsilon=0.1\n", + "# )\n", + "# _option_2_success = True\n", + "# except Exception as e:\n", + "# print(f\"āŒ Error: {e}\")\n", + "# import traceback\n", + "# traceback.print_exc()\n", + "\n", + "\n", + "## ==================== EXECUTION STATUS ====================\n", + "# Smart status reporting based on what actually ran\n", + "\n", + "if _option_1_success or _option_2_success:\n", + " # Success case - at least one option ran 
successfully\n",
+    "    print(\"\\n\" + \"=\"*70)\n",
+    "    print(\"āœ… SUBSET SELECTION WORKFLOW COMPLETED SUCCESSFULLY!\")\n",
+    "    print(\"=\"*70)\n",
+    "    if _option_1_success:\n",
+    "        print(\"šŸ“ Executed: Option 1 (Using existing embeddings)\")\n",
+    "    if _option_2_success:\n",
+    "        print(\"šŸ“ Executed: Option 2 (Full pipeline from scratch)\")\n",
+    "    \n",
+    "else:\n",
+    "    # No option ran successfully\n",
+    "    print(\"\\n\" + \"=\"*70)\n",
+    "    print(\"āš ļø NO EXECUTION OPTION SELECTED OR ALL OPTIONS FAILED\")\n",
+    "    print(\"=\"*70)\n",
+    "    print(\"\\nšŸ“‹ Current Status:\")\n",
+    "    # Option 1 runs by default, so reaching this branch means it failed\n",
+    "    print(\" • Option 1: Ran but failed (check the errors above)\")\n",
+    "    print(\" • Option 2: Commented out (uncomment it above to run the full pipeline)\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "81b22329",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.23"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}