FFCV

Source code for ffcv.fields.rgb_image

from abc import ABCMeta, abstractmethod
from dataclasses import replace
from typing import Optional, Callable, TYPE_CHECKING, Tuple, Type

import cv2
import numpy as np
from numba.typed import Dict
from PIL.Image import Image

from .base import Field, ARG_TYPE
from ..pipeline.operation import Operation
from ..pipeline.state import State
from ..pipeline.compiler import Compiler
from ..pipeline.allocation_query import AllocationQuery
from ..libffcv import imdecode, memcpy, resize_crop

if TYPE_CHECKING:
    from ..memory_managers.base import MemoryManager
    from ..reader import Reader

# Integer codes identifying how an image's bytes are stored in the dataset.
# ``RGBImageField.encode`` writes the code into the ``mode`` metadata field and
# the decoders in this module compare that field against these values.
# NOTE(review): this is a ``numba.typed.Dict`` rather than a plain ``dict``,
# presumably so it stays usable from JIT-compiled code -- confirm.
IMAGE_MODES = Dict()
IMAGE_MODES['jpg'] = 0  # payload is a JPEG-encoded byte stream
IMAGE_MODES['raw'] = 1  # payload is the uncompressed uint8 pixel buffer


def encode_jpeg(numpy_image, quality):
    """Compress an RGB image to JPEG and return the encoded bytes.

    Parameters
    ----------
    numpy_image : np.ndarray
        Image in RGB channel order; it is converted to BGR before being
        handed to OpenCV's encoder.
    quality : int
        Value passed to OpenCV as ``IMWRITE_JPEG_QUALITY``.

    Returns
    -------
    np.ndarray
        The encoder output flattened to one dimension.

    Raises
    ------
    ValueError
        If OpenCV reports that the encoding failed.
    """
    bgr_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)
    encoder_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    ok, encoded = cv2.imencode('.jpg', bgr_image, encoder_params)

    if ok:
        return encoded.reshape(-1)

    raise ValueError("Impossible to encode image in jpeg")


def resizer(image, target_resolution):
    """Downscale ``image`` so that its longest side is at most ``target_resolution``.

    The aspect ratio is preserved and images are never upscaled: an image
    whose longest side already fits is returned unchanged (same object).

    Parameters
    ----------
    image : np.ndarray
        Image whose first two dimensions are (height, width).
    target_resolution : int or None
        Maximum allowed side length in pixels. ``None`` disables resizing.

    Returns
    -------
    np.ndarray
        Either ``image`` itself or a resized copy produced with
        ``cv2.INTER_AREA`` interpolation.

    Raises
    ------
    ValueError
        If ``target_resolution`` is not strictly positive.
    """
    if target_resolution is None:
        return image
    if target_resolution <= 0:
        raise ValueError(
            f"target_resolution must be positive, got {target_resolution}")

    height, width = image.shape[:2]
    longest_side = max(height, width)
    if longest_side == 0:
        # Nothing to resize in an empty image (also avoids dividing by zero).
        return image

    ratio = target_resolution / longest_side
    if ratio >= 1:
        return image

    # Truncation can bring the short side of a very elongated image down to
    # 0 pixels, which cv2.resize rejects; keep at least one pixel per side.
    new_width = max(1, int(ratio * width))
    new_height = max(1, int(ratio * height))
    # cv2.resize takes the target size as (width, height), in plain ints.
    return cv2.resize(image, (new_width, new_height),
                      interpolation=cv2.INTER_AREA)


def get_random_crop(height, width, scale, ratio):
    """Sample a random crop window inside a ``height`` x ``width`` image.

    Up to 10 candidates are drawn: the crop area is the image area times a
    factor drawn uniformly from ``[scale[0], scale[1]]`` and the aspect ratio
    (width / height) is drawn log-uniformly from ``[ratio[0], ratio[1]]``.
    The first candidate that fits inside the image is placed at a random
    position. If none fits, a centered crop is returned whose aspect ratio
    is the image's own ratio clamped to the allowed range.

    Parameters
    ----------
    height, width : int
        Size of the source image.
    scale : array-like of two floats
        Bounds of the crop area as a fraction of the image area.
    ratio : array-like of two floats
        Bounds of the crop aspect ratio.

    Returns
    -------
    tuple
        ``(i, j, h, w)``: top row, left column, crop height and crop width.
    """
    image_area = height * width
    log_bounds = np.log(ratio)

    for _attempt in range(10):
        sampled_area = image_area * np.random.uniform(scale[0], scale[1])
        sampled_ratio = np.exp(np.random.uniform(log_bounds[0], log_bounds[1]))
        crop_w = int(round(np.sqrt(sampled_area * sampled_ratio)))
        crop_h = int(round(np.sqrt(sampled_area / sampled_ratio)))

        fits = 0 < crop_w <= width and 0 < crop_h <= height
        if not fits:
            continue

        top = int(np.random.uniform(0, height - crop_h + 1))
        left = int(np.random.uniform(0, width - crop_w + 1))
        return top, left, crop_h, crop_w

    # Every attempt failed: fall back to a deterministic central crop.
    image_ratio = float(width) / float(height)
    narrowest = min(ratio)
    widest = max(ratio)
    if image_ratio < narrowest:
        crop_w = width
        crop_h = int(round(crop_w / narrowest))
    elif image_ratio > widest:
        crop_h = height
        crop_w = int(round(crop_h * widest))
    else:
        crop_w = width
        crop_h = height

    top = (height - crop_h) // 2
    left = (width - crop_w) // 2
    return top, left, crop_h, crop_w


def get_center_crop(height, width, _, ratio):
    """Return a centered square crop window.

    The side of the square is ``ratio`` times the shorter image side,
    truncated to an integer. The third positional argument is ignored; it
    keeps the signature parallel to :func:`get_random_crop`, which takes
    ``scale`` in that position.

    Returns
    -------
    tuple
        ``(i, j, h, w)``: top row, left column, crop height and crop width
        (``h == w``).
    """
    side = int(ratio * min(height, width))
    top = (height - side) // 2
    left = (width - side) // 2

    return top, left, side, side


class SimpleRGBImageDecoder(Operation):
    """Most basic decoder for the :class:`~ffcv.fields.RGBImageField`.

    It only supports datasets in which every image has the same resolution.
    Images are read (and decompressed when stored as JPEG) and passed on
    as is.
    """

    def __init__(self):
        super().__init__()

    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        """Declare the output shape/dtype and request the output buffer.

        Raises
        ------
        TypeError
            If the dataset metadata contains more than one image resolution.
        """
        all_widths = self.metadata['width']
        all_heights = self.metadata['height']
        width_hi = all_widths.max()
        height_hi = all_heights.max()
        height_lo = all_heights.min()
        width_lo = all_widths.min()

        if width_lo != width_hi or height_hi != height_lo:
            msg = (
                "SimpleRGBImageDecoder only supports constant image, "
                "consider RandomResizedCropRGBImageDecoder or "
                "CenterCropRGBImageDecoder instead."
            )
            raise TypeError(msg)

        image_shape = (height_hi, width_hi, 3)
        pixel_dtype = np.dtype('<u1')
        new_state = replace(previous_state, jit_mode=True,
                            shape=image_shape, dtype=pixel_dtype)
        return new_state, AllocationQuery(image_shape, pixel_dtype)

    def generate_code(self) -> Callable:
        """Build the batch decoding function.

        The returned function fills ``destination`` with one image per entry
        of ``batch_indices`` and is flagged with ``is_parallel = True``.
        """
        read_storage = self.memory_read
        decode_jpeg = Compiler.compile(imdecode)
        jpg_mode = IMAGE_MODES['jpg']
        # Looked up for parity with the stored modes; every non-JPEG sample
        # takes the plain copy branch below.
        raw_mode = IMAGE_MODES['raw']
        batch_range = Compiler.get_iterator()
        copy_bytes = Compiler.compile(memcpy)

        def decode(batch_indices, destination, metadata, storage_state):
            batch_size = len(batch_indices)
            for slot in batch_range(batch_size):
                sample_id = batch_indices[slot]
                record = metadata[sample_id]
                payload = read_storage(record['data_ptr'], storage_state)
                height, width = record['height'], record['width']

                if record['mode'] == jpg_mode:
                    decode_jpeg(payload, destination[slot],
                                height, width, height, width,
                                0, 0, 1, 1, False, False)
                else:
                    copy_bytes(payload, destination[slot])

            return destination[:batch_size]

        decode.is_parallel = True
        return decode
class ResizedCropRGBImageDecoder(SimpleRGBImageDecoder, metaclass=ABCMeta):
    """Abstract decoder for :class:`~ffcv.fields.RGBImageField` that performs
    a crop and a resize operation.

    It supports both variable and constant resolution datasets.

    Concrete subclasses must set ``self.scale`` and ``self.ratio`` (both are
    read by :meth:`generate_code`) and implement the
    :attr:`get_crop_generator` property.

    Parameters
    ----------
    output_size : Tuple[int]
        ``output_size[0]`` and ``output_size[1]`` are the first two
        dimensions of every decoded image.
    """

    def __init__(self, output_size):
        super().__init__()
        self.output_size = output_size

    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        """Declare the output shape/dtype and request two buffers.

        The first buffer receives the resized crops; the second is scratch
        space with room for the largest image of the dataset
        (``max_height * max_width * 3`` bytes), used when decoding JPEGs.
        """
        widths = self.metadata['width']
        heights = self.metadata['height']
        # We convert to uint64 to avoid overflows
        self.max_width = np.uint64(widths.max())
        self.max_height = np.uint64(heights.max())
        output_shape = (self.output_size[0], self.output_size[1], 3)
        my_dtype = np.dtype('<u1')

        return (
            replace(previous_state, jit_mode=True,
                    shape=output_shape, dtype=my_dtype),
            (AllocationQuery(output_shape, my_dtype),
             AllocationQuery((self.max_height * self.max_width * np.uint64(3),), my_dtype),
             )
        )

    def generate_code(self) -> Callable:
        """Build the batch function that decodes, crops and resizes images.

        The crop window of every sample comes from the function returned by
        :attr:`get_crop_generator`, called with
        ``(height, width, self.scale, self.ratio)``.
        """
        jpg = IMAGE_MODES['jpg']

        mem_read = self.memory_read
        my_range = Compiler.get_iterator()
        imdecode_c = Compiler.compile(imdecode)
        resize_crop_c = Compiler.compile(resize_crop)
        get_crop_c = Compiler.compile(self.get_crop_generator)

        scale = self.scale
        ratio = self.ratio
        # Hand sequences to the crop function as arrays; lists get the same
        # treatment as tuples so both spellings of a range behave alike.
        if isinstance(scale, (tuple, list)):
            scale = np.array(scale)
        if isinstance(ratio, (tuple, list)):
            ratio = np.array(ratio)

        def decode(batch_indices, my_storage, metadata, storage_state):
            destination, temp_storage = my_storage
            for dst_ix in my_range(len(batch_indices)):
                source_ix = batch_indices[dst_ix]
                field = metadata[source_ix]
                image_data = mem_read(field['data_ptr'], storage_state)
                height = np.uint32(field['height'])
                width = np.uint32(field['width'])

                if field['mode'] == jpg:
                    # Decode into the scratch buffer, then view only the
                    # part actually covered by this image as (H, W, 3).
                    temp_buffer = temp_storage[dst_ix]
                    imdecode_c(image_data, temp_buffer,
                               height, width, height, width,
                               0, 0, 1, 1, False, False)
                    selected_size = 3 * height * width
                    temp_buffer = temp_buffer.reshape(-1)[:selected_size]
                    temp_buffer = temp_buffer.reshape(height, width, 3)
                else:
                    # Raw samples are reinterpreted as (H, W, 3) directly.
                    temp_buffer = image_data.reshape(height, width, 3)

                i, j, h, w = get_crop_c(height, width, scale, ratio)

                resize_crop_c(temp_buffer, i, i + h, j, j + w,
                              destination[dst_ix])

            return destination[:len(batch_indices)]

        decode.is_parallel = True
        return decode

    @property
    @abstractmethod
    def get_crop_generator(self):
        """Function ``(height, width, scale, ratio) -> (i, j, h, w)`` used to
        pick the crop window of each image."""
        raise NotImplementedError
class RandomResizedCropRGBImageDecoder(ResizedCropRGBImageDecoder):
    """Decoder for :class:`~ffcv.fields.RGBImageField` that performs a
    random crop followed by a resize operation.

    It supports both variable and constant resolution datasets.

    Parameters
    ----------
    output_size : Tuple[int]
        The desired resized resolution of the images
    scale : Tuple[float]
        The range of possible ratios (in area) that can be randomly sampled
    ratio : Tuple[float]
        The range of potential aspect ratios that can be randomly sampled
    """

    def __init__(self, output_size, scale=(0.08, 1.0), ratio=(0.75, 4/3)):
        super().__init__(output_size)
        self.scale, self.ratio = scale, ratio
        self.output_size = output_size

    @property
    def get_crop_generator(self):
        """Crop sampler used by the decoder: :func:`get_random_crop`."""
        return get_random_crop
class CenterCropRGBImageDecoder(ResizedCropRGBImageDecoder):
    """Decoder for :class:`~ffcv.fields.RGBImageField` that performs a
    center crop followed by a resize operation.

    It supports both variable and constant resolution datasets.

    Parameters
    ----------
    output_size : Tuple[int]
        The desired resized resolution of the images
    ratio : float
        ratio of (crop size) / (min side length)
    """

    # The center crop is taken first and then resized to ``output_size``.
    def __init__(self, output_size, ratio):
        super().__init__(output_size)
        # ``get_center_crop`` ignores the scale argument, so none is stored.
        self.scale, self.ratio = None, ratio

    @property
    def get_crop_generator(self):
        """Crop function used by the decoder: :func:`get_center_crop`."""
        return get_center_crop
class RGBImageField(Field):
    """
    A subclass of :class:`~ffcv.fields.Field` supporting RGB image data.

    Parameters
    ----------
    write_mode : str, optional
        How to write the image data to the dataset file. Should be one of
        'raw' (``uint8`` pixel values), 'jpg' (compress to JPEG format),
        'smart' (decide between saving pixel values and JPEG compressing
        based on image size), or 'proportion' (JPEG compress a random subset
        of the data with size specified by the ``compress_probability``
        argument). By default: 'raw'.
    max_resolution : int, optional
        If specified, images whose longest side exceeds this value are
        downscaled to it before saving, by default None
    smart_threshold : int, optional
        When ``write_mode='smart'``, an image is JPEG-compressed if its raw
        size in bytes exceeds ``smart_threshold``; otherwise, or when this
        is None, it is stored raw.
    jpeg_quality : int, optional
        The quality parameter for JPEG encoding (ignored for
        ``write_mode='raw'``), by default 90
    compress_probability : float, optional
        Ignored unless ``write_mode='proportion'``; in the latter case it is
        the probability with which image is JPEG-compressed, by default 0.5.
    """

    def __init__(self, write_mode='raw', max_resolution: Optional[int] = None,
                 smart_threshold: Optional[int] = None, jpeg_quality: int = 90,
                 compress_probability: float = 0.5) -> None:
        self.write_mode = write_mode
        self.smart_threshold = smart_threshold
        self.max_resolution = max_resolution
        self.jpeg_quality = int(jpeg_quality)
        self.proportion = compress_probability

    @property
    def metadata_type(self) -> np.dtype:
        """Per-sample metadata record: storage mode, image size and the
        pointer to the image bytes."""
        return np.dtype([
            ('mode', '<u1'),
            ('width', '<u2'),
            ('height', '<u2'),
            ('data_ptr', '<u8'),
        ])

    def get_decoder_class(self) -> Type[Operation]:
        """Return the default decoder for this field."""
        return SimpleRGBImageDecoder

    @staticmethod
    def from_binary(binary: ARG_TYPE) -> Field:
        """Rebuild a field from its serialized form.

        ``binary`` is not inspected (``to_binary`` stores no settings), so
        the result always uses the default constructor arguments.
        """
        return RGBImageField()

    def to_binary(self) -> ARG_TYPE:
        """Return a zero-filled ``ARG_TYPE`` record."""
        return np.zeros(1, dtype=ARG_TYPE)[0]

    def encode(self, destination, image, malloc):
        """Validate ``image``, store its bytes and fill its metadata record.

        Parameters
        ----------
        destination
            Metadata record (see :attr:`metadata_type`) to fill in.
        image : PIL.Image.Image or np.ndarray
            ``uint8`` image of shape (height, width, 3).
        malloc : Callable
            Called with a size in bytes; returns ``(pointer, buffer)`` where
            ``buffer`` receives the image bytes.

        Raises
        ------
        TypeError
            If ``image`` is neither a PIL image nor a numpy array.
        ValueError
            If the dtype is not ``uint8``, the shape is not (H, W, 3), or
            the configured write mode is not supported.
        """
        if isinstance(image, Image):
            image = np.array(image)

        if not isinstance(image, np.ndarray):
            raise TypeError(f"Unsupported image type {type(image)}")

        if image.dtype != np.uint8:
            raise ValueError("Image type has to be uint8")

        # Check the rank first so 2-D (grayscale) input gets the intended
        # ValueError instead of an IndexError on shape[2].
        if image.ndim != 3 or image.shape[2] != 3:
            raise ValueError(f"Invalid shape for rgb image: {image.shape}")

        image = resizer(image, self.max_resolution)

        # Resolve the per-image storage mode ('jpg' or 'raw').
        write_mode = self.write_mode
        if write_mode == 'smart':
            too_big = (self.smart_threshold is not None
                       and image.nbytes > self.smart_threshold)
            write_mode = 'jpg' if too_big else 'raw'
        elif write_mode == 'proportion':
            if np.random.rand() < self.proportion:
                write_mode = 'jpg'
            else:
                write_mode = 'raw'

        # Reject unknown modes before anything is written to ``destination``
        # (a lookup in IMAGE_MODES would otherwise fail with a KeyError).
        if write_mode not in ('jpg', 'raw'):
            raise ValueError(f"Unsupported write mode {self.write_mode}")

        # JPEG encoding only happens once the image is known to need it.
        if write_mode == 'jpg':
            payload = encode_jpeg(image, self.jpeg_quality)
        else:
            payload = np.ascontiguousarray(image).view('<u1').reshape(-1)

        destination['mode'] = IMAGE_MODES[write_mode]
        destination['height'], destination['width'] = image.shape[:2]
        destination['data_ptr'], storage = malloc(payload.nbytes)
        storage[:] = payload