* feat: download to stdout if name is `-` #493 still a wip though, somehow i need to add metadata to these streams oh well... * refactor: rework reencoding, metadata adding * chore: ignore .idea * chore: add dev requirements * test: suppress encoding warnings * feat: add reworked ffmpeg progress bar * perf: use memoryview for streaming responses * fix: fix large tracks downloading * refactor: migrate threads to asyncio (wip) * Revert "refactor: migrate threads to asyncio (wip)" This reverts commit d6e247ee2f6f8d5c57fac21de20a81dde3b2e395. * refactor: get rid of the queue * refactor: inline stderr thread, adjust buf sizes * perf: don't reencode original wavs/hls streams * refactor: mark threads as daemonic * refactor: remove unused imports * fix: reencode aacs with adts * refactor(metadata): output a bit more detailed error message * refactor: properly reencode m4a * Performance improvements * Bump version * Update README --------- Co-authored-by: 7x11x13 <x7x11x13@gmail.com>
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -7,4 +7,5 @@ __pycache__/
|
||||
.vscode
|
||||
.venv
|
||||
.env
|
||||
.coverage*
|
||||
.coverage*
|
||||
.idea
|
||||
@@ -81,7 +81,7 @@ scdl me -f
|
||||
--original-metadata Do not change metadata of original file downloads
|
||||
--no-original Do not download original file; only mp3, m4a, or opus
|
||||
--only-original Only download songs with original file available
|
||||
--name-format [format] Specify the downloaded file name format
|
||||
--name-format [format] Specify the downloaded file name format. Use "-" to download to stdout
|
||||
--playlist-name-format [format] Specify the downloaded file name format, if it is being downloaded as part of a playlist
|
||||
--client-id [id] Specify the client_id to use
|
||||
--auth-token [token] Specify the auth token to use
|
||||
|
||||
2
requirements.dev.txt
Normal file
2
requirements.dev.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
pytest
|
||||
music-tag
|
||||
@@ -1,3 +1,3 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
"""Python Soundcloud Music Downloader."""
|
||||
__version__ = "v2.9.5"
|
||||
__version__ = "v2.10.0"
|
||||
|
||||
157
scdl/metadata_assembler.py
Normal file
157
scdl/metadata_assembler.py
Normal file
@@ -0,0 +1,157 @@
|
||||
from base64 import b64encode
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Type, TypeVar, Union, Callable
|
||||
from types import MappingProxyType
|
||||
|
||||
from mutagen import FileType, flac, oggopus, id3, wave, mp3, mp4
|
||||
|
||||
|
||||
JPEG_MIME_TYPE: str = 'image/jpeg'
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MetadataInfo:
|
||||
artist: str
|
||||
title: str
|
||||
description: Optional[str]
|
||||
genre: Optional[str]
|
||||
|
||||
artwork_jpeg: Optional[bytes]
|
||||
|
||||
link: Optional[str]
|
||||
date: Optional[str]
|
||||
|
||||
album_title: Optional[str]
|
||||
album_author: Optional[str]
|
||||
album_track_num: Optional[int]
|
||||
|
||||
|
||||
def _get_flac_pic(jpeg_data: bytes) -> flac.Picture:
|
||||
pic = flac.Picture()
|
||||
pic.data = jpeg_data
|
||||
pic.mime = JPEG_MIME_TYPE
|
||||
pic.type = id3.PictureType.COVER_FRONT
|
||||
return pic
|
||||
|
||||
|
||||
def _get_apic(jpeg_data: bytes) -> id3.APIC:
|
||||
return id3.APIC(
|
||||
encoding=3,
|
||||
mime=JPEG_MIME_TYPE,
|
||||
type=3,
|
||||
desc='Cover',
|
||||
data=jpeg_data,
|
||||
)
|
||||
|
||||
|
||||
def _assemble_common(file: FileType, meta: MetadataInfo) -> None:
|
||||
file['artist'] = meta.artist
|
||||
file['title'] = meta.title
|
||||
|
||||
if meta.genre:
|
||||
file['genre'] = meta.genre
|
||||
|
||||
if meta.link:
|
||||
file['website'] = meta.link
|
||||
|
||||
if meta.date:
|
||||
file['date'] = meta.date
|
||||
|
||||
if meta.album_title:
|
||||
file['album'] = meta.album_title
|
||||
|
||||
if meta.album_author:
|
||||
file['albumartist'] = meta.album_author
|
||||
|
||||
if meta.album_track_num is not None:
|
||||
file['tracknumber'] = str(meta.album_track_num)
|
||||
|
||||
|
||||
def _assemble_flac(file: flac.FLAC, meta: MetadataInfo) -> None:
|
||||
_assemble_common(file, meta)
|
||||
|
||||
if meta.description:
|
||||
file['description'] = meta.description
|
||||
|
||||
if meta.artwork_jpeg:
|
||||
file.add_picture(_get_flac_pic(meta.artwork_jpeg))
|
||||
|
||||
|
||||
def _assemble_opus(file: oggopus.OggOpus, meta: MetadataInfo) -> None:
|
||||
_assemble_common(file, meta)
|
||||
|
||||
if meta.description:
|
||||
file['comment'] = meta.description
|
||||
|
||||
if meta.artwork_jpeg:
|
||||
pic = _get_flac_pic(meta.artwork_jpeg).write()
|
||||
file['metadata_block_picture'] = b64encode(pic).decode()
|
||||
|
||||
|
||||
def _assemble_wav_or_mp3(file: Union[wave.WAVE, mp3.MP3], meta: MetadataInfo) -> None:
|
||||
file['TIT2'] = id3.TIT2(encoding=3, text=meta.title)
|
||||
file['TPE1'] = id3.TPE1(encoding=3, text=meta.artist)
|
||||
|
||||
if meta.description:
|
||||
file['COMM'] = id3.COMM(encoding=3, lang='ENG', text=meta.description)
|
||||
|
||||
if meta.genre:
|
||||
file['TCON'] = id3.TCON(encoding=3, text=meta.genre)
|
||||
|
||||
if meta.link:
|
||||
file['WOAS'] = id3.WOAS(url=meta.link)
|
||||
|
||||
if meta.date:
|
||||
file['TDAT'] = id3.TDAT(encoding=3, text=meta.date)
|
||||
|
||||
if meta.album_title:
|
||||
file['TALB'] = id3.TALB(encoding=3, text=meta.album_title)
|
||||
|
||||
if meta.album_author:
|
||||
file['TPE2'] = id3.TPE2(encoding=3, text=meta.album_author)
|
||||
|
||||
if meta.album_track_num is not None:
|
||||
file['TRCK'] = id3.TRCK(encoding=3, text=str(meta.album_track_num))
|
||||
|
||||
if meta.artwork_jpeg:
|
||||
file['APIC'] = _get_apic(meta.artwork_jpeg)
|
||||
|
||||
|
||||
def _assemble_mp4(file: mp4.MP4, meta: MetadataInfo) -> None:
|
||||
file['\251ART'] = meta.artist
|
||||
file['\251nam'] = meta.title
|
||||
|
||||
if meta.genre:
|
||||
file['\251gen'] = meta.genre
|
||||
|
||||
if meta.link:
|
||||
file['\251cmt'] = meta.link
|
||||
|
||||
if meta.date:
|
||||
file['\251day'] = meta.date
|
||||
|
||||
if meta.album_title:
|
||||
file['\251alb'] = meta.album_title
|
||||
|
||||
if meta.album_author:
|
||||
file['aART'] = meta.album_author
|
||||
|
||||
if meta.album_track_num is not None:
|
||||
file['trkn'] = str(meta.album_track_num)
|
||||
|
||||
if meta.description:
|
||||
file['desc'] = meta.description
|
||||
|
||||
if meta.artwork_jpeg:
|
||||
file['covr'] = [mp4.MP4Cover(meta.artwork_jpeg)]
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
METADATA_ASSEMBLERS: MappingProxyType[Type[T], Callable[[T, MetadataInfo], None]] = MappingProxyType({
|
||||
flac.FLAC: _assemble_flac,
|
||||
oggopus.OggOpus: _assemble_opus,
|
||||
wave.WAVE: _assemble_wav_or_mp3,
|
||||
mp3.MP3: _assemble_wav_or_mp3,
|
||||
mp4.MP4: _assemble_mp4,
|
||||
})
|
||||
|
||||
732
scdl/scdl.py
Executable file → Normal file
732
scdl/scdl.py
Executable file → Normal file
@@ -12,7 +12,7 @@ Usage:
|
||||
[--original-name][--original-metadata][--no-original][--only-original]
|
||||
[--name-format <format>][--strict-playlist][--playlist-name-format <format>]
|
||||
[--client-id <id>][--auth-token <token>][--overwrite][--no-playlist][--opus]
|
||||
|
||||
|
||||
scdl -h | --help
|
||||
scdl --version
|
||||
|
||||
@@ -57,7 +57,7 @@ Options:
|
||||
--original-metadata Do not change metadata of original file downloads
|
||||
--no-original Do not download original file; only mp3, m4a, or opus
|
||||
--only-original Only download songs with original file available
|
||||
--name-format [format] Specify the downloaded file name format
|
||||
--name-format [format] Specify the downloaded file name format. Use "-" to download to stdout
|
||||
--playlist-name-format [format] Specify the downloaded file name format, if it is being downloaded as part of a playlist
|
||||
--client-id [id] Specify the client_id to use
|
||||
--auth-token [token] Specify the auth token to use
|
||||
@@ -68,13 +68,17 @@ Options:
|
||||
"""
|
||||
|
||||
import atexit
|
||||
import base64
|
||||
import configparser
|
||||
import contextlib
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
import math
|
||||
import mimetypes
|
||||
from typing import List, Optional, TypedDict
|
||||
import threading
|
||||
import tempfile
|
||||
from typing import List, Optional, TypedDict, Tuple, IO, Union
|
||||
import secrets
|
||||
|
||||
mimetypes.init()
|
||||
|
||||
@@ -83,7 +87,6 @@ import pathlib
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import urllib.parse
|
||||
@@ -92,12 +95,6 @@ from dataclasses import asdict
|
||||
|
||||
import filelock
|
||||
import mutagen
|
||||
import mutagen.flac
|
||||
import mutagen.id3
|
||||
import mutagen.mp3
|
||||
import mutagen.mp4
|
||||
import mutagen.oggopus
|
||||
import mutagen.wave
|
||||
from mutagen.easymp4 import EasyMP4
|
||||
|
||||
EasyMP4.RegisterTextKey("website", "purl")
|
||||
@@ -110,6 +107,7 @@ from soundcloud import (BasicAlbumPlaylist, BasicTrack, MiniTrack, SoundCloud,
|
||||
from tqdm import tqdm
|
||||
|
||||
from scdl import __version__, utils
|
||||
from scdl.metadata_assembler import METADATA_ASSEMBLERS, MetadataInfo
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||||
logging.getLogger("requests").setLevel(logging.WARNING)
|
||||
@@ -117,7 +115,7 @@ logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.addFilter(utils.ColorizeFilter())
|
||||
|
||||
CHUNK_SIZE = 1024
|
||||
FFMPEG_PIPE_CHUNK_SIZE = 1024 * 1024 # 1 mb
|
||||
|
||||
fileToKeep = []
|
||||
|
||||
@@ -609,6 +607,7 @@ def download_playlist(client: SoundCloud, playlist: BasicAlbumPlaylist, **kwargs
|
||||
if not kwargs.get("no_playlist_folder"):
|
||||
os.chdir("..")
|
||||
|
||||
|
||||
def try_utime(path, filetime):
|
||||
try:
|
||||
os.utime(path, (time.time(), filetime))
|
||||
@@ -616,6 +615,22 @@ def try_utime(path, filetime):
|
||||
logger.error("Cannot update utime of file")
|
||||
|
||||
|
||||
def is_downloading_to_stdout(**kwargs) -> bool:
|
||||
return kwargs.get('name_format') == '-'
|
||||
|
||||
|
||||
def get_stdout():
|
||||
# Credits: https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/utils/_utils.py#L575
|
||||
if sys.platform == 'win32':
|
||||
import msvcrt
|
||||
|
||||
# stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
|
||||
with contextlib.suppress(io.UnsupportedOperation):
|
||||
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
|
||||
|
||||
return getattr(sys.stdout, 'buffer', sys.stdout)
|
||||
|
||||
|
||||
def get_filename(
|
||||
track: BasicTrack,
|
||||
ext: Optional[str] = None,
|
||||
@@ -623,6 +638,9 @@ def get_filename(
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
**kwargs,
|
||||
):
|
||||
# Force stdout name on tracks that are being downloaded to stdout
|
||||
if is_downloading_to_stdout(**kwargs):
|
||||
return 'stdout'
|
||||
|
||||
username = track.user.username
|
||||
title = track.title.encode("utf-8", "ignore").decode("utf-8")
|
||||
@@ -649,77 +667,31 @@ def get_filename(
|
||||
return filename
|
||||
|
||||
|
||||
def run_ffmpeg_command_with_progress(
|
||||
command: List[str], input_duration_ms: int, hide_progress: bool
|
||||
):
|
||||
def is_progress_line(line: str):
|
||||
x = line.split("=")
|
||||
if len(x) != 2:
|
||||
return False
|
||||
key = x[0]
|
||||
if key not in (
|
||||
"progress",
|
||||
"speed",
|
||||
"drop_frames",
|
||||
"dup_frames",
|
||||
"out_time",
|
||||
"out_time_ms",
|
||||
"out_time_us",
|
||||
"total_size",
|
||||
"bitrate",
|
||||
):
|
||||
return False
|
||||
return True
|
||||
|
||||
command += ["-loglevel", "error", "-progress", "pipe:2", "-stats_period", "0.1"]
|
||||
with subprocess.Popen(command, stderr=subprocess.PIPE, encoding="utf-8") as p:
|
||||
err = ""
|
||||
with tqdm(
|
||||
total=input_duration_ms / 1000, disable=hide_progress, unit="s"
|
||||
) as progress:
|
||||
last_secs = 0
|
||||
for line in p.stderr:
|
||||
if not is_progress_line(line):
|
||||
err += line
|
||||
elif line.startswith("out_time_ms"):
|
||||
try:
|
||||
# actually in microseconds
|
||||
# the name is a lie
|
||||
secs = int(line.split("=")[1]) / 1_000_000
|
||||
except ValueError:
|
||||
secs = 0
|
||||
changed = secs - last_secs
|
||||
last_secs = secs
|
||||
progress.update(changed)
|
||||
progress.update(input_duration_ms / 1000 - last_secs)
|
||||
if p.returncode != 0:
|
||||
raise SoundCloudException(f"FFmpeg error: {err}")
|
||||
|
||||
|
||||
def download_original_file(
|
||||
client: SoundCloud,
|
||||
track: BasicTrack,
|
||||
title: str,
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
**kwargs,
|
||||
):
|
||||
) -> Tuple[Optional[str], bool]:
|
||||
logger.info("Downloading the original file.")
|
||||
to_stdout = is_downloading_to_stdout(**kwargs)
|
||||
|
||||
# Get the requests stream
|
||||
url = client.get_track_original_download(track.id, track.secret_token)
|
||||
|
||||
if not url:
|
||||
logger.info("Could not get original download link")
|
||||
return (None, False)
|
||||
return None, False
|
||||
|
||||
r = requests.get(url, stream=True)
|
||||
if r.status_code == 401:
|
||||
logger.info("The original file has no download left.")
|
||||
return (None, False)
|
||||
return None, False
|
||||
|
||||
if r.status_code == 404:
|
||||
logger.info("Could not get name from stream - using basic name")
|
||||
return (None, False)
|
||||
return None, False
|
||||
|
||||
# Find filename
|
||||
header = r.headers.get("content-disposition")
|
||||
@@ -729,78 +701,44 @@ def download_original_file(
|
||||
else:
|
||||
raise SoundCloudException(f"Could not get filename from content-disposition header: {header}")
|
||||
|
||||
orig_filename = filename
|
||||
_, ext = os.path.splitext(filename)
|
||||
|
||||
if not kwargs.get("original_name"):
|
||||
filename, ext = os.path.splitext(filename)
|
||||
orig_filename, ext = os.path.splitext(filename)
|
||||
|
||||
# Find file extension
|
||||
mime = r.headers.get("content-type")
|
||||
ext = ext or mimetypes.guess_extension(mime)
|
||||
ext = ext or ("." + r.headers.get("x-amz-meta-file-type"))
|
||||
filename += ext
|
||||
orig_filename += ext
|
||||
|
||||
filename = get_filename(
|
||||
track, original_filename=filename, playlist_info=playlist_info, **kwargs
|
||||
track, original_filename=orig_filename, playlist_info=playlist_info, **kwargs
|
||||
)
|
||||
|
||||
logger.debug(f"filename : {filename}")
|
||||
encoding_to_flac = bool(kwargs.get("flac")) and can_convert(orig_filename)
|
||||
|
||||
if encoding_to_flac:
|
||||
filename = filename[:-4] + ".flac"
|
||||
|
||||
# Skip if file ID or filename already exists
|
||||
if already_downloaded(track, title, filename, **kwargs):
|
||||
if kwargs.get("flac") and can_convert(filename):
|
||||
filename = filename[:-4] + ".flac"
|
||||
return (filename, True)
|
||||
# We are always re-downloading to stdout
|
||||
if not to_stdout and already_downloaded(track, title, filename, **kwargs):
|
||||
return filename, True
|
||||
|
||||
# Write file
|
||||
total_length = int(r.headers.get("content-length"))
|
||||
re_encode_to_out(
|
||||
track,
|
||||
r,
|
||||
ext[1:] if not encoding_to_flac else 'flac',
|
||||
not encoding_to_flac, # copy the stream only if we aren't re-encoding to flac
|
||||
filename,
|
||||
skip_re_encoding=not encoding_to_flac,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
min_size = kwargs.get("min_size") or 0
|
||||
max_size = kwargs.get("max_size") or math.inf # max size of 0 treated as no max size
|
||||
|
||||
if not min_size <= total_length <= max_size:
|
||||
raise SoundCloudException("File not within --min-size and --max-size bounds")
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
received = 0
|
||||
temp_path = pathlib.Path(tmpdir) / "scdl-download"
|
||||
with open(temp_path, "wb") as f:
|
||||
with tqdm.wrapattr(
|
||||
f,
|
||||
"write",
|
||||
total=total_length,
|
||||
disable=bool(kwargs.get("hide_progress")),
|
||||
) as fobj:
|
||||
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
|
||||
if chunk:
|
||||
received += len(chunk)
|
||||
fobj.write(chunk)
|
||||
|
||||
if received != total_length:
|
||||
raise SoundCloudException(
|
||||
"Connection closed prematurely, download incomplete"
|
||||
)
|
||||
|
||||
src_file = temp_path
|
||||
dest_file = filename
|
||||
if kwargs.get("flac") and can_convert(filename):
|
||||
flac_path = pathlib.Path(tmpdir) / "scdl-download-flac"
|
||||
logger.info("Converting to .flac...")
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
temp_path,
|
||||
"-f",
|
||||
"flac",
|
||||
flac_path,
|
||||
]
|
||||
run_ffmpeg_command_with_progress(
|
||||
command, track.duration, bool(kwargs.get("hide_progress"))
|
||||
)
|
||||
src_file = flac_path
|
||||
dest_file = sanitize_str(filename[:-4], ".flac")
|
||||
|
||||
shutil.move(src_file, dest_file)
|
||||
|
||||
return (pathlib.Path(dest_file).name, False)
|
||||
return filename, False
|
||||
|
||||
|
||||
def get_transcoding_m3u8(client: SoundCloud, transcoding: Transcoding, **kwargs):
|
||||
@@ -829,17 +767,15 @@ def download_hls(
|
||||
title: str,
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
) -> Tuple[Optional[str], bool]:
|
||||
if not track.media.transcodings:
|
||||
raise SoundCloudException(f"Track {track.permalink_url} has no transcodings available")
|
||||
|
||||
logger.debug(f"Trancodings: {track.media.transcodings}")
|
||||
logger.debug(f"Transcodings: {track.media.transcodings}")
|
||||
|
||||
transcodings = [t for t in track.media.transcodings if t.format.protocol == "hls"]
|
||||
to_stdout = is_downloading_to_stdout(**kwargs)
|
||||
|
||||
transcoding = None
|
||||
ext = None
|
||||
# ordered in terms of preference best -> worst
|
||||
valid_presets = [("mp3", ".mp3")]
|
||||
|
||||
@@ -865,31 +801,24 @@ def download_hls(
|
||||
filename = get_filename(track, ext=ext, playlist_info=playlist_info, **kwargs)
|
||||
logger.debug(f"filename : {filename}")
|
||||
# Skip if file ID or filename already exists
|
||||
if already_downloaded(track, title, filename, **kwargs):
|
||||
return (filename, True)
|
||||
if not to_stdout and already_downloaded(track, title, filename, **kwargs):
|
||||
return filename, True
|
||||
|
||||
# Get the requests stream
|
||||
url = get_transcoding_m3u8(client, transcoding, **kwargs)
|
||||
filename_path = os.path.abspath(filename)
|
||||
_, ext = os.path.splitext(filename)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
temp_path = pathlib.Path(tmpdir) / ("scdl-download" + ext)
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
url,
|
||||
"-c",
|
||||
"copy",
|
||||
temp_path,
|
||||
]
|
||||
run_ffmpeg_command_with_progress(
|
||||
command, track.duration, bool(kwargs.get("hide_progress"))
|
||||
)
|
||||
re_encode_to_out(
|
||||
track,
|
||||
url,
|
||||
preset_name if preset_name != 'aac' else 'ipod', # We are encoding aac files to m4a, so an ipod codec is used
|
||||
True, # no need to fully re-encode the whole hls stream
|
||||
filename,
|
||||
playlist_info,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
shutil.move(temp_path, filename_path)
|
||||
|
||||
return (filename, False)
|
||||
return filename, False
|
||||
|
||||
|
||||
def download_track(
|
||||
@@ -957,34 +886,32 @@ def download_track(
|
||||
|
||||
record_download_archive(track, **kwargs)
|
||||
|
||||
to_stdout = is_downloading_to_stdout(**kwargs)
|
||||
|
||||
# Skip if file ID or filename already exists
|
||||
if is_already_downloaded and not kwargs.get("force_metadata"):
|
||||
raise SoundCloudException(f"{filename} already downloaded.")
|
||||
|
||||
# If file does not exist an error occurred
|
||||
if not os.path.isfile(filename):
|
||||
# If we are downloading to stdout and reached this point, then most likely we downloaded the track
|
||||
if not os.path.isfile(filename) and not to_stdout:
|
||||
raise SoundCloudException(f"An error occurred downloading {filename}.")
|
||||
|
||||
# Try to set the metadata
|
||||
if not (downloaded_original and kwargs.get("original_metadata")) and (
|
||||
filename.endswith(".mp3")
|
||||
or filename.endswith(".flac")
|
||||
or filename.endswith(".m4a")
|
||||
or filename.endswith(".wav")
|
||||
or filename.endswith(".opus")
|
||||
):
|
||||
try:
|
||||
set_metadata(track, filename, playlist_info, **kwargs)
|
||||
except Exception:
|
||||
os.remove(filename)
|
||||
logger.exception("Error trying to set the tags...")
|
||||
raise SoundCloudException("Error trying to set the tags...")
|
||||
else:
|
||||
logger.error("This type of audio doesn't support tagging...")
|
||||
# Add metadata to an already existing file if needed
|
||||
if is_already_downloaded and kwargs.get('force_metadata'):
|
||||
with open(filename, 'rb') as f:
|
||||
file_data = io.BytesIO(f.read())
|
||||
|
||||
_add_metadata_to_stream(track, file_data, playlist_info, **kwargs)
|
||||
|
||||
with open(filename, 'wb') as f:
|
||||
file_data.seek(0)
|
||||
f.write(file_data.getbuffer())
|
||||
|
||||
# Try to change the real creation date
|
||||
filetime = int(time.mktime(track.created_at.timetuple()))
|
||||
try_utime(filename, filetime)
|
||||
if not to_stdout:
|
||||
filetime = int(time.mktime(track.created_at.timetuple()))
|
||||
try_utime(filename, filetime)
|
||||
|
||||
logger.info(f"{filename} Downloaded.\n")
|
||||
except SoundCloudException as err:
|
||||
@@ -1071,133 +998,373 @@ def record_download_archive(track: BasicTrack, **kwargs):
|
||||
logger.error(ioe)
|
||||
|
||||
|
||||
def set_metadata(
|
||||
def _try_get_artwork(url: str, size: str = 'original') -> Optional[requests.Response]:
|
||||
new_artwork_url = url.replace("large", size)
|
||||
|
||||
try:
|
||||
artwork_response = requests.get(new_artwork_url, allow_redirects=False, timeout=5)
|
||||
|
||||
if artwork_response.status_code != 200:
|
||||
return None
|
||||
|
||||
content_type = artwork_response.headers.get('Content-Type', '').lower()
|
||||
if content_type not in ('image/png', 'image/jpeg', 'image/jpg'):
|
||||
return None
|
||||
|
||||
return artwork_response
|
||||
except requests.RequestException:
|
||||
return None
|
||||
|
||||
|
||||
def build_ffmpeg_encoding_args(
|
||||
input_file: str,
|
||||
output_file: str,
|
||||
out_codec: str,
|
||||
*args,
|
||||
) -> List[str]:
|
||||
return [
|
||||
'ffmpeg',
|
||||
|
||||
# Disable all the useless stuff
|
||||
'-loglevel', 'error',
|
||||
'-hide_banner',
|
||||
|
||||
# Input stream
|
||||
'-i', input_file,
|
||||
|
||||
# Encoding
|
||||
'-f', out_codec,
|
||||
|
||||
# Progress to stderr
|
||||
'-progress', 'pipe:2',
|
||||
'-stats_period', '0.1',
|
||||
|
||||
# User provided arguments
|
||||
*args,
|
||||
|
||||
# Output file
|
||||
output_file
|
||||
]
|
||||
|
||||
|
||||
def _write_streaming_response_to_pipe(
|
||||
response: requests.Response,
|
||||
pipe: Union[IO[bytes], io.BytesIO],
|
||||
**kwargs,
|
||||
) -> None:
|
||||
total_length = int(response.headers.get("content-length"))
|
||||
|
||||
min_size = kwargs.get("min_size") or 0
|
||||
max_size = kwargs.get("max_size") or math.inf # max size of 0 treated as no max size
|
||||
|
||||
if not min_size <= total_length <= max_size:
|
||||
raise SoundCloudException("File not within --min-size and --max-size bounds")
|
||||
|
||||
logger.info('Receiving the streaming response')
|
||||
received = 0
|
||||
chunk_size = 8192
|
||||
|
||||
with memoryview(bytearray(chunk_size)) as buffer:
|
||||
for chunk in tqdm(
|
||||
iter(lambda: response.raw.read(chunk_size), b''),
|
||||
total=(total_length / chunk_size) + 1,
|
||||
disable=bool(kwargs.get('hide_progress')),
|
||||
unit='Kb',
|
||||
unit_scale=chunk_size / 1024,
|
||||
):
|
||||
if not chunk:
|
||||
break
|
||||
|
||||
buffer_view = buffer[:len(chunk)]
|
||||
buffer_view[:] = chunk
|
||||
|
||||
received += len(chunk)
|
||||
pipe.write(buffer_view)
|
||||
|
||||
pipe.flush()
|
||||
|
||||
if received != total_length:
|
||||
logger.error("connection closed prematurely, download incomplete")
|
||||
sys.exit(1)
|
||||
|
||||
if not isinstance(pipe, io.BytesIO):
|
||||
pipe.close()
|
||||
|
||||
|
||||
def _add_metadata_to_stream(
|
||||
track: BasicTrack,
|
||||
filename: str,
|
||||
stream: io.BytesIO,
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Sets the track file metadata using the Python module Mutagen
|
||||
"""
|
||||
logger.info("Setting tags...")
|
||||
artwork_url = track.artwork_url
|
||||
user = track.user
|
||||
if not artwork_url:
|
||||
artwork_url = user.avatar_url
|
||||
response = None
|
||||
) -> None:
|
||||
logger.info("Applying metadata...")
|
||||
|
||||
artwork_base_url = track.artwork_url or track.user.avatar_url
|
||||
artwork_response = None
|
||||
|
||||
if kwargs.get("original_art"):
|
||||
new_artwork_url = artwork_url.replace("large", "original")
|
||||
try:
|
||||
response = requests.get(new_artwork_url, stream=True)
|
||||
if response.headers["Content-Type"] not in (
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/jpg",
|
||||
):
|
||||
response = None
|
||||
except Exception:
|
||||
pass
|
||||
if response is None:
|
||||
new_artwork_url = artwork_url.replace("large", "t500x500")
|
||||
response = requests.get(new_artwork_url, stream=True)
|
||||
if response.headers["Content-Type"] not in (
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/jpg",
|
||||
):
|
||||
response = None
|
||||
if response is None:
|
||||
logger.error(f"Could not get cover art at {new_artwork_url}")
|
||||
with tempfile.NamedTemporaryFile() as out_file:
|
||||
if response:
|
||||
shutil.copyfileobj(response.raw, out_file)
|
||||
out_file.seek(0)
|
||||
artwork_response = _try_get_artwork(artwork_base_url, 'original')
|
||||
|
||||
track.date = track.created_at.strftime("%Y-%m-%d %H::%M::%S")
|
||||
if artwork_response is None:
|
||||
artwork_response = _try_get_artwork(artwork_base_url, 't500x500')
|
||||
|
||||
track.artist = user.username
|
||||
if kwargs.get("extract_artist"):
|
||||
for dash in [" - ", " − ", " – ", " — ", " ― "]:
|
||||
if dash in track.title:
|
||||
artist_title = track.title.split(dash)
|
||||
track.artist = artist_title[0].strip()
|
||||
track.title = artist_title[1].strip()
|
||||
break
|
||||
mutagen_file = mutagen.File(filename)
|
||||
mutagen_file.delete()
|
||||
if track.description:
|
||||
if mutagen_file.__class__ == mutagen.flac.FLAC:
|
||||
mutagen_file["description"] = track.description
|
||||
elif mutagen_file.__class__ == mutagen.mp3.MP3 or mutagen_file.__class__ == mutagen.wave.WAVE:
|
||||
mutagen_file["COMM"] = mutagen.id3.COMM(
|
||||
encoding=3, lang="ENG", text=track.description
|
||||
)
|
||||
elif mutagen_file.__class__ == mutagen.mp4.MP4:
|
||||
mutagen_file["\xa9cmt"] = track.description
|
||||
elif mutagen_file.__class__ == mutagen.oggopus.OggOpus:
|
||||
mutagen_file["comment"] = track.description
|
||||
if response:
|
||||
if mutagen_file.__class__ == mutagen.flac.FLAC:
|
||||
p = mutagen.flac.Picture()
|
||||
p.data = out_file.read()
|
||||
p.mime = "image/jpeg"
|
||||
p.type = mutagen.id3.PictureType.COVER_FRONT
|
||||
mutagen_file.add_picture(p)
|
||||
elif mutagen_file.__class__ == mutagen.mp3.MP3 or mutagen_file.__class__ == mutagen.wave.WAVE:
|
||||
mutagen_file["APIC"] = mutagen.id3.APIC(
|
||||
encoding=3,
|
||||
mime="image/jpeg",
|
||||
type=3,
|
||||
desc="Cover",
|
||||
data=out_file.read(),
|
||||
)
|
||||
elif mutagen_file.__class__ == mutagen.mp4.MP4:
|
||||
mutagen_file["covr"] = [mutagen.mp4.MP4Cover(out_file.read())]
|
||||
elif mutagen_file.__class__ == mutagen.oggopus.OggOpus:
|
||||
p = mutagen.flac.Picture()
|
||||
p.data = out_file.read()
|
||||
p.mime = "image/jpeg"
|
||||
p.type = mutagen.id3.PictureType.COVER_FRONT
|
||||
picture_data = p.write()
|
||||
b64_str = base64.b64encode(picture_data).decode()
|
||||
mutagen_file["metadata_block_picture"] = b64_str
|
||||
artist: str = track.user.username
|
||||
if bool(kwargs.get('extract_artist')):
|
||||
for dash in {" - ", " − ", " – ", " — ", " ― "}:
|
||||
if dash not in track.title:
|
||||
continue
|
||||
|
||||
if mutagen_file.__class__ == mutagen.wave.WAVE:
|
||||
mutagen_file["TIT2"] = mutagen.id3.TIT2(encoding=3, text=track.title)
|
||||
mutagen_file["TPE1"] = mutagen.id3.TPE1(encoding=3, text=track.artist)
|
||||
if track.genre:
|
||||
mutagen_file["TCON"] = mutagen.id3.TCON(encoding=3, text=track.genre)
|
||||
if track.permalink_url:
|
||||
mutagen_file["WOAS"] = mutagen.id3.WOAS(url=track.permalink_url)
|
||||
if track.date:
|
||||
mutagen_file["TDAT"] = mutagen.id3.TDAT(encoding=3, text=track.date)
|
||||
if playlist_info:
|
||||
if not kwargs.get("no_album_tag"):
|
||||
mutagen_file["TALB"] = mutagen.id3.TALB(encoding=3, text=playlist_info["title"])
|
||||
mutagen_file["TPE2"] = mutagen.id3.TPE2(
|
||||
encoding=3, text=playlist_info["author"]
|
||||
)
|
||||
mutagen_file["TRCK"] = mutagen.id3.TRCK(encoding=3, text=str(playlist_info["tracknumber"]))
|
||||
mutagen_file.save()
|
||||
else:
|
||||
mutagen_file.save()
|
||||
audio = mutagen.File(filename, easy=True)
|
||||
audio["title"] = track.title
|
||||
audio["artist"] = track.artist
|
||||
if track.genre:
|
||||
audio["genre"] = track.genre
|
||||
if track.permalink_url:
|
||||
audio["website"] = track.permalink_url
|
||||
if track.date:
|
||||
audio["date"] = track.date
|
||||
if playlist_info:
|
||||
if not kwargs.get("no_album_tag"):
|
||||
audio["album"] = playlist_info["title"]
|
||||
audio["albumartist"] = playlist_info["author"]
|
||||
audio["tracknumber"] = str(playlist_info["tracknumber"])
|
||||
artist_title = track.title.split(dash, maxsplit=1)
|
||||
artist = artist_title[0].strip()
|
||||
track.title = artist_title[1].strip()
|
||||
break
|
||||
|
||||
audio.save()
|
||||
album_available: bool = playlist_info and not kwargs.get("no_album_tag")
|
||||
|
||||
metadata = MetadataInfo(
|
||||
artist=artist,
|
||||
title=track.title,
|
||||
description=track.description,
|
||||
genre=track.genre,
|
||||
artwork_jpeg=artwork_response.content if artwork_response else None,
|
||||
link=track.permalink_url,
|
||||
date=track.created_at.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
album_title=playlist_info["title"] if album_available else None,
|
||||
album_author=playlist_info["author"] if album_available else None,
|
||||
album_track_num=playlist_info["tracknumber"] if album_available else None,
|
||||
)
|
||||
|
||||
mutagen_file = mutagen.File(stream)
|
||||
|
||||
handler = METADATA_ASSEMBLERS.get(type(mutagen_file), None)
|
||||
if handler is None:
|
||||
logger.error('Metadata assembling for this track is unsupported.\n'
|
||||
'Please create an issue at https://github.com/flyingrub/scdl/issues and we will look into it')
|
||||
|
||||
kwargs_no_sensitive = {k: v for k, v in kwargs.items() if k not in ('auth_token',)}
|
||||
logger.error(f'Here is the information that you should attach to your issue:\n'
|
||||
f'- Track: {track.permalink_url}\n'
|
||||
f'- First 16 bytes: {stream.getvalue()[:16].hex()}\n'
|
||||
f'- Identified as: {type(mutagen_file)}\n'
|
||||
f'- Configuration: {kwargs_no_sensitive}')
|
||||
return
|
||||
|
||||
# Delete all the existing tags and write our own tags
|
||||
stream.seek(0)
|
||||
mutagen_file.delete(stream)
|
||||
handler(mutagen_file, metadata)
|
||||
|
||||
stream.seek(0)
|
||||
mutagen_file.save(stream)
|
||||
|
||||
|
||||
def re_encode_to_out(
|
||||
track: BasicTrack,
|
||||
in_data: Union[requests.Response, str],
|
||||
out_codec: str,
|
||||
should_copy: bool,
|
||||
filename: str,
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
skip_re_encoding: bool = False,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
to_stdout = is_downloading_to_stdout(**kwargs)
|
||||
|
||||
encoded = re_encode_to_buffer(
|
||||
track,
|
||||
in_data,
|
||||
out_codec,
|
||||
should_copy,
|
||||
playlist_info,
|
||||
skip_re_encoding,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
out_handle = get_stdout() if to_stdout else open(filename, 'wb')
|
||||
shutil.copyfileobj(encoded, out_handle)
|
||||
|
||||
if not to_stdout:
|
||||
out_handle.close()
|
||||
|
||||
|
||||
def _is_ffmpeg_progress_line(parameters: List[str]):
|
||||
return len(parameters) == 2 and parameters[0] in (
|
||||
"progress",
|
||||
"speed",
|
||||
"drop_frames",
|
||||
"dup_frames",
|
||||
"out_time",
|
||||
"out_time_ms",
|
||||
"out_time_us",
|
||||
"total_size",
|
||||
"bitrate",
|
||||
)
|
||||
|
||||
|
||||
def _get_ffmpeg_pipe(
|
||||
in_data: Union[requests.Response, str], # streaming response or url
|
||||
out_codec: str,
|
||||
should_copy: bool,
|
||||
output_file: str,
|
||||
) -> subprocess.Popen:
|
||||
is_url: bool = isinstance(in_data, str)
|
||||
logger.info("Creating the ffmpeg pipe...")
|
||||
|
||||
commands = build_ffmpeg_encoding_args(
|
||||
in_data if is_url else '-',
|
||||
output_file,
|
||||
out_codec,
|
||||
*(('-c', 'copy',) if should_copy else ())
|
||||
)
|
||||
|
||||
logger.debug(f"ffmpeg command: {' '.join(commands)}")
|
||||
pipe = subprocess.Popen(
|
||||
commands,
|
||||
stdin=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
bufsize=FFMPEG_PIPE_CHUNK_SIZE,
|
||||
)
|
||||
|
||||
# Wrap stderr with TextIOWrapper for automatic decoding
|
||||
pipe.stderr = io.TextIOWrapper(pipe.stderr, encoding='utf-8', errors=None)
|
||||
return pipe
|
||||
|
||||
|
||||
def _is_unsupported_codec_for_streaming(codec: str) -> bool:
|
||||
return codec in ('ipod',)
|
||||
|
||||
|
||||
def _re_encode_ffmpeg(
|
||||
in_data: Union[requests.Response, str], # streaming response or url
|
||||
out_codec: str,
|
||||
track_duration_ms: int,
|
||||
should_copy: bool,
|
||||
**kwargs,
|
||||
) -> io.BytesIO:
|
||||
streaming_supported = not _is_unsupported_codec_for_streaming(out_codec)
|
||||
|
||||
out_file_name = 'pipe:1' # stdout
|
||||
if not streaming_supported:
|
||||
out_file_name = str(pathlib.Path(tempfile.gettempdir()) / secrets.token_hex(8))
|
||||
|
||||
pipe = _get_ffmpeg_pipe(in_data, out_codec, should_copy, out_file_name)
|
||||
|
||||
logger.info('Encoding..')
|
||||
errors_output = ''
|
||||
stdout = io.BytesIO()
|
||||
|
||||
# Sadly, we have to iterate both stdout and stderr at the same times in order for things to work.
|
||||
# This is why we have 2 threads that are reading stderr, and writing stuff to stdin at the same time.
|
||||
# I don't think there is any other way how to get this working and make it as fast as it is now.
|
||||
|
||||
# A function that reads encoded track to our `stdout` BytesIO object
|
||||
def read_stdout():
|
||||
for chunk in iter(lambda: pipe.stdout.read(FFMPEG_PIPE_CHUNK_SIZE), b''):
|
||||
stdout.write(chunk)
|
||||
pipe.stdout.close()
|
||||
|
||||
stdout_thread = None
|
||||
stdin_thread = None
|
||||
|
||||
# Read from stdout only if we expect ffmpeg to write something there
|
||||
if streaming_supported:
|
||||
stdout_thread = threading.Thread(target=read_stdout, daemon=True)
|
||||
|
||||
# Stream the response to ffmpeg if needed
|
||||
if isinstance(in_data, requests.Response):
|
||||
assert pipe.stdin is not None
|
||||
stdin_thread = threading.Thread(
|
||||
target=_write_streaming_response_to_pipe,
|
||||
args=(in_data, pipe.stdin,),
|
||||
kwargs=kwargs,
|
||||
daemon=True,
|
||||
)
|
||||
|
||||
# Start the threads
|
||||
if stdout_thread:
|
||||
stdout_thread.start()
|
||||
if stdin_thread:
|
||||
stdin_thread.start()
|
||||
|
||||
# Read progress from stderr line by line
|
||||
total_sec = track_duration_ms / 1000
|
||||
with tqdm(
|
||||
total=total_sec,
|
||||
disable=bool(kwargs.get("hide_progress")),
|
||||
unit="s"
|
||||
) as progress:
|
||||
last_secs = 0
|
||||
for line in iter(pipe.stderr.readline, ''):
|
||||
parameters = line.split('=', maxsplit=1)
|
||||
if not _is_ffmpeg_progress_line(parameters):
|
||||
errors_output += line
|
||||
continue
|
||||
|
||||
if not line.startswith('out_time_ms'):
|
||||
continue
|
||||
|
||||
try:
|
||||
seconds = int(parameters[1]) / 1_000_000
|
||||
except ValueError:
|
||||
seconds = 0
|
||||
|
||||
seconds = min(seconds, total_sec) # clamp just to be sure
|
||||
changed = seconds - last_secs
|
||||
last_secs = seconds
|
||||
progress.update(changed)
|
||||
|
||||
# Wait for threads to finish
|
||||
if stdout_thread:
|
||||
stdout_thread.join()
|
||||
if stdin_thread:
|
||||
stdin_thread.join()
|
||||
|
||||
# Make sure that process has exited and get its exit code
|
||||
pipe.wait()
|
||||
if pipe.returncode != 0:
|
||||
raise SoundCloudException(f'FFmpeg error({pipe.returncode}): {errors_output}')
|
||||
|
||||
# Read from the temp file, if needed
|
||||
if not streaming_supported:
|
||||
with open(out_file_name, 'rb') as f:
|
||||
shutil.copyfileobj(f, stdout)
|
||||
os.remove(out_file_name)
|
||||
|
||||
stdout.seek(0)
|
||||
return stdout
|
||||
|
||||
|
||||
def _copy_stream(
|
||||
in_data: requests.Response, # streaming response or url
|
||||
**kwargs,
|
||||
) -> io.BytesIO:
|
||||
result = io.BytesIO()
|
||||
_write_streaming_response_to_pipe(in_data, result, **kwargs)
|
||||
result.seek(0)
|
||||
return result
|
||||
|
||||
|
||||
def re_encode_to_buffer(
|
||||
track: BasicTrack,
|
||||
in_data: Union[requests.Response, str], # streaming response or url
|
||||
out_codec: str,
|
||||
should_copy: bool,
|
||||
playlist_info: Optional[PlaylistInfo] = None,
|
||||
skip_re_encoding: bool = False,
|
||||
**kwargs,
|
||||
) -> io.BytesIO:
|
||||
if skip_re_encoding and isinstance(in_data, requests.Response):
|
||||
encoded_data = _copy_stream(in_data, **kwargs)
|
||||
else:
|
||||
encoded_data = _re_encode_ffmpeg(in_data, out_codec, track.duration, should_copy, **kwargs)
|
||||
|
||||
# Remove original metadata, add our own, and we are done
|
||||
if not kwargs.get("original_metadata"):
|
||||
_add_metadata_to_stream(track, encoded_data, playlist_info, **kwargs)
|
||||
|
||||
encoded_data.seek(0)
|
||||
return encoded_data
|
||||
|
||||
|
||||
def is_ffmpeg_available():
|
||||
@@ -1206,5 +1373,6 @@ def is_ffmpeg_available():
|
||||
"""
|
||||
return shutil.which("ffmpeg") is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -16,6 +16,57 @@ def test_original_download(tmp_path: Path):
|
||||
assert_track(tmp_path, "track.wav", "copy", "saves", None)
|
||||
|
||||
|
||||
def test_original_to_stdout(tmp_path: Path):
|
||||
os.chdir(tmp_path)
|
||||
r = call_scdl_with_auth(
|
||||
"-l",
|
||||
"https://soundcloud.com/57v/original",
|
||||
"--name-format",
|
||||
"-",
|
||||
encoding=None,
|
||||
)
|
||||
assert r.returncode == 0
|
||||
with open('track.wav', 'wb') as f:
|
||||
f.write(r.stdout)
|
||||
assert_track(tmp_path, "track.wav", "copy", "saves", None)
|
||||
|
||||
|
||||
def test_mp3_to_stdout(tmp_path: Path):
|
||||
os.chdir(tmp_path)
|
||||
r = call_scdl_with_auth(
|
||||
"-l",
|
||||
"https://soundcloud.com/one-thousand-and-one/test-track",
|
||||
"--onlymp3",
|
||||
"--name-format",
|
||||
"-",
|
||||
encoding=None,
|
||||
)
|
||||
assert r.returncode == 0
|
||||
|
||||
with open('track.mp3', 'wb') as f:
|
||||
f.write(r.stdout)
|
||||
|
||||
assert_track(tmp_path, "track.mp3")
|
||||
|
||||
|
||||
def test_flac_to_stdout(tmp_path: Path):
|
||||
os.chdir(tmp_path)
|
||||
r = call_scdl_with_auth(
|
||||
"-l",
|
||||
"https://soundcloud.com/57v/original",
|
||||
"--name-format",
|
||||
"-",
|
||||
"--flac",
|
||||
encoding=None,
|
||||
)
|
||||
|
||||
with open('track.flac', 'wb') as f:
|
||||
f.write(r.stdout)
|
||||
|
||||
assert r.returncode == 0
|
||||
assert_track(tmp_path, "track.flac", "copy", "saves", None)
|
||||
|
||||
|
||||
def test_flac(tmp_path: Path):
|
||||
os.chdir(tmp_path)
|
||||
r = call_scdl_with_auth(
|
||||
@@ -40,6 +91,7 @@ def test_m4a(tmp_path: Path):
|
||||
"--opus",
|
||||
)
|
||||
assert r.returncode == 0
|
||||
assert not (tmp_path / 'track.opus').exists(), 'Please make sure that you have a GO+ subscription'
|
||||
assert_track(
|
||||
tmp_path,
|
||||
"track.m4a",
|
||||
@@ -149,6 +201,7 @@ def test_force_metadata(tmp_path: Path):
|
||||
"track",
|
||||
"--force-metadata",
|
||||
)
|
||||
assert r.returncode == 0
|
||||
assert_track(tmp_path, "track.wav", "copy", "saves", None)
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from soundcloud import SoundCloud
|
||||
client_id = SoundCloud().client_id
|
||||
|
||||
|
||||
def call_scdl_with_auth(*args) -> subprocess.CompletedProcess[str]:
|
||||
def call_scdl_with_auth(*args, encoding: Optional[str] = 'utf-8') -> subprocess.CompletedProcess[str]:
|
||||
auth_token = os.getenv("AUTH_TOKEN")
|
||||
assert auth_token
|
||||
args = (
|
||||
@@ -17,7 +17,8 @@ def call_scdl_with_auth(*args) -> subprocess.CompletedProcess[str]:
|
||||
+ list(args)
|
||||
+ [f"--auth-token={auth_token}", f"--client-id={client_id}"]
|
||||
)
|
||||
return subprocess.run(args, capture_output=True, encoding="utf-8")
|
||||
return subprocess.run(args, capture_output=True, encoding=encoding,
|
||||
errors='ignore' if encoding is not None else None)
|
||||
|
||||
|
||||
def assert_track(
|
||||
|
||||
Reference in New Issue
Block a user