Source code for deepvisiontools.data.data_reader.readers

from typing import Union, Tuple, Dict, List, Any
from pathlib import Path
from abc import ABC, abstractmethod
from deepvisiontools.formats import (
    BaseFormat,
    BboxFormat,
    InstanceMaskFormat,
    SemanticMaskFormat,
)
from deepvisiontools.formats.base_data import (
    InstanceMaskData,
    BboxData,
    SemanticMaskData,
)
from deepvisiontools import Configuration
import torch
import numpy as np
from torch import Tensor
import json
import cv2
from tqdm import tqdm
from deepvisiontools.preprocessing import load_mask

ANNOTATION_TYPE_DICT = {
    "json": "file",
    "png": "mask",
    "tif": "mask",
    "tiff": "mask",
    "jpg": "mask",
    "jpeg": "mask",
}

SUPPORTED_IMAGE_EXTENSIONS = [
    "png",
    "PNG",
    "jpg",
    "JPG",
    "jpeg",
    "JPEG",
    "tif",
    "TIF",
    "tiff",
    "TIFF",
]


[docs] class BaseReader(ABC): """Base class for readers. __len__ and __getitem__ methods must be implemented in concrete class. Your concrete class must implement concrete category_id property that returns Dict[int, str] where int is label and str category name. Your concrete class must have a class attribute describing annotation file type ("json" for json file, "png" for image etc.) You must implement export_annotation and group_export methods in concrete classes See CocoReader class for concrete implementation """ @property def annotation_file_type(cls): raise NotImplementedError @abstractmethod def __getitem__(self, index: int) -> Tuple[str, BaseFormat]: """From a given index ([0, N-1] where N is the number of images) returns image name, Format Args: index (``int``): Index of image from 0 to N-1 Returns: ``Tuple[str, Format]``: - returns image name (ex: something.jpg), target as Format """ pass @abstractmethod def __len__(self) -> int: pass @abstractmethod def export_annotation(self, image: Tensor, annotation: BaseFormat): pass @abstractmethod def group_export( sub_anns_dir: Union[str, Path], destination: Union[str, Path], categories: Dict[int, str] = None, ): pass @property @abstractmethod def category_ids(self) -> Dict[int, str]: pass @category_ids.setter @abstractmethod def category_ids(self, val): pass
DEFAULT_COCO_ANNOT = "coco_annotations.json" DEFAULT_SEMANTIC_ANNOT_PATH = "masks" # TODO check everything class SemanticReader(BaseReader): annotation_file_type = "tiff" # used for export dataset only def __init__(self, dataset_path: Union[str, Path]): dataset_path = ( dataset_path if isinstance(dataset_path, Path) else Path(dataset_path) ) # load all files paths (images and masks) self.masks_path = dataset_path / DEFAULT_SEMANTIC_ANNOT_PATH images_path = dataset_path / "images" self.images_path = images_path self.images = list(images_path.glob("*")) self.images = [ f.name for f in self.images if f.suffix[1:] in SUPPORTED_IMAGE_EXTENSIONS ] self.images = sorted(self.images) self.masks = list(self.masks_path.glob("*")) self.masks = [ f for f in self.masks if f.suffix[1:] in SUPPORTED_IMAGE_EXTENSIONS ] self.masks = sorted(self.masks) assert len(self.masks) == len( self.images ), "Not same number of masks and images. You must have the same." self.category_ids = { i + 1: str(i + 1) for i in range(Configuration().num_classes) } @property def category_ids(self): return self._category_ids @category_ids.setter def category_ids(self, val: Dict[int, str]): self._category_ids = val def __getitem__(self, index): img_name = self.images[index] assert ( Path(img_name).stem == self.masks[index].stem ), f"In Reader: index {index} leads to different name for image and mask, got {img_name} and {self.masks[index].name}" target = load_mask(self.masks[index]) target = SemanticMaskData(target) target = SemanticMaskFormat(target) return img_name, target def __len__(self): return len(self.images) def export_annotation( self, image_name, image, annotation: SemanticMaskFormat, cats ): assert isinstance( annotation, SemanticMaskFormat ), "In semanticreader : annotation must be SemanticMaskFormat" mask = annotation.data.value mask = mask.to("cpu") return image_name, mask def group_export( self, sub_anns_dir: Union[str, Path], destination: Union[str, Path], categories: Dict[int, str] = None, ): destination = "masks" sub_anns_dir.rename(sub_anns_dir.parent / destination)
[docs] class CocoReader(BaseReader): """Child class of BaseReader. Coco format reader class. Handles dataset with structure: Dataset Name -> Image_dir, coco_annotations.json Note : bboxes must be in XYWH format Args: annotation_path (Union[str, Path]): path to json file or to dataset directory. Attributes ---------- Attributes: - annot_dict (``Dict[Any, Any]``): coco dict loaded. Attributes ---------- Properties: - category_ids Dict[int, str]: label / category correspondance **Methods** """ annotation_file_type = "json" def __init__(self, annotation_path: Union[str, Path]) -> None: annotation_path = ( annotation_path if isinstance(annotation_path, Path) else Path(annotation_path) ) if annotation_path.is_dir(): annotation_path = annotation_path / DEFAULT_COCO_ANNOT with open(annotation_path) as f: self.annot_dict = json.load(f) # if items of annot_dict are dict convert them to lists if isinstance(self.annot_dict["categories"], dict): self.annot_dict["categories"] = [v for v in self.annot_dict["categories"]] if isinstance(self.annot_dict["images"], dict): self.annot_dict["images"] = [v for v in self.annot_dict["images"]] if isinstance(self.annot_dict["annotations"], dict): self.annot_dict["annotations"] = [v for v in self.annot_dict["annotations"]] # compute reindexer dict for imgs self._imgs_reindex_dict = { i: im["id"] for i, im in enumerate(self.annot_dict["images"]) } # compute reindexer dict for labels / categories ordered_keys = [cat["name"] for cat in self.annot_dict["categories"]] ordered_keys.sort() self._category_converter = {} for i, name in enumerate(ordered_keys): for cat in self.annot_dict["categories"]: if name == cat["name"]: self._category_converter.update({i: cat}) self._label_converter = { k["id"]: i for i, k in self._category_converter.items() } # generate a correspondance between new labels and cat name self.category_ids = {i: k["name"] for i, k in self._category_converter.items()} @property def category_ids(self): return self._category_ids @category_ids.setter def category_ids(self, val: Dict[int, str]): self._category_ids = val
[docs] def get_img_anns(self, index: int) -> Tuple[str, Tuple[int, int], List[dict]]: """return from index image as img name, spatial size as Tuple[int, int] (h, w) and all annotations for given image index Args: index (int) Returns: Tuple[str, Tuple[int, int], List[dict]]: img_name, spatial_size, list of coco anns """ image = [ img for img in self.annot_dict["images"] if img["id"] == self._imgs_reindex_dict[index] ] spatial_size = (image[0]["height"], image[0]["width"]) image = image[0]["file_name"] anns = [ ann for ann in self.annot_dict["annotations"] if ann["image_id"] == self._imgs_reindex_dict[index] ] return image, spatial_size, anns
def __getitem__(self, index: int) -> Tuple[str, BaseFormat]: """From a given index ([0, N-1] where N is the number of images) returns image name, Format Args: index (``int``): Index of image from 0 to N-1 Returns: ``Tuple[str, Format]``: - returns image name (ex: something.jpg), target as Format """ image_name, spatial_size, anns = self.get_img_anns(index) # handle empty anns if anns == []: if Configuration().data_type == "bbox": format = BboxFormat.empty(spatial_size) if Configuration().data_type == "instance_mask": format = InstanceMaskFormat.empty(spatial_size) elif Configuration().data_type == "bbox": labels = torch.tensor( [ self._label_converter[ann["category_id"]] for ann in anns if "bbox" in ann.keys() ] ) bboxes = [ann["bbox"] for ann in anns if "bbox" in ann.keys()] # handle empty if bboxes == []: format = BboxFormat.empty(spatial_size) else: bboxes = [torch.tensor(box)[None, :] for box in bboxes] bboxes = BboxData( torch.cat(bboxes), format="XYWH", canvas_size=spatial_size ) format = BboxFormat(bboxes, labels) elif Configuration().data_type == "instance_mask": labels = torch.tensor( [ self._label_converter[ann["category_id"]] for ann in anns if "segmentation" in ann.keys() ] ) if labels == []: format = InstanceMaskFormat.empty(spatial_size) else: object_masks = [ self.segment2mask(ann, spatial_size) for ann in anns if "segmentation" in ann.keys() ] # reindex objects and stack them with id in [1 ... N_obj] range object_masks = [m * (i + 1) for i, m in enumerate(object_masks)] mask = np.max(np.stack(object_masks, axis=0), axis=0) mask = torch.tensor(mask).long() format = InstanceMaskFormat(InstanceMaskData(mask), labels) return image_name, format
[docs] def segment2mask( self, ann: Dict[Any, Any], spatial_size: Tuple[int, int] ) -> np.ndarray: """Convert segment to object mask. Args: ann (Dict[Any, Any]): coco format annotation dict spatial_size (Tuple[int, int]): size of image as (h, w) Returns: np.ndarray: mask """ segments = ann["segmentation"] # handle RLE coding if isinstance(segments, dict): assert ( "counts" in segments.keys() ), f"Segmentation {segments} is dict but not RLE encoded." mask: np.ndarray = self.rleToMask(segments, spatial_size) else: mask = np.zeros((spatial_size[0], spatial_size[1], 3)) segments = [ np.array(seg, dtype=np.int32).reshape(-1, 2) for seg in segments if len(seg) > 4 ] assert segments != [], "Empty annotation after filtering segment length" for poly in segments: cv2.drawContours(mask, [poly], -1, color=(1), thickness=-1) mask = mask[:, :, 0] mask: np.ndarray = mask.astype(np.uint64) return mask
[docs] def rleToMask(self, rle: str, shape: Tuple[int, int]) -> np.ndarray: """convert rle encoding to binary mask Args: rle (str) shape (Tuple[int, int]) Returns: np.ndarray: mask """ rle = rle["counts"] # force even number of elem if not len(rle) % 2 == 0: rle.append(0) width, height = shape[:2] mask = np.zeros(width * height).astype(np.uint64) array = np.asarray(rle) starts = array[0::2] lengths = array[1::2] current_position = 0 for i, start in enumerate(starts): current_position += start mask[current_position : int(current_position + lengths[i])] = 1 current_position += lengths[i] return mask.reshape(height, width).T
def __len__(self) -> int: return len(self.annot_dict["images"])
[docs] def export_annotation( self, image_name: str, image: Tensor, format: BaseFormat, categories: Dict[int, str], ) -> Tuple[str, Dict[Any, Any]]: """from image, image name, categories and target (as BaseFormat) returns a writeable coco dict. Args: image_name (str) image (Tensor) format (BaseFormat) categories (Dict[int, str]): Dict of label / category name correspondance Returns: Tuple[str, Dict[Any, Any]]: image_name, coco dict """ format.device = "cpu" images = [ { "file_name": image_name, "height": image.shape[-2], "width": image.shape[-1], "id": 0, } ] categories = [{"name": k, "id": v} for v, k in categories.items()] anns = [] for i in range(format.nb_object): obj, _ = format[i] # if no object, stop loop if obj.nb_object == 0: break # retrieve mask/box and label label = obj.labels.item() obj.device = "cpu" ann_dict = {"id": 0, "image_id": 0, "category_id": label} if isinstance(obj, InstanceMaskFormat): bboxes: BboxData = BboxData.from_mask(obj.data) bboxes.format = "XYWH" bboxes.device = "cpu" bboxes: Tensor = bboxes.value bboxes_coco = bboxes.numpy()[0] bboxes_coco = [int(b) for b in bboxes_coco] contours, _ = cv2.findContours( obj.data.value.numpy().astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE, ) contours = [c.reshape(-1).tolist() for c in contours] ann_dict.update({"bbox": bboxes_coco, "segmentation": contours}) elif isinstance(obj, BboxFormat): obj.data.format = "XYWH" bboxes = obj.data.value bboxes_coco = bboxes.numpy()[0] bboxes_coco = [int(b) for b in bboxes_coco] ann_dict.update({"bbox": bboxes_coco}) anns.append(ann_dict) coco_dict = {"categories": categories, "images": images, "annotations": anns} return image_name, coco_dict
def group_export( self, sub_anns_dir: Union[str, Path], destination: Union[str, Path], categories: Dict[int, str] = None, ): destination = destination / "coco_annotations.json" categories = self.category_ids if categories == None else categories """Final layer of export dataset function. Here looks at all individual json files and merge them to coco_annotations.json Args: sub_anns_dir (Union[str, Path]) destination (Union[str, Path]) categories (Dict[int, str]) """ categories = [{"name": k, "id": v} for v, k in categories.items()] grouped_dict = {"categories": categories, "images": [], "annotations": []} jsons_list = sub_anns_dir.glob("*.json") jsons_list = [json.as_posix() for json in jsons_list] img_idx = 0 ann_idx = 0 for j_file in tqdm(jsons_list, desc="Grouping jsons : "): with open(j_file) as f: d = json.load(f) d["images"][0]["id"] = img_idx grouped_dict["images"].append(d["images"][0]) for i in range(len(d["annotations"])): d["annotations"][i]["id"] = ann_idx d["annotations"][i]["image_id"] = img_idx grouped_dict["annotations"].append(d["annotations"][i]) ann_idx += 1 img_idx += 1 with open(destination, "w") as f: json.dump(grouped_dict, f)