Source code for dlordinal.datasets.adience

import re
import sys
import tarfile
from pathlib import Path
from typing import Callable, Optional, Union

import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import StratifiedShuffleSplit
from torchvision.datasets.vision import VisionDataset
from tqdm import tqdm


[docs] class Adience(VisionDataset): """ Base class for the Adience dataset. Parameters ---------- root : Union[str, Path] Root directory where the datasets are stored. The Adience dataset is expected to be located under the `adience` directory inside the root directory. In the `adience` directory, the following files are expected: 1) `aligned.tar.gz`: a tar.gz file containing the images; 2) `folds`: a directory containing the folds. Each fold is expected to be a file named `fold_{f}_data.txt`, where `f` is the fold number starting from 0. These files can be downloaded from the Adience website (https://talhassner.github.io/home/projects/Adience/Adience-data.html) ranges : list, optional List of age ranges to use, by default [(0, 2), (4, 6), (8, 13), (15, 20), (25, 32), (38, 43), (48, 53), (60, 100)]. test_size : float, optional, default = 0.2 Test size. transform : Callable, optional A callable that takes in an PIL image and returns a transformed version. target_transform : Callable, optional A callable that takes in the target and transforms it. verbose : bool, optional, default = False Whether to print progress messages. Attributes ---------- root : Path Root directory where the datasets are stored. train : bool Whether to use the training or test partition. ranges : list List of age ranges to use to define the categories. test_size : float Percentage of the dataset to use for testing. transform : Callable A callable that takes in an PIL image and returns a transformed version. target_transform : Callable A callable that takes in the target and transforms it. verbose : bool Whether to print progress messages. data : list List of image paths. targets : list Contains the target of each sampel contained in the dataset. classes : list Unique classes in the dataset. """ root: Path train: bool ranges: list test_size: float transform: Optional[Callable] target_transform: Optional[Callable] verbose: bool data: list targets: list classes: list def __init__( self, root: Union[str, Path], train: bool = True, ranges: list = [ (0, 2), (4, 6), (8, 13), (15, 20), (25, 32), (38, 43), (48, 53), (60, 100), ], test_size: float = 0.2, transform: Optional[Callable] = None, target_transform: Optional[Callable] = None, verbose: bool = False, ) -> None: super().__init__() self.root = Path(root) self.train = train self.ranges = ranges self.test_size = test_size self.transform = transform self.target_transform = target_transform self.verbose = verbose self.data = [] self.targets = [] self.classes = [] self.root_adience_ = self.root / "adience" self.data_file_path_ = self.root_adience_ / "aligned.tar.gz" self.folds_path_ = self.root_adience_ / "folds" self.images_path_ = self.root_adience_ / "aligned" self.transformed_images_path_ = self.root_adience_ / "transformed" if self.train: self.partition_path_ = self.root_adience_ / "train" else: self.partition_path_ = self.root_adience_ / "test" if not self._check_input_files(): raise FileNotFoundError( "Some input files are missing. Please, check the documentation of the" " root parameter to see the expected directory structure." ) self.folds_ = [ pd.read_csv(self.folds_path_ / f"fold_{f}_data.txt", sep="\t") for f in range(5) ] self._extract_data() self._process_and_split(self.folds_) self._load_data() def _check_input_files(self) -> bool: """ Check if the input files are present. """ result = self.data_file_path_.exists() and self.folds_path_.exists() for i in range(5): result = result and (self.folds_path_ / f"fold_{i}_data.txt").exists() return result def _check_if_extracted(self) -> bool: """ Check if the tar.gz file has been extracted. """ path = self.data_file_path_.parent path = path / "aligned" return path.exists() def _check_if_transformed(self) -> bool: """ Check if the images have been transformed. """ return self.transformed_images_path_.exists() def _check_if_partitioned(self) -> bool: """ Check if the images have been partitioned. """ return self.partition_path_.exists() def _extract_data(self): """ Extract the data tar.gz file. """ if self._check_if_extracted(): if self.verbose: print("File already extracted.") return print("Extracting file...") with tarfile.open(self.data_file_path_, "r:gz") as file: path = self.data_file_path_.parent path.mkdir(exist_ok=True, parents=True) if sys.version_info >= (3, 12): file.extractall(path, members=_track_progress(file), filter="data") else: file.extractall(path, members=_track_progress(file)) def _process_and_split(self, folds: list) -> None: """ Process the folds and split the images into partitions. Parameters ---------- folds : list List of folds. """ is_transformed = self._check_if_transformed() is_partitioned = self._check_if_partitioned() if is_transformed and is_partitioned: if self.verbose: print("Files already transformed and partitioned.") return fold_dfs = list() for f, fold in enumerate(folds): notna = fold["age"].notna() n_discarded = (~notna).sum() if self.verbose: print( f"Fold {f}: discarding {n_discarded} entries" f" ({(n_discarded / len(fold)) * 100:.1f}%)" ) fold = fold.loc[notna] fold = fold.assign(age=fold["age"].map(self._assign_range)) fold = fold.dropna(subset=["age"]) fold = fold.assign(age=fold["age"].astype(int)) fold_dfs.append( pd.DataFrame( dict( path=fold.apply(_image_path_from_row, axis="columns"), age=fold["age"], ) ) ) self.df_: pd.DataFrame = pd.concat(fold_dfs, ignore_index=True) if is_transformed: if self.verbose: print("File already transformed.") else: self.transformed_images_path_.mkdir(exist_ok=True) if self.verbose: print("Resizing images...") for row in tqdm(self.df_.itertuples(), total=len(self.df_)): dst_image = self.transformed_images_path_ / row.path if dst_image.is_file(): continue src_image = self.images_path_ / row.path dst_image.parent.mkdir(exist_ok=True, parents=True) # open the source image with Image.open(src_image) as img: # calculate the new width that maintains the aspect ratio width_percent = 128 / float(img.size[1]) new_width = int((float(img.size[0]) * float(width_percent))) # resize the image using the calculated width and 128 height resized_img = img.resize((new_width, 128)) # save the resized image to the destination path resized_img.save(dst_image) if is_partitioned: if self.verbose: print("File already partitioned.") else: for c in range(len(self.ranges)): (self.partition_path_ / f"{c}").mkdir(parents=True, exist_ok=True) sss = StratifiedShuffleSplit( n_splits=1, test_size=self.test_size, random_state=0 ) train_index, test_index = next(sss.split(self.df_, self.df_["age"])) if self.train: name = "train" partition_df: pd.DataFrame = self.df_.iloc[train_index] else: name = "test" partition_df: pd.DataFrame = self.df_.iloc[test_index] for row in tqdm( partition_df.itertuples(), total=len(partition_df), leave=False, desc=name, ): image_path = self.transformed_images_path_ / row.path assert image_path.is_file() new_path = self.root_adience_ / f"{name}/{row.age}/{image_path.name}" if not new_path.exists(): new_path.symlink_to(image_path.resolve()) def _load_data(self): for cls in range(len(self.ranges)): path = self.partition_path_ / f"{cls}" for image_path in path.iterdir(): self.data.append(str(image_path)) self.targets.append(cls) self.classes = np.unique(self.targets).tolist() def _assign_range(self, age: str): """ Assign an age range to an age. Parameters ---------- age : str Age to assign a range to. """ m = re.match(r"\((\d+), *(\d+)\)", age) if m: age = (int(m.group(1)), int(m.group(2))) else: m = re.match(r"(\d+)", age) if m: age = int(m.group(0)) else: return None if age in self.ranges: return self.ranges.index(age) if isinstance(age, tuple): age_minimum, age_maximum = age for i, (range_minimum, range_maximum) in enumerate(self.ranges): if (age_minimum >= range_minimum) and (age_maximum <= range_maximum): return i return None if isinstance(age, int): for i, (range_minimum, range_maximum) in enumerate(self.ranges): if (age >= range_minimum) and (age <= range_maximum): return i return None return None def __len__(self): """Returns the number of samples in the dataset. Returns ------- int Number of samples in the dataset. Raises ------ ValueError If the data and targets have different lengths. """ if len(self.data) != len(self.targets): raise ValueError("Data and targets have different lengths.") return len(self.data) def __getitem__(self, index): """Returns the image and the target associated with the sample at the given index. If a transform is provided, the image is transformed. If a target transform is provided, the target is transformed. Parameters ---------- index : int Index of the item to return. Returns ------- tuple Tuple containing the image and the target. """ image_path = self.data[index] target = self.targets[index] image = Image.open(image_path) if self.transform is not None: image = self.transform(image) if self.target_transform is not None: target = self.target_transform(target) return image, target
def _image_path_from_row(row): """ Get the image path from a row. Parameters ---------- row : pd.Series Row to get the image path from. """ return f'{row["user_id"]}/landmark_aligned_face.{row["face_id"]}.{row["original_image"]}' def _track_progress(file): """ Track the progress of the extraction. Parameters ---------- file : tarfile.TarFile File to track the progress of. """ for member in tqdm(file, total=len(file.getmembers())): # this will be the current file being extracted # Go over each member yield member