diff --git a/locales/es/LC_MESSAGES/tools.alignments.cli.mo b/locales/es/LC_MESSAGES/tools.alignments.cli.mo index 1783184..53299df 100644 Binary files a/locales/es/LC_MESSAGES/tools.alignments.cli.mo and b/locales/es/LC_MESSAGES/tools.alignments.cli.mo differ diff --git a/locales/es/LC_MESSAGES/tools.alignments.cli.po b/locales/es/LC_MESSAGES/tools.alignments.cli.po index 0287766..5407e4e 100644 --- a/locales/es/LC_MESSAGES/tools.alignments.cli.po +++ b/locales/es/LC_MESSAGES/tools.alignments.cli.po @@ -6,26 +6,26 @@ msgid "" msgstr "" "Project-Id-Version: faceswap.spanish\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2022-05-24 12:38+0100\n" -"PO-Revision-Date: 2022-05-24 12:41+0100\n" +"POT-Creation-Date: 2022-09-14 18:36+0100\n" +"PO-Revision-Date: 2022-09-14 18:38+0100\n" "Last-Translator: \n" "Language-Team: tokafondo\n" "Language: es_ES\n" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=UTF-8\n" "Content-Transfer-Encoding: 8bit\n" -"Generated-By: pygettext.py 1.5\n" -"X-Generator: Poedit 3.0\n" "Plural-Forms: nplurals=2; plural=(n != 1);\n" +"Generated-By: pygettext.py 1.5\n" +"X-Generator: Poedit 3.0.1\n" -#: tools/alignments/cli.py:15 +#: tools/alignments/cli.py:17 msgid "" "This command lets you perform various tasks pertaining to an alignments file." msgstr "" "Este comando le permite realizar varias tareas relacionadas con un archivo " "de alineación." -#: tools/alignments/cli.py:30 +#: tools/alignments/cli.py:32 msgid "" "Alignments tool\n" "This tool allows you to perform numerous actions on or using an alignments " @@ -36,16 +36,16 @@ msgstr "" "caras o una fuente de fotogramas, usando opcionalmente su correspondiente " "archivo de alineación." -#: tools/alignments/cli.py:41 +#: tools/alignments/cli.py:44 msgid " Must Pass in a frames folder/source video file (-fr)." msgstr "" " Debe indicar una carpeta de fotogramas o archivo de vídeo de origen (-fr)." -#: tools/alignments/cli.py:42 +#: tools/alignments/cli.py:45 msgid " Must Pass in a faces folder (-fc)." msgstr " Debe indicar una carpeta de caras (-fc)." -#: tools/alignments/cli.py:43 +#: tools/alignments/cli.py:46 msgid "" " Must Pass in either a frames folder/source video file OR afaces folder (-fr " "or -fc)." @@ -53,7 +53,7 @@ msgstr "" " Debe indicar una carpeta de fotogramas o archivo de vídeo de origen, o una " "carpeta de caras (-fr o -fc)." -#: tools/alignments/cli.py:45 +#: tools/alignments/cli.py:48 msgid "" " Must Pass in a frames folder/source video file AND a faces folder (-fr and -" "fc)." @@ -61,15 +61,15 @@ msgstr "" " Debe indicar una carpeta de fotogramas o archivo de vídeo de origen, y una " "carpeta de caras (-fr y -fc)." -#: tools/alignments/cli.py:47 +#: tools/alignments/cli.py:50 msgid " Use the output option (-o) to process results." msgstr " Usar la opción de salida (-o) para procesar los resultados." -#: tools/alignments/cli.py:55 tools/alignments/cli.py:94 +#: tools/alignments/cli.py:58 tools/alignments/cli.py:97 msgid "processing" msgstr "proceso" -#: tools/alignments/cli.py:57 +#: tools/alignments/cli.py:60 #, python-brace-format msgid "" "R|Choose which action you want to perform. NB: All actions require an " @@ -142,7 +142,7 @@ msgstr "" "L|'spatial': Realiza un filtrado espacial y temporal para suavizar las " "alineaciones (¡EXPERIMENTAL!)" -#: tools/alignments/cli.py:96 +#: tools/alignments/cli.py:99 msgid "" "R|How to output discovered items ('faces' and 'frames' only):\n" "L|'console': Print the list of frames to the screen. (DEFAULT)\n" @@ -158,37 +158,41 @@ msgstr "" "L|'move': Mueve los elementos descubiertos a una subcarpeta dentro del " "directorio de origen." -#: tools/alignments/cli.py:107 tools/alignments/cli.py:118 -#: tools/alignments/cli.py:125 +#: tools/alignments/cli.py:110 tools/alignments/cli.py:123 +#: tools/alignments/cli.py:130 tools/alignments/cli.py:149 msgid "data" msgstr "datos" -#: tools/alignments/cli.py:111 +#: tools/alignments/cli.py:114 msgid "" -"Full path to the alignments file to be processed. This is required for all " -"jobs except for 'from-faces' when the alignments file will be generated in " -"the specified faces folder." +"Full path to the alignments file to be processed. If you have input a " +"'frames_dir' and don't provide this option, the process will try to find the " +"alignments file at the default location. All jobs require an alignments file " +"with the exception of 'from-faces' when the alignments file will be " +"generated in the specified faces folder." msgstr "" -"Ruta completa del archivo de alineaciones a procesar. Esto es necesario para " -"todos los trabajos excepto para 'caras desde' cuando el archivo de " -"alineaciones se generará en la carpeta de caras especificada." +"Ruta completa al archivo de alineaciones a procesar. Si ingresó un " +"'frames_dir' y no proporciona esta opción, el proceso intentará encontrar el " +"archivo de alineaciones en la ubicación predeterminada. Todos los trabajos " +"requieren un archivo de alineaciones con la excepción de 'from-faces' cuando " +"el archivo de alineaciones se generará en la carpeta de caras especificada." -#: tools/alignments/cli.py:119 +#: tools/alignments/cli.py:124 msgid "Directory containing extracted faces." msgstr "Directorio que contiene las caras extraídas." -#: tools/alignments/cli.py:126 +#: tools/alignments/cli.py:131 msgid "Directory containing source frames that faces were extracted from." msgstr "" "Directorio que contiene los fotogramas de origen de los que se extrajeron " "las caras." -#: tools/alignments/cli.py:135 tools/alignments/cli.py:146 -#: tools/alignments/cli.py:156 +#: tools/alignments/cli.py:140 tools/alignments/cli.py:164 +#: tools/alignments/cli.py:174 msgid "extract" msgstr "extracción" -#: tools/alignments/cli.py:136 +#: tools/alignments/cli.py:141 msgid "" "[Extract only] Extract every 'nth' frame. This option will skip frames when " "extracting faces. For example a value of 1 will extract faces from every " @@ -199,11 +203,22 @@ msgstr "" "caras de cada fotograma, un valor de 10 extraerá las caras de cada 10 " "fotogramas." -#: tools/alignments/cli.py:147 +#: tools/alignments/cli.py:150 +msgid "" +"R|If selected then:\n" +"L|'frames_folder' should be a parent folder containing multiple videos/" +"folders of images you need to work on.\n" +"L|'faces_folder' should be a parent folder containing multiple folders of " +"faces you wish to manage.\n" +"L|'alignments_file'. should be a parent folder containing multiple alignment " +"files." +msgstr "" + +#: tools/alignments/cli.py:165 msgid "[Extract only] The output size of extracted faces." msgstr "[Sólo extracción] El tamaño de salida de las caras extraídas." -#: tools/alignments/cli.py:157 +#: tools/alignments/cli.py:175 msgid "" "[Extract only] Only extract faces that have been resized by this percent or " "more to meet the specified extract size (`-sz`, `--size`). Useful for " diff --git a/locales/tools.alignments.cli.pot b/locales/tools.alignments.cli.pot index 985132c..5e73ef8 100644 --- a/locales/tools.alignments.cli.pot +++ b/locales/tools.alignments.cli.pot @@ -8,7 +8,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2022-05-24 12:38+0100\n" +"POT-Creation-Date: 2022-09-14 18:45+0100\n" "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" "Last-Translator: FULL NAME \n" "Language-Team: LANGUAGE \n" @@ -17,47 +17,47 @@ msgstr "" "Content-Type: text/plain; charset=CHARSET\n" "Content-Transfer-Encoding: 8bit\n" -#: tools/alignments/cli.py:15 +#: tools/alignments/cli.py:17 msgid "" "This command lets you perform various tasks pertaining to an alignments file." msgstr "" -#: tools/alignments/cli.py:30 +#: tools/alignments/cli.py:32 msgid "" "Alignments tool\n" "This tool allows you to perform numerous actions on or using an alignments " "file against its corresponding faceset/frame source." msgstr "" -#: tools/alignments/cli.py:41 +#: tools/alignments/cli.py:44 msgid " Must Pass in a frames folder/source video file (-fr)." msgstr "" -#: tools/alignments/cli.py:42 +#: tools/alignments/cli.py:45 msgid " Must Pass in a faces folder (-fc)." msgstr "" -#: tools/alignments/cli.py:43 +#: tools/alignments/cli.py:46 msgid "" " Must Pass in either a frames folder/source video file OR afaces folder (-fr " "or -fc)." msgstr "" -#: tools/alignments/cli.py:45 +#: tools/alignments/cli.py:48 msgid "" " Must Pass in a frames folder/source video file AND a faces folder (-fr and -" "fc)." msgstr "" -#: tools/alignments/cli.py:47 +#: tools/alignments/cli.py:50 msgid " Use the output option (-o) to process results." msgstr "" -#: tools/alignments/cli.py:55 tools/alignments/cli.py:94 +#: tools/alignments/cli.py:58 tools/alignments/cli.py:97 msgid "processing" msgstr "" -#: tools/alignments/cli.py:57 +#: tools/alignments/cli.py:60 #, python-brace-format msgid "" "R|Choose which action you want to perform. NB: All actions require an " @@ -94,7 +94,7 @@ msgid "" "(EXPERIMENTAL!)" msgstr "" -#: tools/alignments/cli.py:96 +#: tools/alignments/cli.py:99 msgid "" "R|How to output discovered items ('faces' and 'frames' only):\n" "L|'console': Print the list of frames to the screen. (DEFAULT)\n" @@ -104,43 +104,45 @@ msgid "" "directory." msgstr "" -#: tools/alignments/cli.py:107 tools/alignments/cli.py:118 -#: tools/alignments/cli.py:125 +#: tools/alignments/cli.py:110 tools/alignments/cli.py:123 +#: tools/alignments/cli.py:130 msgid "data" msgstr "" -#: tools/alignments/cli.py:111 +#: tools/alignments/cli.py:114 msgid "" -"Full path to the alignments file to be processed. This is required for all " -"jobs except for 'from-faces' when the alignments file will be generated in " -"the specified faces folder." +"Full path to the alignments file to be processed. If you have input a " +"'frames_dir' and don't provide this option, the process will try to find the " +"alignments file at the default location. All jobs require an alignments file " +"with the exception of 'from-faces' when the alignments file will be " +"generated in the specified faces folder." msgstr "" -#: tools/alignments/cli.py:119 +#: tools/alignments/cli.py:124 msgid "Directory containing extracted faces." msgstr "" -#: tools/alignments/cli.py:126 +#: tools/alignments/cli.py:131 msgid "Directory containing source frames that faces were extracted from." msgstr "" -#: tools/alignments/cli.py:135 tools/alignments/cli.py:146 -#: tools/alignments/cli.py:156 +#: tools/alignments/cli.py:140 tools/alignments/cli.py:151 +#: tools/alignments/cli.py:161 msgid "extract" msgstr "" -#: tools/alignments/cli.py:136 +#: tools/alignments/cli.py:141 msgid "" "[Extract only] Extract every 'nth' frame. This option will skip frames when " "extracting faces. For example a value of 1 will extract faces from every " "frame, a value of 10 will extract faces from every 10th frame." msgstr "" -#: tools/alignments/cli.py:147 +#: tools/alignments/cli.py:152 msgid "[Extract only] The output size of extracted faces." msgstr "" -#: tools/alignments/cli.py:157 +#: tools/alignments/cli.py:162 msgid "" "[Extract only] Only extract faces that have been resized by this percent or " "more to meet the specified extract size (`-sz`, `--size`). Useful for " diff --git a/plugins/extract/pipeline.py b/plugins/extract/pipeline.py index c7a6f42..bbe57c0 100644 --- a/plugins/extract/pipeline.py +++ b/plugins/extract/pipeline.py @@ -12,7 +12,7 @@ plugins either in parallel or in series, giving easy access to input and output. import logging import sys -from typing import cast, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING, Union +from typing import Any, cast, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING, Union import cv2 @@ -54,11 +54,11 @@ class Extractor(): Parameters ---------- - detector: str + detector: str or ``None`` The name of a detector plugin as exists in :mod:`plugins.extract.detect` - aligner: str + aligner: str or ``None The name of an aligner plugin as exists in :mod:`plugins.extract.align` - masker: str or list + masker: str or list or ``None The name of a masker plugin(s) as exists in :mod:`plugins.extract.mask`. This can be a single masker or a list of multiple maskers configfile: str, optional @@ -96,9 +96,9 @@ class Extractor(): :attr:`final_pass` to indicate to the caller which phase is being processed """ def __init__(self, - detector: str, - aligner: str, - masker: Union[str, List[str]], + detector: Optional[str], + aligner: Optional[str], + masker: Optional[Union[str, List[str]]], configfile: Optional[str] = None, multiprocess: bool = False, exclude_gpus: Optional[List[int]] = None, @@ -114,8 +114,9 @@ class Extractor(): exclude_gpus, rotate_images, min_size, normalize_method, re_feed, image_is_aligned) self._instance = _get_instance() - masker = [masker] if not isinstance(masker, list) else masker - self._flow = self._set_flow(detector, aligner, masker) + maskers = [cast(Optional[str], + masker)] if not isinstance(masker, list) else cast(List[Optional[str]], masker) + self._flow = self._set_flow(detector, aligner, maskers) self._exclude_gpus = exclude_gpus # We only ever need 1 item in each queue. This is 2 items cached (1 in queue 1 waiting # for queue) at each point. Adding more just stacks RAM with no speed benefit. @@ -125,7 +126,7 @@ class Extractor(): self._vram_stats = self._get_vram_stats() self._detect = self._load_detect(detector, rotate_images, min_size, configfile) self._align = self._load_align(aligner, configfile, normalize_method, re_feed) - self._mask = [self._load_mask(mask, image_is_aligned, configfile) for mask in masker] + self._mask = [self._load_mask(mask, image_is_aligned, configfile) for mask in maskers] self._is_parallel = self._set_parallel_processing(multiprocess) self._phases = self._set_phases(multiprocess) self._phase_index = 0 @@ -381,7 +382,9 @@ class Extractor(): return retval @staticmethod - def _set_flow(detector: str, aligner: str, masker: List[str]) -> List[str]: + def _set_flow(detector: Optional[str], + aligner: Optional[str], + masker: List[Optional[str]]) -> List[str]: """ Set the flow list based on the input plugins """ logger.debug("detector: %s, aligner: %s, masker: %s", detector, aligner, masker) retval = [] @@ -536,7 +539,7 @@ class Extractor(): # << INTERNAL PLUGIN HANDLING >> # def _load_align(self, - aligner: str, + aligner: Optional[str], configfile: Optional[str], normalize_method: Optional[str], re_feed: int) -> Optional["Aligner"]: @@ -554,7 +557,7 @@ class Extractor(): return plugin def _load_detect(self, - detector: str, + detector: Optional[str], rotation: Optional[List[int]], min_size: int, configfile: Optional[str]) -> Optional["Detector"]: @@ -572,7 +575,7 @@ class Extractor(): return plugin def _load_mask(self, - masker: str, + masker: Optional[str], image_is_aligned: bool, configfile: Optional[str]) -> Optional["Masker"]: """ Set global arguments and load masker plugin """ @@ -731,6 +734,7 @@ class ExtractMedia(): self._image_shape = cast(Tuple[int, int, int], image.shape) self._detected_faces: List["DetectedFace"] = ([] if detected_faces is None else detected_faces) + self._frame_metadata: Dict[str, Any] = {} @property def filename(self) -> str: @@ -758,6 +762,20 @@ class ExtractMedia(): """list: A list of :class:`~lib.align.DetectedFace` objects in the :attr:`image`. """ return self._detected_faces + @property + def frame_metadata(self) -> dict: + """ dict: The frame metadata that has been added from an aligned image. This property + should only be called after :func:`add_frame_metadata` has been called when processing + an aligned face. For all other instances an assertion error will be raised. + + Raises + ------ + AssertionError + If frame metadata has not been populated from an aligned image + """ + assert self._frame_metadata is not None + return self._frame_metadata + def get_image_copy(self, color_format: Literal["BGR", "RGB", "GRAY"]) -> "np.ndarray": """ Get a copy of the image in the requested color format. @@ -812,6 +830,18 @@ class ExtractMedia(): self._filename, image.shape) self._image = image + def add_frame_metadata(self, metadata: Dict[str, Any]) -> None: + """ Add the source frame metadata from an aligned PNG's header data. + + metadata: dict + The contents of the 'source' field in the PNG header + """ + logger.trace("Adding PNG Source data for '%s': %s", # type:ignore + self._filename, metadata) + dims: Tuple[int, int] = metadata["source_frame_dims"] + self._image_shape = (*dims, 3) + self._frame_metadata = metadata + def _image_as_bgr(self) -> "np.ndarray": """ Get a copy of the source frame in BGR format. diff --git a/tools/alignments/alignments.py b/tools/alignments/alignments.py index 010b202..26562c4 100644 --- a/tools/alignments/alignments.py +++ b/tools/alignments/alignments.py @@ -1,13 +1,17 @@ #!/usr/bin/env python3 """ Tools for manipulating the alignments serialized file """ import logging +import os +import sys -from typing import TYPE_CHECKING +from typing import Any, TYPE_CHECKING +from lib.utils import _video_extensions from .media import AlignmentData from .jobs import (Check, Draw, Extract, FromFaces, Rename, # noqa pylint: disable=unused-import RemoveFaces, Sort, Spatial) + if TYPE_CHECKING: from argparse import Namespace @@ -27,20 +31,59 @@ class Alignments(): # pylint:disable=too-few-public-methods """ def __init__(self, arguments: "Namespace") -> None: logger.debug("Initializing %s: (arguments: '%s'", self.__class__.__name__, arguments) - self.args = arguments - job = self.args.job - self.alignments = None if job == "from-faces" else AlignmentData(self.args.alignments_file) + self._args = arguments + job = self._args.job + alignment_file = self._find_alignments() + self.alignments = None if job == "from-faces" else AlignmentData(alignment_file) logger.debug("Initialized %s", self.__class__.__name__) + def _find_alignments(self) -> str: + """ If an alignments folder is required and hasn't been provided, scan for a file based on + the video folder. + + Exits if an alignments file cannot be located + + Returns + ------- + str + The full path to an alignments file + """ + fname = self._args.alignments_file + frames = self._args.frames_dir + if fname and os.path.isfile(fname) and os.path.splitext(fname)[-1].lower() == ".fsa": + return fname + if fname: + logger.error("Not a valid alignments file: '%s'", fname) + sys.exit(1) + + if not frames or not os.path.exists(frames): + logger.error("Not a valid frames folder: '%s'. Can't scan for alignments.", frames) + sys.exit(1) + + fname = "alignments.fsa" + if os.path.isdir(frames) and os.path.exists(os.path.join(frames, fname)): + return fname + + if os.path.isdir(frames) or os.path.splitext(frames)[-1] not in _video_extensions: + logger.error("Can't find a valid alignments file in location: %s", frames) + sys.exit(1) + + fname = f"{os.path.splitext(frames)[0]}_{fname}" + if not os.path.exists(fname): + logger.error("Can't find a valid alignments file for video: %s", frames) + sys.exit(1) + + return fname + def process(self) -> None: """ The entry point for the Alignments tool from :mod:`lib.tools.alignments.cli`. Launches the selected alignments job. """ - if self.args.job in ("missing-alignments", "missing-frames", "multi-faces", "no-faces"): - job = Check + if self._args.job in ("missing-alignments", "missing-frames", "multi-faces", "no-faces"): + job: Any = Check else: - job = globals()[self.args.job.title().replace("-", "")] - job = job(self.alignments, self.args) + job = globals()[self._args.job.title().replace("-", "")] + job = job(self.alignments, self._args) logger.debug(job) job.process() diff --git a/tools/alignments/cli.py b/tools/alignments/cli.py index 4ff3a5d..58e4b25 100644 --- a/tools/alignments/cli.py +++ b/tools/alignments/cli.py @@ -3,6 +3,8 @@ import sys import gettext +from typing import Any, List, Dict + from lib.cli.args import FaceSwapArgs from lib.cli.actions import DirOrFileFullPaths, DirFullPaths, FileFullPaths, Radio, Slider @@ -30,7 +32,8 @@ class AlignmentsArgs(FaceSwapArgs): return _("Alignments tool\nThis tool allows you to perform numerous actions on or using " "an alignments file against its corresponding faceset/frame source.") - def get_argument_list(self) -> dict: + @staticmethod + def get_argument_list() -> List[Dict[str, Any]]: """ Collect the argparse argument options. Returns @@ -106,11 +109,13 @@ class AlignmentsArgs(FaceSwapArgs): type=str, group=_("data"), # hacky solution to not require alignments file if creating alignments from faces: - required="from-faces" not in sys.argv, + required=not any(val in sys.argv for val in ["from-faces", "-fr", "-frames_folder"]), filetypes="alignments", - help=_("Full path to the alignments file to be processed. This is required for all " - "jobs except for 'from-faces' when the alignments file will be generated in " - "the specified faces folder."))) + help=_("Full path to the alignments file to be processed. If you have input a " + "'frames_dir' and don't provide this option, the process will try to find the " + "alignments file at the default location. All jobs require an alignments file " + "with the exception of 'from-faces' when the alignments file will be generated " + "in the specified faces folder."))) argument_list.append(dict( opts=("-fc", "-faces_folder"), action=DirFullPaths, diff --git a/tools/mask/mask.py b/tools/mask/mask.py index 6dbb2ac..2a2c238 100644 --- a/tools/mask/mask.py +++ b/tools/mask/mask.py @@ -3,6 +3,7 @@ import logging import os import sys +from typing import Any, cast, Dict, List, Optional, Tuple, TYPE_CHECKING, Union import cv2 import numpy as np @@ -15,6 +16,11 @@ from lib.multithreading import MultiThread from lib.utils import get_folder from plugins.extract.pipeline import Extractor, ExtractMedia +if TYPE_CHECKING: + from argparse import Namespace + from lib.align.aligned_face import CenteringType + from lib.align.alignments import AlignmentFileDict + from lib.queue_manager import EventQueue logger = logging.getLogger(__name__) # pylint:disable=invalid-name @@ -31,7 +37,7 @@ class Mask(): # pylint:disable=too-few-public-methods arguments: :class:`argparse.Namespace` The :mod:`argparse` arguments as passed in from :mod:`tools.py` """ - def __init__(self, arguments): + def __init__(self, arguments: "Namespace") -> None: logger.debug("Initializing %s: (arguments: %s", self.__class__.__name__, arguments) self._update_type = arguments.processing self._input_is_faces = arguments.input_type == "faces" @@ -47,17 +53,18 @@ class Mask(): # pylint:disable=too-few-public-methods self._saver = self._set_saver(arguments) loader = FacesLoader if self._input_is_faces else ImagesLoader self._loader = loader(arguments.input) - self._faces_saver = None + self._faces_saver: Optional[ImagesSaver] = None self._alignments = Alignments(os.path.dirname(arguments.alignments), filename=os.path.basename(arguments.alignments)) self._extractor = self._get_extractor(arguments.exclude_gpus) + self._set_correct_mask_type() self._extractor_input_thread = self._feed_extractor() logger.debug("Initialized %s", self.__class__.__name__) - def _check_input(self, mask_input): + def _check_input(self, mask_input: str) -> None: """ Check the input is valid. If it isn't exit with a logged error Parameters @@ -74,7 +81,7 @@ class Mask(): # pylint:disable=too-few-public-methods sys.exit(0) logger.debug("input '%s' is valid", mask_input) - def _set_saver(self, arguments): + def _set_saver(self, arguments: "Namespace") -> Optional[ImagesSaver]: """ set the saver in a background thread Parameters @@ -100,7 +107,7 @@ class Mask(): # pylint:disable=too-few-public-methods logger.debug(saver) return saver - def _get_extractor(self, exclude_gpus): + def _get_extractor(self, exclude_gpus: List[int]) -> Optional[Extractor]: """ Obtain a Mask extractor plugin and launch it Parameters @@ -125,7 +132,22 @@ class Mask(): # pylint:disable=too-few-public-methods logger.debug(extractor) return extractor - def _feed_extractor(self): + def _set_correct_mask_type(self): + """ Some masks have multiple variants that they can be saved as depending on config options + so update the :attr:`_mask_type` accordingly + """ + if self._extractor is None or self._mask_type != "bisenet-fp": + return + + # Hacky look up into masker to get the type of mask + mask_plugin = self._extractor._mask[0] # pylint:disable=protected-access + assert mask_plugin is not None + mtype = "head" if mask_plugin.config.get("include_hair", False) else "face" + new_type = f"{self._mask_type}_{mtype}" + logger.debug("Updating '%s' to '%s'", self._mask_type, new_type) + self._mask_type = new_type + + def _feed_extractor(self) -> MultiThread: """ Feed the input queue to the Extractor from a faces folder or from source frames in a background thread @@ -134,17 +156,63 @@ class Mask(): # pylint:disable=too-few-public-methods :class:`lib.multithreading.Multithread`: The thread that is feeding the extractor. """ - masker_input = getattr(self, - "_input_{}".format("faces" if self._input_is_faces else "frames")) + masker_input = getattr(self, f"_input_{'faces' if self._input_is_faces else 'frames'}") logger.debug("masker_input: %s", masker_input) - args = tuple() if self._update_type == "output" else (self._extractor.input_queue, ) + if self._update_type == "output": + args: tuple = tuple() + else: + assert self._extractor is not None + args = (self._extractor.input_queue, ) input_thread = MultiThread(masker_input, *args, thread_count=1) input_thread.start() logger.debug(input_thread) return input_thread - def _input_faces(self, *args): + def _process_face(self, + filename: str, + image: np.ndarray, + metadata: Dict[str, Any]) -> Optional["ExtractMedia"]: + """ Process a single face when masking from face images + + filename: str + the filename currently being processed + image: :class:`numpy.ndarray` + The current face being processed + metadata: dict + The source frame metadata from the PNG header + + Returns + ------- + :class:`plugins.pipeline.ExtractMedia` or ``None`` + If the update type is 'output' then nothing is returned otherwise the extract media for + the face is returned + """ + frame_name = metadata["source"]["source_filename"] + face_index = metadata["source"]["face_index"] + alignment = self._alignments.get_faces_in_frame(frame_name) + if not alignment or face_index > len(alignment) - 1: + self._counts["skip"] += 1 + logger.warning("Skipping Face not found in alignments file: '%s'", filename) + return None + alignment = alignment[face_index] + self._counts["face"] += 1 + + if self._check_for_missing(frame_name, face_index, alignment): + return None + + detected_face = self._get_detected_face(alignment) + if self._update_type == "output": + detected_face.image = image + self._save(frame_name, face_index, detected_face) + return None + + media = ExtractMedia(filename, image, detected_faces=[detected_face]) + media.add_frame_metadata(metadata["source"]) + self._counts["update"] += 1 + return media + + def _input_faces(self, *args: Union[tuple, Tuple["EventQueue"]]) -> None: """ Input pre-aligned faces to the Extractor plugin inside a thread Parameters @@ -156,7 +224,7 @@ class Mask(): # pylint:disable=too-few-public-methods log_once = False logger.debug("args: %s", args) if self._update_type != "output": - queue = args[0] + queue = cast("EventQueue", args[0]) for filename, image, metadata in tqdm(self._loader.load(), total=self._loader.count): if not metadata: # Legacy faces. Update the headers if not log_once: @@ -174,35 +242,14 @@ class Mask(): # pylint:disable=too-few-public-methods logger.error("You can re-extract the face-set by using the Alignments Tool's " "Extract job.") break - frame_name = metadata["source"]["source_filename"] - face_index = metadata["source"]["face_index"] - alignment = self._alignments.get_faces_in_frame(frame_name) - if not alignment or face_index > len(alignment) - 1: - self._counts["skip"] += 1 - logger.warning("Skipping Face not found in alignments file: '%s'", filename) - continue - alignment = alignment[face_index] - self._counts["face"] += 1 - - if self._check_for_missing(frame_name, face_index, alignment): - continue - - detected_face = self._get_detected_face(alignment) - if self._update_type == "output": - detected_face.image = image - self._save(frame_name, face_index, detected_face) - else: - media = ExtractMedia(filename, image, detected_faces=[detected_face]) - # Hacky overload of ExtractMedia's shape parameter to apply the actual original - # frame dimension - media._image_shape = (*metadata["source"]["source_frame_dims"], 3) - setattr(media, "mask_tool_face_info", metadata["source"]) # TODO formalize + media = self._process_face(filename, image, metadata) + if media is not None: queue.put(media) - self._counts["update"] += 1 + if self._update_type != "output": queue.put("EOF") - def _input_frames(self, *args): + def _input_frames(self, *args: Union[tuple, Tuple["EventQueue"]]) -> None: """ Input frames to the Extractor plugin inside a thread Parameters @@ -213,7 +260,7 @@ class Mask(): # pylint:disable=too-few-public-methods """ logger.debug("args: %s", args) if self._update_type != "output": - queue = args[0] + queue = cast("EventQueue", args[0]) for filename, image in tqdm(self._loader.load(), total=self._loader.count): frame = os.path.basename(filename) if not self._alignments.frame_exists(frame): @@ -245,7 +292,7 @@ class Mask(): # pylint:disable=too-few-public-methods if self._update_type != "output": queue.put("EOF") - def _check_for_missing(self, frame, idx, alignment): + def _check_for_missing(self, frame: str, idx: int, alignment: "AlignmentFileDict") -> bool: """ Check if the alignment is missing the requested mask_type Parameters @@ -270,7 +317,7 @@ class Mask(): # pylint:disable=too-few-public-methods logger.debug("Mask pre-exists for face: '%s' - %s", frame, idx) return retval - def _get_output_suffix(self, arguments): + def _get_output_suffix(self, arguments: "Namespace") -> str: """ The filename suffix, based on selected output options. Parameters @@ -285,11 +332,11 @@ class Mask(): # pylint:disable=too-few-public-methods """ sfx = "mask_preview_" sfx += "face_" if not arguments.full_frame or self._input_is_faces else "frame_" - sfx += "{}.png".format(arguments.output_type) + sfx += f"{arguments.output_type}.png" return sfx @staticmethod - def _get_detected_face(alignment): + def _get_detected_face(alignment: "AlignmentFileDict") -> DetectedFace: """ Convert an alignment dict item to a detected_face object Parameters @@ -306,11 +353,12 @@ class Mask(): # pylint:disable=too-few-public-methods detected_face.from_alignment(alignment) return detected_face - def process(self): + def process(self) -> None: """ The entry point for the Mask tool from :file:`lib.tools.cli`. Runs the Mask process """ logger.debug("Starting masker process") - updater = getattr(self, "_update_{}".format("faces" if self._input_is_faces else "frames")) + updater = getattr(self, f"_update_{'faces' if self._input_is_faces else 'frames'}") if self._update_type != "output": + assert self._extractor is not None if self._input_is_faces: self._faces_saver = ImagesSaver(self._loader.location, as_bytes=True) for extractor_output in self._extractor.detected_faces(): @@ -320,6 +368,7 @@ class Mask(): # pylint:disable=too-few-public-methods self._alignments.backup() self._alignments.save() if self._input_is_faces: + assert self._faces_saver is not None self._faces_saver.close() self._extractor_input_thread.join() @@ -337,24 +386,26 @@ class Mask(): # pylint:disable=too-few-public-methods self._counts["update"], self._counts["face"]) logger.debug("Completed masker process") - def _update_faces(self, extractor_output): + def _update_faces(self, extractor_output: ExtractMedia) -> None: """ Update alignments for the mask if the input type is a faces folder If an output location has been indicated, then puts the mask preview to the save queue Parameters ---------- - extractor_output: dict + extractor_output: :class:`plugins.extract.pipeline.ExtractMedia` The output from the :class:`plugins.extract.pipeline.Extractor` object """ + assert self._faces_saver is not None for face in extractor_output.detected_faces: - frame_name = extractor_output.mask_tool_face_info["source_filename"] - face_index = extractor_output.mask_tool_face_info["face_index"] - logger.trace("Saving face: (frame: %s, face index: %s)", frame_name, face_index) + frame_name = extractor_output.frame_metadata["source_filename"] + face_index = extractor_output.frame_metadata["face_index"] + logger.trace("Saving face: (frame: %s, face index: %s)", # type: ignore + frame_name, face_index) self._alignments.update_face(frame_name, face_index, face.to_alignment()) metadata = dict(alignments=face.to_png_meta(), - source=extractor_output.mask_tool_face_info) + source=extractor_output.frame_metadata) self._faces_saver.save(extractor_output.filename, encode_image(extractor_output.image, ".png", metadata=metadata)) @@ -362,14 +413,14 @@ class Mask(): # pylint:disable=too-few-public-methods face.image = extractor_output.image self._save(frame_name, face_index, face) - def _update_frames(self, extractor_output): + def _update_frames(self, extractor_output: ExtractMedia) -> None: """ Update alignments for the mask if the input type is a frames folder or video If an output location has been indicated, then puts the mask preview to the save queue Parameters ---------- - extractor_output: dict + extractor_output: :class:`plugins.extract.pipeline.ExtractMedia` The output from the :class:`plugins.extract.pipeline.Extractor` object """ frame = os.path.basename(extractor_output.filename) @@ -379,7 +430,7 @@ class Mask(): # pylint:disable=too-few-public-methods face.image = extractor_output.image self._save(frame, idx, face) - def _save(self, frame, idx, detected_face): + def _save(self, frame: str, idx: int, detected_face: DetectedFace) -> None: """ Build the mask preview image and save Parameters @@ -391,6 +442,7 @@ class Mask(): # pylint:disable=too-few-public-methods detected_face: `lib.FacesDetect.detected_face` A detected_face object for a face """ + assert self._saver is not None if self._mask_type == "bisenet-fp": mask_types = [f"{self._mask_type}_{area}" for area in ("face", "head")] else: @@ -406,15 +458,14 @@ class Mask(): # pylint:disable=too-few-public-methods if mask_type not in detected_face.mask: # If extracting bisenet mask, then skip versions which don't exist continue - filename = os.path.join(self._saver.location, "{}_{}_{}".format( - os.path.splitext(frame)[0], - idx, - f"{mask_type}_{self._output['suffix']}")) + filename = os.path.join( + self._saver.location, + f"{os.path.splitext(frame)[0]}_{idx}_{mask_type}_{self._output['suffix']}") image = self._create_image(detected_face, mask_type) - logger.trace("filename: '%s', image_shape: %s", filename, image.shape) + logger.trace("filename: '%s', image_shape: %s", filename, image.shape) # type: ignore self._saver.save(filename, image) - def _create_image(self, detected_face, mask_type): + def _create_image(self, detected_face: DetectedFace, mask_type: str) -> np.ndarray: """ Create a mask preview image for saving out to disk Parameters @@ -433,6 +484,7 @@ class Mask(): # pylint:disable=too-few-public-methods - The masked face """ mask = detected_face.mask[mask_type] + assert detected_face.image is not None mask.set_blur_and_threshold(**self._output["opts"]) if not self._output["full_frame"] or self._input_is_faces: if self._input_is_faces: @@ -442,26 +494,28 @@ class Mask(): # pylint:disable=too-few-public-methods size=detected_face.image.shape[0], is_aligned=True).face else: - centering = "legacy" if self._alignments.version == 1.0 else mask.stored_centering + centering: "CenteringType" = ("legacy" if self._alignments.version == 1.0 + else mask.stored_centering) detected_face.load_aligned(detected_face.image, centering=centering, force=True) face = detected_face.aligned.face + assert face is not None mask = cv2.resize(detected_face.mask[mask_type].mask, (face.shape[1], face.shape[0]), interpolation=cv2.INTER_CUBIC)[..., None] else: face = np.array(detected_face.image) # cv2 fails if this comes as imageio.core.Array - mask = mask.get_full_frame_mask(face.shape[1], face.shape[0]) - mask = np.expand_dims(mask, -1) + imask = mask.get_full_frame_mask(face.shape[1], face.shape[0]) + imask = np.expand_dims(imask, -1) height, width = face.shape[:2] if self._output["type"] == "combined": - masked = (face.astype("float32") * mask.astype("float32") / 255.).astype("uint8") - mask = np.tile(mask, 3) - for img in (face, masked, mask): + masked = (face.astype("float32") * imask.astype("float32") / 255.).astype("uint8") + imask = np.tile(imask, 3) + for img in (face, masked, imask): cv2.rectangle(img, (0, 0), (width - 1, height - 1), (255, 255, 255), 1) - out_image = np.concatenate((face, masked, mask), axis=1) + out_image = np.concatenate((face, masked, imask), axis=1) elif self._output["type"] == "mask": - out_image = mask + out_image = imask elif self._output["type"] == "masked": - out_image = np.concatenate([face, mask], axis=-1) + out_image = np.concatenate([face, imask], axis=-1) return out_image