diff --git a/lib/image.py b/lib/image.py index bac0668..2aed239 100644 --- a/lib/image.py +++ b/lib/image.py @@ -11,6 +11,7 @@ import sys from ast import literal_eval from bisect import bisect from concurrent import futures +from typing import Optional from zlib import crc32 import cv2 @@ -1422,7 +1423,7 @@ class ImagesSaver(ImageIO): executor.submit(self._save, *item) executor.shutdown() - def _save(self, filename, image): + def _save(self, filename: str, image: bytes, sub_folder: Optional[str]) -> None: """ Save a single image inside a ThreadPoolExecutor Parameters @@ -1430,21 +1431,28 @@ class ImagesSaver(ImageIO): filename: str The filename of the image to be saved. NB: Any folders passed in with the filename will be stripped and replaced with :attr:`location`. - image: numpy.ndarray - The image to be saved + image: bytes + The encoded image to be saved + subfolder: str or ``None`` + If the file should be saved in a subfolder in the output location, the subfolder should + be provided here. ``None`` for no subfolder. """ - filename = os.path.join(self.location, os.path.basename(filename)) + location = os.path.join(self.location, sub_folder) if sub_folder else self._location + if sub_folder and not os.path.exists(location): + os.makedirs(location) + + filename = os.path.join(location, os.path.basename(filename)) try: if self._as_bytes: with open(filename, "wb") as out_file: out_file.write(image) else: cv2.imwrite(filename, image) - logger.trace("Saved image: '%s'", filename) + logger.trace("Saved image: '%s'", filename) # type:ignore except Exception as err: # pylint: disable=broad-except logger.error("Failed to save image '%s'. Original Error: %s", filename, err) - def save(self, filename, image): + def save(self, filename: str, image: bytes, sub_folder: Optional[str] = None) -> None: """ Save the given image in the background thread Ensure that :func:`close` is called once all save operations are complete. @@ -1452,13 +1460,17 @@ class ImagesSaver(ImageIO): Parameters ---------- filename: str - The filename of the image to be saved - image: numpy.ndarray - The image to be saved + The filename of the image to be saved. NB: Any folders passed in with the filename + will be stripped and replaced with :attr:`location`. + image: bytes + The encoded image to be saved + subfolder: str, optional + If the file should be saved in a subfolder in the output location, the subfolder should + be provided here. ``None`` for no subfolder. Default: ``None`` """ self._set_thread() - logger.trace("Putting to save queue: '%s'", filename) - self._queue.put((filename, image)) + logger.trace("Putting to save queue: '%s'", filename) # type:ignore + self._queue.put((filename, image, sub_folder)) def close(self): """ Signal to the Save Threads that they should be closed and cleanly shutdown diff --git a/plugins/extract/_config.py b/plugins/extract/_config.py index d3151a2..1080a93 100644 --- a/plugins/extract/_config.py +++ b/plugins/extract/_config.py @@ -74,3 +74,11 @@ class Config(FaceswapConfig): info="Filters out faces who's landmarks are above this distance from an 'average' " "face. Values above 16 tend to be fairly safe. Values above 10 will remove more " "false positives, but may also filter out some faces at extreme angles.") + self.add_item( + section=section, + title="save_filtered", + datatype=bool, + default=False, + group="filters", + info="If enabled, saves any filtered out images into a sub-folder during the " + "extraction process. If disabled, filtered faces are deleted.") diff --git a/plugins/extract/align/_base.py b/plugins/extract/align/_base.py index 06e6854..d5c1805 100644 --- a/plugins/extract/align/_base.py +++ b/plugins/extract/align/_base.py @@ -122,7 +122,8 @@ class Aligner(Extractor): # pylint:disable=abstract-method self._output_faces: List[DetectedFace] = [] self._filter = AlignedFilter(min_scale=self.config["aligner_min_scale"], max_scale=self.config["aligner_max_scale"], - distance=self.config["aligner_distance"]) + distance=self.config["aligner_distance"], + save_output=self.config["save_filtered"]) logger.debug("Initialized %s", self.__class__.__name__) def set_normalize_method(self, @@ -211,7 +212,8 @@ class Aligner(Extractor): # pylint:disable=abstract-method logger.debug(item) # type:ignore # TODO Move to end of process not beginning - self._filter.output_counts() + if exhausted: + self._filter.output_counts() return exhausted, batch @@ -277,10 +279,11 @@ class Aligner(Extractor): # pylint:disable=abstract-method if len(self._output_faces) != self._faces_per_filename[filename]: continue - self._output_faces = self._filter(self._output_faces, min(frame.shape[:2])) + self._output_faces, folders = self._filter(self._output_faces, min(frame.shape[:2])) output = self._extract_media.pop(filename) output.add_detected_faces(self._output_faces) + output.add_sub_folders(folders) self._output_faces = [] logger.trace("Final Output: (filename: '%s', image shape: %s, " # type:ignore @@ -524,21 +527,31 @@ class AlignedFilter(): max_scale: float Filters out faces that have been aligned at above this value as a multiplier of the minimum frame dimension. Set to ``0`` for off. - distance: float: + distance: float Filters out faces that are further than this distance from an "average" face. Set to ``0`` for off. + save_output: bool + ``True`` if the filtered faces should be kept as they are being saved. ``False`` if they + should be deleted """ - def __init__(self, min_scale: float, max_scale: float, distance: float): - logger.debug("Initializing %s: (min_scale: %s, max_scale: %s, distance: %s)", - self.__class__.__name__, min_scale, max_scale, distance) + def __init__(self, + min_scale: float, + max_scale: float, + distance: float, + save_output: bool) -> None: + logger.debug("Initializing %s: (min_scale: %s, max_scale: %s, distance: %s, " + "save_output: %s)", self.__class__.__name__, min_scale, max_scale, distance, + save_output) self._min_scale = min_scale self._max_scale = max_scale self._distance = distance / 100. + self._save_output = save_output self._active = max_scale > 0.0 or min_scale > 0.0 or distance > 0.0 self._counts: Dict[str, int] = dict(min_scale=0, max_scale=0, distance=0) logger.debug("Initialized %s: ", self.__class__.__name__) - def __call__(self, faces: List[DetectedFace], minimum_dimension: int) -> List[DetectedFace]: + def __call__(self, faces: List[DetectedFace], minimum_dimension: int + ) -> Tuple[List[DetectedFace], List[Optional[str]]]: """ Apply the filter to the incoming batch Parameters @@ -550,32 +563,45 @@ class AlignedFilter(): Returns ------- - list - The filtered list of detected face objects - + detected_faces: list + The filtered list of detected face objects, if saving filtered faces has not been + selected or the full list of detected faces + sub_folders: list + List of ``Nones`` if saving filtered faces has not been selected or list of ``Nones`` + and sub folder names corresponding the filtered face location """ + sub_folders: List[Optional[str]] = [None for _ in range(len(faces))] if not self._active: - return faces + return faces, sub_folders max_size = minimum_dimension * self._max_scale min_size = minimum_dimension * self._min_scale retval: List[DetectedFace] = [] - for face in faces: + for idx, face in enumerate(faces): test = AlignedFace(landmarks=face.landmarks_xy, centering="face") if self._min_scale > 0.0 or self._max_scale > 0.0: roi = test.original_roi size = ((roi[1][0] - roi[0][0]) ** 2 + (roi[1][1] - roi[0][1]) ** 2) ** 0.5 if self._min_scale > 0.0 and size < min_size: self._counts["min_scale"] += 1 + if self._save_output: + retval.append(face) + sub_folders[idx] = "_align_filt_min_scale" continue if self._max_scale > 0.0 and size > max_size: self._counts["max_scale"] += 1 + if self._save_output: + retval.append(face) + sub_folders[idx] = "_align_filt_max_scale" continue if 0.0 < self._distance < test.average_distance: self._counts["distance"] += 1 + if self._save_output: + retval.append(face) + sub_folders[idx] = "_align_filt_distance" continue retval.append(face) - return retval + return retval, sub_folders def output_counts(self): """ Output the counts of filtered items """ diff --git a/plugins/extract/pipeline.py b/plugins/extract/pipeline.py index ab63e87..87b1b10 100644 --- a/plugins/extract/pipeline.py +++ b/plugins/extract/pipeline.py @@ -754,6 +754,7 @@ class ExtractMedia(): self._detected_faces: List["DetectedFace"] = ([] if detected_faces is None else detected_faces) self._frame_metadata: Dict[str, Any] = {} + self._sub_folders: List[Optional[str]] = [] @property def filename(self) -> str: @@ -795,6 +796,13 @@ class ExtractMedia(): assert self._frame_metadata is not None return self._frame_metadata + @property + def sub_folders(self) -> List[Optional[str]]: + """ list: The sub_folders that the faces should be output to. Used when binning filter + output is enabled. The list corresponds to the list of detected faces + """ + return self._sub_folders + def get_image_copy(self, color_format: Literal["BGR", "RGB", "GRAY"]) -> "np.ndarray": """ Get a copy of the image in the requested color format. @@ -826,6 +834,19 @@ class ExtractMedia(): [(face.left, face.right, face.top, face.bottom) for face in faces]) self._detected_faces = faces + def add_sub_folders(self, folders: List[Optional[str]]) -> None: + """ Add detected faces to the object. Called at the end of each extraction phase. + + Parameters + ---------- + folders: list + A list of str sub folder names or ``None`` if no sub folder is required. Should + correspond to the detected faces list + """ + logger.trace("Adding sub folders for filename: '%s'. " # type: ignore + "(folders: %s)", self._filename, folders,) + self._sub_folders = folders + def remove_image(self) -> None: """ Delete the image and reset :attr:`image` to ``None``. diff --git a/scripts/extract.py b/scripts/extract.py index 40638b5..bb3aca3 100644 --- a/scripts/extract.py +++ b/scripts/extract.py @@ -414,7 +414,8 @@ class _Extract(): # pylint:disable=too-few-public-methods image = encode_image(face.aligned.face, extension, metadata=meta) if saver is not None: - saver.save(output_filename, image) + sub_folder = extract_media.sub_folders[idx] + saver.save(output_filename, image, sub_folder) final_faces.append(face.to_alignment()) self._alignments.data[os.path.basename(extract_media.filename)] = dict(faces=final_faces) del extract_media