mirror of
https://github.com/zebrajr/faceswap.git
synced 2026-01-15 12:15:15 +00:00
Extract filter: Allow saving of filtered images
This commit is contained in:
34
lib/image.py
34
lib/image.py
@@ -11,6 +11,7 @@ import sys
|
||||
from ast import literal_eval
|
||||
from bisect import bisect
|
||||
from concurrent import futures
|
||||
from typing import Optional
|
||||
from zlib import crc32
|
||||
|
||||
import cv2
|
||||
@@ -1422,7 +1423,7 @@ class ImagesSaver(ImageIO):
|
||||
executor.submit(self._save, *item)
|
||||
executor.shutdown()
|
||||
|
||||
def _save(self, filename, image):
|
||||
def _save(self, filename: str, image: bytes, sub_folder: Optional[str]) -> None:
|
||||
""" Save a single image inside a ThreadPoolExecutor
|
||||
|
||||
Parameters
|
||||
@@ -1430,21 +1431,28 @@ class ImagesSaver(ImageIO):
|
||||
filename: str
|
||||
The filename of the image to be saved. NB: Any folders passed in with the filename
|
||||
will be stripped and replaced with :attr:`location`.
|
||||
image: numpy.ndarray
|
||||
The image to be saved
|
||||
image: bytes
|
||||
The encoded image to be saved
|
||||
subfolder: str or ``None``
|
||||
If the file should be saved in a subfolder in the output location, the subfolder should
|
||||
be provided here. ``None`` for no subfolder.
|
||||
"""
|
||||
filename = os.path.join(self.location, os.path.basename(filename))
|
||||
location = os.path.join(self.location, sub_folder) if sub_folder else self._location
|
||||
if sub_folder and not os.path.exists(location):
|
||||
os.makedirs(location)
|
||||
|
||||
filename = os.path.join(location, os.path.basename(filename))
|
||||
try:
|
||||
if self._as_bytes:
|
||||
with open(filename, "wb") as out_file:
|
||||
out_file.write(image)
|
||||
else:
|
||||
cv2.imwrite(filename, image)
|
||||
logger.trace("Saved image: '%s'", filename)
|
||||
logger.trace("Saved image: '%s'", filename) # type:ignore
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
logger.error("Failed to save image '%s'. Original Error: %s", filename, err)
|
||||
|
||||
def save(self, filename, image):
|
||||
def save(self, filename: str, image: bytes, sub_folder: Optional[str] = None) -> None:
|
||||
""" Save the given image in the background thread
|
||||
|
||||
Ensure that :func:`close` is called once all save operations are complete.
|
||||
@@ -1452,13 +1460,17 @@ class ImagesSaver(ImageIO):
|
||||
Parameters
|
||||
----------
|
||||
filename: str
|
||||
The filename of the image to be saved
|
||||
image: numpy.ndarray
|
||||
The image to be saved
|
||||
The filename of the image to be saved. NB: Any folders passed in with the filename
|
||||
will be stripped and replaced with :attr:`location`.
|
||||
image: bytes
|
||||
The encoded image to be saved
|
||||
subfolder: str, optional
|
||||
If the file should be saved in a subfolder in the output location, the subfolder should
|
||||
be provided here. ``None`` for no subfolder. Default: ``None``
|
||||
"""
|
||||
self._set_thread()
|
||||
logger.trace("Putting to save queue: '%s'", filename)
|
||||
self._queue.put((filename, image))
|
||||
logger.trace("Putting to save queue: '%s'", filename) # type:ignore
|
||||
self._queue.put((filename, image, sub_folder))
|
||||
|
||||
def close(self):
|
||||
""" Signal to the Save Threads that they should be closed and cleanly shutdown
|
||||
|
||||
@@ -74,3 +74,11 @@ class Config(FaceswapConfig):
|
||||
info="Filters out faces who's landmarks are above this distance from an 'average' "
|
||||
"face. Values above 16 tend to be fairly safe. Values above 10 will remove more "
|
||||
"false positives, but may also filter out some faces at extreme angles.")
|
||||
self.add_item(
|
||||
section=section,
|
||||
title="save_filtered",
|
||||
datatype=bool,
|
||||
default=False,
|
||||
group="filters",
|
||||
info="If enabled, saves any filtered out images into a sub-folder during the "
|
||||
"extraction process. If disabled, filtered faces are deleted.")
|
||||
|
||||
@@ -122,7 +122,8 @@ class Aligner(Extractor): # pylint:disable=abstract-method
|
||||
self._output_faces: List[DetectedFace] = []
|
||||
self._filter = AlignedFilter(min_scale=self.config["aligner_min_scale"],
|
||||
max_scale=self.config["aligner_max_scale"],
|
||||
distance=self.config["aligner_distance"])
|
||||
distance=self.config["aligner_distance"],
|
||||
save_output=self.config["save_filtered"])
|
||||
logger.debug("Initialized %s", self.__class__.__name__)
|
||||
|
||||
def set_normalize_method(self,
|
||||
@@ -211,7 +212,8 @@ class Aligner(Extractor): # pylint:disable=abstract-method
|
||||
logger.debug(item) # type:ignore
|
||||
|
||||
# TODO Move to end of process not beginning
|
||||
self._filter.output_counts()
|
||||
if exhausted:
|
||||
self._filter.output_counts()
|
||||
|
||||
return exhausted, batch
|
||||
|
||||
@@ -277,10 +279,11 @@ class Aligner(Extractor): # pylint:disable=abstract-method
|
||||
if len(self._output_faces) != self._faces_per_filename[filename]:
|
||||
continue
|
||||
|
||||
self._output_faces = self._filter(self._output_faces, min(frame.shape[:2]))
|
||||
self._output_faces, folders = self._filter(self._output_faces, min(frame.shape[:2]))
|
||||
|
||||
output = self._extract_media.pop(filename)
|
||||
output.add_detected_faces(self._output_faces)
|
||||
output.add_sub_folders(folders)
|
||||
self._output_faces = []
|
||||
|
||||
logger.trace("Final Output: (filename: '%s', image shape: %s, " # type:ignore
|
||||
@@ -524,21 +527,31 @@ class AlignedFilter():
|
||||
max_scale: float
|
||||
Filters out faces that have been aligned at above this value as a multiplier of the
|
||||
minimum frame dimension. Set to ``0`` for off.
|
||||
distance: float:
|
||||
distance: float
|
||||
Filters out faces that are further than this distance from an "average" face. Set to
|
||||
``0`` for off.
|
||||
save_output: bool
|
||||
``True`` if the filtered faces should be kept as they are being saved. ``False`` if they
|
||||
should be deleted
|
||||
"""
|
||||
def __init__(self, min_scale: float, max_scale: float, distance: float):
|
||||
logger.debug("Initializing %s: (min_scale: %s, max_scale: %s, distance: %s)",
|
||||
self.__class__.__name__, min_scale, max_scale, distance)
|
||||
def __init__(self,
|
||||
min_scale: float,
|
||||
max_scale: float,
|
||||
distance: float,
|
||||
save_output: bool) -> None:
|
||||
logger.debug("Initializing %s: (min_scale: %s, max_scale: %s, distance: %s, "
|
||||
"save_output: %s)", self.__class__.__name__, min_scale, max_scale, distance,
|
||||
save_output)
|
||||
self._min_scale = min_scale
|
||||
self._max_scale = max_scale
|
||||
self._distance = distance / 100.
|
||||
self._save_output = save_output
|
||||
self._active = max_scale > 0.0 or min_scale > 0.0 or distance > 0.0
|
||||
self._counts: Dict[str, int] = dict(min_scale=0, max_scale=0, distance=0)
|
||||
logger.debug("Initialized %s: ", self.__class__.__name__)
|
||||
|
||||
def __call__(self, faces: List[DetectedFace], minimum_dimension: int) -> List[DetectedFace]:
|
||||
def __call__(self, faces: List[DetectedFace], minimum_dimension: int
|
||||
) -> Tuple[List[DetectedFace], List[Optional[str]]]:
|
||||
""" Apply the filter to the incoming batch
|
||||
|
||||
Parameters
|
||||
@@ -550,32 +563,45 @@ class AlignedFilter():
|
||||
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
The filtered list of detected face objects
|
||||
|
||||
detected_faces: list
|
||||
The filtered list of detected face objects, if saving filtered faces has not been
|
||||
selected or the full list of detected faces
|
||||
sub_folders: list
|
||||
List of ``Nones`` if saving filtered faces has not been selected or list of ``Nones``
|
||||
and sub folder names corresponding the filtered face location
|
||||
"""
|
||||
sub_folders: List[Optional[str]] = [None for _ in range(len(faces))]
|
||||
if not self._active:
|
||||
return faces
|
||||
return faces, sub_folders
|
||||
|
||||
max_size = minimum_dimension * self._max_scale
|
||||
min_size = minimum_dimension * self._min_scale
|
||||
retval: List[DetectedFace] = []
|
||||
for face in faces:
|
||||
for idx, face in enumerate(faces):
|
||||
test = AlignedFace(landmarks=face.landmarks_xy, centering="face")
|
||||
if self._min_scale > 0.0 or self._max_scale > 0.0:
|
||||
roi = test.original_roi
|
||||
size = ((roi[1][0] - roi[0][0]) ** 2 + (roi[1][1] - roi[0][1]) ** 2) ** 0.5
|
||||
if self._min_scale > 0.0 and size < min_size:
|
||||
self._counts["min_scale"] += 1
|
||||
if self._save_output:
|
||||
retval.append(face)
|
||||
sub_folders[idx] = "_align_filt_min_scale"
|
||||
continue
|
||||
if self._max_scale > 0.0 and size > max_size:
|
||||
self._counts["max_scale"] += 1
|
||||
if self._save_output:
|
||||
retval.append(face)
|
||||
sub_folders[idx] = "_align_filt_max_scale"
|
||||
continue
|
||||
if 0.0 < self._distance < test.average_distance:
|
||||
self._counts["distance"] += 1
|
||||
if self._save_output:
|
||||
retval.append(face)
|
||||
sub_folders[idx] = "_align_filt_distance"
|
||||
continue
|
||||
retval.append(face)
|
||||
return retval
|
||||
return retval, sub_folders
|
||||
|
||||
def output_counts(self):
|
||||
""" Output the counts of filtered items """
|
||||
|
||||
@@ -754,6 +754,7 @@ class ExtractMedia():
|
||||
self._detected_faces: List["DetectedFace"] = ([] if detected_faces is None
|
||||
else detected_faces)
|
||||
self._frame_metadata: Dict[str, Any] = {}
|
||||
self._sub_folders: List[Optional[str]] = []
|
||||
|
||||
@property
|
||||
def filename(self) -> str:
|
||||
@@ -795,6 +796,13 @@ class ExtractMedia():
|
||||
assert self._frame_metadata is not None
|
||||
return self._frame_metadata
|
||||
|
||||
@property
|
||||
def sub_folders(self) -> List[Optional[str]]:
|
||||
""" list: The sub_folders that the faces should be output to. Used when binning filter
|
||||
output is enabled. The list corresponds to the list of detected faces
|
||||
"""
|
||||
return self._sub_folders
|
||||
|
||||
def get_image_copy(self, color_format: Literal["BGR", "RGB", "GRAY"]) -> "np.ndarray":
|
||||
""" Get a copy of the image in the requested color format.
|
||||
|
||||
@@ -826,6 +834,19 @@ class ExtractMedia():
|
||||
[(face.left, face.right, face.top, face.bottom) for face in faces])
|
||||
self._detected_faces = faces
|
||||
|
||||
def add_sub_folders(self, folders: List[Optional[str]]) -> None:
|
||||
""" Add detected faces to the object. Called at the end of each extraction phase.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
folders: list
|
||||
A list of str sub folder names or ``None`` if no sub folder is required. Should
|
||||
correspond to the detected faces list
|
||||
"""
|
||||
logger.trace("Adding sub folders for filename: '%s'. " # type: ignore
|
||||
"(folders: %s)", self._filename, folders,)
|
||||
self._sub_folders = folders
|
||||
|
||||
def remove_image(self) -> None:
|
||||
""" Delete the image and reset :attr:`image` to ``None``.
|
||||
|
||||
|
||||
@@ -414,7 +414,8 @@ class _Extract(): # pylint:disable=too-few-public-methods
|
||||
image = encode_image(face.aligned.face, extension, metadata=meta)
|
||||
|
||||
if saver is not None:
|
||||
saver.save(output_filename, image)
|
||||
sub_folder = extract_media.sub_folders[idx]
|
||||
saver.save(output_filename, image, sub_folder)
|
||||
final_faces.append(face.to_alignment())
|
||||
self._alignments.data[os.path.basename(extract_media.filename)] = dict(faces=final_faces)
|
||||
del extract_media
|
||||
|
||||
Reference in New Issue
Block a user