From 9c908480fb8885902074bf2843573fd4579d0340 Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Mon, 2 Oct 2023 11:20:49 +0100 Subject: [PATCH] Init --- LICENSE | 5 + regain.py | 92 +++++ regainer.py | 1113 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1210 insertions(+) create mode 100644 LICENSE create mode 100644 regain.py create mode 100644 regainer.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fea0740 --- /dev/null +++ b/LICENSE @@ -0,0 +1,5 @@ +Please see license details in each individual file. + +Some files are not authored by me + +If no license is provided in a file, assume: (c) Frederick Boniface, All Rights Reserved. \ No newline at end of file diff --git a/regain.py b/regain.py new file mode 100644 index 0000000..85596da --- /dev/null +++ b/regain.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +# regain - search folders below working directory and call regainer on each album found +# Copyright Frederick Boniface 2023 - fredboniface.co.uk +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import subprocess +import logging +import argparse + +FORMATS = ["flac", "m4a", "aac", "alac", "mp3", "m3a", "ogg", "opus", "oga"] +PATH_TO_REGAINER = "./regainer.py" + +def parse_arguments(): + parser = argparse.ArgumentParser(description='Call regainer.py on subdirectories containing music files') + parser.add_argument('--debug', action='store_true', help='Enable debug logging') + return parser.parse_args() + +def configureLogging(debug_mode = False): + log_level = logging.DEBUG if debug_mode else logging.INFO + logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s') + +def getWorkingDirectory(): + return os.getcwd() + +def find_files_by_extension(workdir): + file_lists = [] + + for root, _, files in os.walk(workdir): + matching_files = [os.path.join(root, file) for file in files if file.endswith(tuple(FORMATS))] + if matching_files: + file_lists.append(matching_files) + return file_lists + +def countFiles(file_list_of_lists): + count = 0 + for item in file_list_of_lists: + count = count + len(item) + return count + +if __name__ == "__main__": + args = parse_arguments() + configureLogging(args.debug) + workdir = getWorkingDirectory() + logging.info(f"regain.py WorkDir: {workdir}") + logging.debug("Debug logs enabled") + file_lists = find_files_by_extension(workdir) + logging.debug(f"File lists: {file_lists}") + logging.info(f"Files for processing: {countFiles(file_lists)}") + + cmd = ["python", PATH_TO_REGAINER, "-f"] + for sublist in file_lists: + cmd.append("-a") + for item in sublist: + cmd.append(item) + + logging.debug(f"Running command: {cmd}") + logging.info("Running regainer.py") + + if args.debug: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + output = result.stdout + error = result.stderr + # Handle the captured output as needed + print("Output:", output) + print("Error:", error) + else: + result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + logging.debug(f"regainer.py has exited with code {result.returncode}") + if result.returncode != 0: + print("You can re-run with --debug to view the output from regainer") + else: + print("Files successfully tagged") \ No newline at end of file diff --git a/regainer.py b/regainer.py new file mode 100644 index 0000000..4bec33c --- /dev/null +++ b/regainer.py @@ -0,0 +1,1113 @@ +#!/usr/bin/env python3 + +# regainer - advanced ReplayGain tagging +# Copyright 2016 Calvin Walton +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Advanced ReplayGain scanner and tagger""" + +__version__ = "1.0.0" + +import subprocess +import argparse +import asyncio +import multiprocessing +import re +import mutagen +from math import log10 +from enum import Enum +import sys +import logging +import decimal + +logger = logging.getLogger(__name__) + + +class AlbumAction(argparse.Action): + def __init__(self, option_strings, dest, **kwargs): + super(AlbumAction, self).__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + if namespace.album is None: + namespace.album = list() + if option_string == "-a" or option_string == "--album": + namespace.album.append({"track": list(values), "exclude": list()}) + if option_string == "-e" or option_string == "--exclude": + if len(namespace.album) == 0: + if namespace.exclude is None: + namespace.exclude = values + else: + namespace.exclude.extend(values) + else: + namespace.album[-1]["exclude"].extend(values) + + +class TrackAction(argparse.Action): + def __init__(self, option_strings, dest, **kwargs): + super(TrackAction, self).__init__(option_strings, dest, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + if namespace.track is None: + namespace.track = list(values) + else: + namespace.track.extend(values) + + +class GainInfo: + def __init__(self, loudness=None, album_loudness=None, peak=None, album_peak=None): + self.loudness = loudness + self.album_loudness = album_loudness + self.peak = peak + self.album_peak = album_peak + + def __repr__(self): + return ( + "GainInfo(loudness={}, peak={}, album_loudness={}, album_peak={})".format( + repr(self.loudness), + repr(self.peak), + repr(self.album_loudness), + repr(self.album_peak), + ) + ) + + def __str__(self): + str = "" + str += "Track: " + if self.loudness is None: + str += "I: None" + else: + str += "I: {:.2f} LUFS".format(self.loudness) + str += ", " + if self.peak is None: + str += "Peak: None" + else: + str += "Peak: {:.2f} dBFS".format(self.peak) + str += "; Album: " + if self.album_loudness is None: + str += "I: None" + else: + str += "I: {:.2f} LUFS".format(self.album_loudness) + str += ", " + if self.album_peak is None: + str += "Peak: None" + else: + str += "Peak: {:.2f} dBFS".format(self.album_peak) + return str + + +class OggOpusMode(Enum): + R128 = 1 + """ + Write R128 gain tags as specified in the Ogg Opus encapsulation doc. + + This writes the tags R128_TRACK_GAIN and R128_ALBUM_GAIN to the file, + and removes any REPLAYGAIN tags that may be present. The R128 gain tags + use the EBU R128 -23 LUFS reference level. + + This method is the most standards compliant, but has limited application + compatibility. + """ + + REPLAYGAIN = 2 + """ + Write REPLAYGAIN tags compatible with FLAC, Vorbis, etc. + + This writes the tags REPLAYGAIN_{TRACK,ALBUM}_{GAIN,PEAK} to the file, + and removes any R128 gain tags that may be present. The REPLAYGAIN tags + use the -18 LUFS reference level from ReplayGain 2.0. + + This method is against the spirit of the specifications, but has good + application compatibility (most applications share parsing code between + Opus tags and other formats). + """ + + COMPATIBLE = 3 + """ + Write both R128 gain and REPLAYGAIN tags. + + This writes the tags from both the "standard" and "replaygain" modes at + the same time. + + This method is against the spirit of the specifications, but ensures + maximum application compatibility. Since the same computed gain value + is used to generate both sets of tags (adjusted the the appropriate + different reference levels), it doesn't matter which one the application + uses - the result will be the same. + + This is the default tagging method. + """ + + +class ID3Mode(Enum): + REPLAYGAIN = 1 + """Write REPLAYGAIN tags according to the ReplayGain 2.0 spec.""" + + RVA2 = 2 + """Use the ID3v2 RVA2 frames to store ReplayGain information.""" + + COMPATIBLE = 3 + """ + Write both REPLAYGAIN tags and RVA2 frames. + + This is the default tagging method. + """ + + +class Tagger: + REPLAYGAIN_REF = -18.0 # LUFS + R128_REF = -23.0 # LUFS + + ogg_opus_mode = OggOpusMode.COMPATIBLE + id3_mode = ID3Mode.COMPATIBLE + + def __init__(self, filename): + self.filename = filename + self.tags = GainInfo() + self.need_album_update = False + self.need_track_update = False + + rg_gain_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)") + + def parse_rg_gain(self, value): + m = self.rg_gain_re.match(value) + if m: + return self.REPLAYGAIN_REF - float(m.group(1)) + return None + + def format_rg_gain(self, loudness): + return "{:.2f} dB".format(self.REPLAYGAIN_REF - loudness) + + rg_peak_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)") + + def parse_rg_peak(self, value): + m = self.rg_peak_re.match(value) + if m: + peak = float(m.group(1)) + if peak > 0.0: + return 20.0 * log10(peak) + else: + return float("-inf") + return None + + def format_rg_peak(self, peak): + return "{:.6f}".format(10.0 ** (peak / 20.0)) + + opus_gain_re = re.compile(r"^\s*([+-]?\d{1,5})") + + def parse_opus_gain(self, value): + m = self.opus_gain_re.match(value) + if m: + return self.R128_REF - float(m.group(1)) / 256.0 + + def format_opus_gain(self, loudness, context): + gain = int((self.R128_REF - loudness) * 256.0) + clipped_gain = max(-32768, min(gain, 32767)) + + if gain != clipped_gain: + logger.warning( + "%s: Clipping OggOpus R128 %s gain adjustment %.2f dB to %.2f dB", + self.filename, + context, + float(gain) / 256, + float(clipped_gain) / 256, + ) + gain = clipped_gain + + return "{:d}".format(gain) + + def format_rva2_gain(self, loudness, context): + int_gain = decimal.Decimal.from_float( + (self.REPLAYGAIN_REF - loudness) * 512 + ).to_integral_value(decimal.ROUND_HALF_EVEN) + clipped_int_gain = max(-32768, min(int_gain, 32767)) + + if int_gain != clipped_int_gain: + logger.warning( + "%s: Clipping ID3 RVA2 %s gain adjustment %.2 dB to %.2f dB", + self.filename, + context, + float(int_gain) / 512, + float(clipped_int_gain) / 512, + ) + int_gain = clipped_int_gain + + return float(int_gain) / 512 + + def format_rva2_peak(self, peak, context): + int_peak = decimal.Decimal.from_float( + (10.0 ** (peak / 20.0)) * 32768 + ).to_integral_value(decimal.ROUND_HALF_EVEN) + clipped_int_peak = min(int_peak, 65535) + + if int_peak != clipped_int_peak: + logger.warning( + "%s: Clipping ID3 RVA2 %s peak %.6f to %.6f", + self.filename, + context, + float(int_peak) / 32768, + float(clipped_int_peak) / 32768, + ) + int_peak = clipped_int_peak + + return float(int_peak) / 32768 + + def read_gain_id3(self): + need_update = False + have_replaygain = False + have_rva2 = False + + # Load the standard REPLAYGAIN tags first + # Case-insensitive matching... + for tag in self.audio.tags.getall("TXXX"): + if tag.desc.lower() == "replaygain_track_gain": + if self.tags.loudness is None: + self.tags.loudness = self.parse_rg_gain(tag.text[0]) + have_replaygain = True + if tag.desc != "REPLAYGAIN_TRACK_GAIN": + need_update = True + elif tag.desc.lower() == "replaygain_track_peak": + if self.tags.peak is None: + self.tags.peak = self.parse_rg_peak(tag.text[0]) + have_replaygain = True + if tag.desc != "REPLAYGAIN_TRACK_PEAK": + need_update = True + elif tag.desc.lower() == "replaygain_album_gain": + if self.tags.album_loudness is None: + self.tags.album_loudness = self.parse_rg_gain(tag.text[0]) + have_replaygain = True + if tag.desc != "REPLAYGAIN_ALBUM_GAIN": + need_update = True + elif tag.desc.lower() == "replaygain_album_peak": + if self.tags.album_peak is None: + self.tags.album_peak = self.parse_rg_peak(tag.text[0]) + have_replaygain = True + if tag.desc != "REPLAYGAIN_ALBUM_GAIN": + need_update = True + + # Try loading the legacy RVA2 tags if information is missing + rva2_t = self.audio.tags.get("RVA2:track") + if rva2_t is not None and rva2_t.channel == 1: + if self.tags.loudness is None or self.tags.peak is None: + self.tags.loudness = self.REPLAYGAIN_REF - rva2_t.gain + self.tags.peak = 20.0 * log10(rva2_t.peak) + have_rva2 = True + rva2_a = self.audio.tags.get("RVA2:album") + if rva2_a is not None and rva2_a.channel == 1: + if self.tags.album_loudness is None or self.tags.album_peak is None: + self.tags.album_loudness = self.REPLAYGAIN_REF - rva2_a.gain + self.tags.album_peak = 20.0 * log10(rva2_a.peak) + have_rva2 = True + + if have_rva2 and not ( + self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE + ): + need_update = True + if have_replaygain and not ( + self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE + ): + need_update = True + if not have_rva2 and ( + self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE + ): + need_update = True + if not have_replaygain and ( + self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE + ): + need_update = True + + self.need_track_update = need_update + self.need_album_update = need_update + + return + + def read_gain_ogg_opus(self): + need_update = False + have_r128 = False + have_replaygain = False + + # Read the opus-specific 'R128' tags + r128_tg = self.audio.get("R128_TRACK_GAIN") + if r128_tg is not None: + if self.tags.loudness is None: + self.tags.loudness = self.parse_opus_gain(r128_tg[0]) + have_r128 = True + r128_ag = self.audio.get("R128_ALBUM_GAIN") + if r128_ag is not None: + if self.tags.album_loudness is None: + self.tags.album_loudness = self.parse_opus_gain(r128_ag[0]) + have_r128 = True + + # For compatibility, also read the generic replaygain tags + rg_tg = self.audio.get("REPLAYGAIN_TRACK_GAIN") + if rg_tg is not None: + if self.tags.loudness is None: + self.tags.loudness = self.parse_rg_gain(rg_tg[0]) + have_replaygain = True + rg_tp = self.audio.get("REPLAYGAIN_TRACK_PEAK") + if rg_tp is not None: + if self.tags.peak is None: + self.tags.peak = self.parse_rg_peak(rg_tp[0]) + have_replaygain = True + rg_ag = self.audio.get("REPLAYGAIN_ALBUM_GAIN") + if rg_ag is not None: + if self.tags.album_loudness is None: + self.tags.album_loudness = self.parse_rg_gain(rg_ag[0]) + have_replaygain = True + rg_ap = self.audio.get("REPLAYGAIN_ALBUM_PEAK") + if rg_ap is not None: + if self.tags.album_peak is None: + self.tags.album_peak = self.parse_rg_peak(rg_ap[0]) + have_replaygain = True + + # This is a hack, R128 gain tags don't store peak, but + # we want to mark it as valid tag even without the peak + if have_r128 and self.ogg_opus_mode is OggOpusMode.R128: + if self.tags.loudness is not None and self.tags.peak is None: + self.tags.peak = float("nan") + if self.tags.album_loudness is not None and self.tags.album_peak is None: + self.tags.album_peak = float("nan") + + if have_r128 and not ( + self.ogg_opus_mode is OggOpusMode.R128 + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + need_update = True + if have_replaygain and not ( + self.ogg_opus_mode is OggOpusMode.REPLAYGAIN + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + need_update = True + if not have_r128 and ( + self.ogg_opus_mode is OggOpusMode.R128 + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + need_update = True + if not have_replaygain and ( + self.ogg_opus_mode is OggOpusMode.REPLAYGAIN + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + need_update = True + + self.need_track_update = need_update + self.need_album_update = need_update + + def read_gain_mp4(self): + # These are the tags used by foobar2000, and are compatible with + # rockbox. + for key, value in self.audio.tags.items(): + atom_name = key[:4] + if atom_name != "----": + continue + + _, mean, name = key.split(":", 2) + + if not ( + mean == "com.apple.iTunes" or mean == "org.hydrogenaudio.replaygain" + ): + continue + + if value[0].dataformat == mutagen.mp4.AtomDataType.UTF8: + value = value[0].decode(encoding="UTF-8") + else: + continue + + name = name.lower() + if name == "replaygain_track_gain": + if self.tags.loudness is None: + self.tags.loudness = self.parse_rg_gain(value) + continue + if name == "replaygain_track_peak": + if self.tags.peak is None: + self.tags.peak = self.parse_rg_peak(value) + continue + if name == "replaygain_album_gain": + if self.tags.album_loudness is None: + self.tags.album_loudness = self.parse_rg_gain(value) + continue + if name == "replaygain_album_peak": + if self.tags.album_peak is None: + self.tags.album_peak = self.parse_rg_peak(value) + continue + + return self.tags + + def read_gain_generic(self): + rg_tg = self.audio.get("REPLAYGAIN_TRACK_GAIN") + if rg_tg is not None: + if self.tags.loudness is None: + self.tags.loudness = self.parse_rg_gain(rg_tg[0]) + rg_tp = self.audio.get("REPLAYGAIN_TRACK_PEAK") + if rg_tp is not None: + if self.tags.peak is None: + self.tags.peak = self.parse_rg_peak(rg_tp[0]) + rg_ag = self.audio.get("REPLAYGAIN_ALBUM_GAIN") + if rg_ag is not None: + if self.tags.album_loudness is None: + self.tags.album_loudness = self.parse_rg_gain(rg_ag[0]) + rg_ap = self.audio.get("REPLAYGAIN_ALBUM_PEAK") + if rg_ap is not None: + if self.tags.album_peak is None: + self.tags.album_peak = self.parse_rg_peak(rg_ap[0]) + + def read_gain(self): + self.need_track_update = False + self.need_album_update = False + self.audio = mutagen.File(self.filename) + + if isinstance(self.audio, mutagen.id3.ID3FileType) and self.audio.tags is None: + self.audio.add_tags() + + if self.audio is None or self.audio.tags is None: + raise Exception( + "Unable to determine tag format for file: {}".format(self.filename) + ) + + if isinstance(self.audio.tags, mutagen.id3.ID3): + self.read_gain_id3() + elif isinstance(self.audio, mutagen.oggopus.OggOpus): + self.read_gain_ogg_opus() + elif isinstance(self.audio, mutagen.mp4.MP4): + self.read_gain_mp4() + else: + self.read_gain_generic() + + if self.tags.album_loudness is not None or self.tags.album_peak is not None: + self.need_track_update = True + + return self.tags + + def write_gain_id3(self): + logger.debug( + "%s: Writing ID3 tags using mode %s", self.filename, self.id3_mode.name + ) + # Delete standard ReplayGain tags + to_delete = [] + for tag in self.audio.tags.getall("TXXX"): + name = tag.desc.lower() + if ( + name == "replaygain_track_gain" + or name == "replaygain_track_peak" + or name == "replaygain_album_gain" + or name == "replaygain_album_peak" + or name == "replaygain_reference_loudness" + ): + to_delete.append(tag.HashKey) + for key in to_delete: + logger.debug("%s: Removing %s", self.filename, key) + del self.audio.tags[key] + # Delete RVA2 frames + if "RVA2:track" in self.audio: + logger.debug("%s: Removing RVA2:track", self.filename) + del self.audio["RVA2:track"] + if "RVA2:album" in self.audio: + logger.debug("%s: Removing RVA2:album", self.filename) + del self.audio["RVA2:album"] + + if self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE: + if self.tags.loudness is not None: + gain = self.format_rg_gain(self.tags.loudness) + logger.debug( + "%s: Adding TXXX:REPLAYGAIN_TRACK_GAIN=%s", self.filename, gain + ) + tag = mutagen.id3.TXXX( + encoding=0, desc="REPLAYGAIN_TRACK_GAIN", text=[gain] + ) + self.audio.tags.add(tag) + + if self.tags.peak is not None: + peak = self.format_rg_peak(self.tags.peak) + logger.debug( + "%s: Adding TXXX:REPLAYGAIN_TRACK_PEAK=%s", self.filename, peak + ) + tag = mutagen.id3.TXXX( + encoding=0, desc="REPLAYGAIN_TRACK_PEAK", text=[peak] + ) + self.audio.tags.add(tag) + + if self.tags.album_loudness is not None: + gain = self.format_rg_gain(self.tags.album_loudness) + logger.debug( + "%s: Adding TXXX:REPLAYGAIN_ALBUM_GAIN=%s", self.filename, gain + ) + tag = mutagen.id3.TXXX( + encoding=0, desc="REPLAYGAIN_ALBUM_GAIN", text=[gain] + ) + self.audio.tags.add(tag) + + if self.tags.album_peak is not None: + peak = self.format_rg_peak(self.tags.album_peak) + logger.debug( + "%s: Adding TXXX:REPLAYGAIN_ALBUM_PEAK=%s", self.filename, peak + ) + tag = mutagen.id3.TXXX( + encoding=0, desc="REPLAYGAIN_ALBUM_PEAK", text=[peak] + ) + self.audio.tags.add(tag) + + if self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE: + if self.tags.loudness is not None and self.tags.peak is not None: + gain = self.REPLAYGAIN_REF - self.tags.loudness + peak = self.format_rva2_peak(self.tags.peak, "track") + logger.debug( + "%s: Adding RVA2:track={channel=1, gain=%f, peak=%f}", + self.filename, + gain, + peak, + ) + tag = mutagen.id3.RVA2(desc="track", channel=1, gain=gain, peak=peak) + self.audio.tags.add(tag) + + if ( + self.tags.album_loudness is not None + and self.tags.album_peak is not None + ): + gain = self.REPLAYGAIN_REF - self.tags.album_loudness + peak = self.format_rva2_peak(self.tags.album_peak, "album") + logger.debug( + "%s: Adding RVA2:album={channel=1, gain=%f, peak=%f}", + self.filename, + gain, + peak, + ) + tag = mutagen.id3.RVA2(desc="album", channel=1, gain=gain, peak=peak) + self.audio.tags.add(tag) + + self.audio.tags.update_to_v24() + self.audio.save() + + def write_gain_ogg_opus(self): + logger.debug( + "%s: Writing OggOpus tags using mode %s", + self.filename, + self.ogg_opus_mode.name, + ) + + self.write_gain_generic_cleanup() + + if ( + self.ogg_opus_mode is OggOpusMode.R128 + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + if self.tags.loudness is not None: + gain = self.format_opus_gain(self.tags.loudness, "track") + logger.debug("%s: Adding R128_TRACK_GAIN=%s", self.filename, gain) + self.audio["R128_TRACK_GAIN"] = [gain] + + if self.tags.album_loudness is not None: + gain = self.format_opus_gain(self.tags.album_loudness, "album") + logger.debug("%s: Adding R128_ALBUM_GAIN=%s", self.filename, gain) + self.audio["R128_ALBUM_GAIN"] = [gain] + + if ( + self.ogg_opus_mode is OggOpusMode.REPLAYGAIN + or self.ogg_opus_mode is OggOpusMode.COMPATIBLE + ): + self.write_gain_generic_tags() + + self.audio.save() + + def write_gain_mp4(self): + logger.debug("%s: Writing MP4 tags", self.filename) + + to_delete = [] + for key, value in self.audio.tags.items(): + atom_name = key[:4] + if atom_name != "----": + continue + + _, mean, name = key.split(":", 2) + + if not ( + mean == "com.apple.iTunes" or mean == "org.hydrogenaudio.replaygain" + ): + continue + + if value[0].dataformat == mutagen.mp4.AtomDataType.UTF8: + value = value[0].decode(encoding="UTF-8") + else: + continue + + name = name.lower() + if ( + name == "replaygain_track_gain" + or name == "replaygain_track_peak" + or name == "replaygain_album_gain" + or name == "replaygain_album_peak" + ): + to_delete.append(key) + for key in to_delete: + logger.debug("%s: Removing %s", self.filename, key) + del self.audio.tags[key] + + # These are the tags used by foobar2000, and are compatible with + # rockbox. + if self.tags.loudness is not None: + gain = self.format_rg_gain(self.tags.loudness) + logger.debug( + "%s: Adding ----:com.apple.iTunes:REPLAYGAIN_TRACK_GAIN=%s", + self.filename, + gain, + ) + tag = mutagen.mp4.MP4FreeForm(gain.encode(encoding="UTF-8")) + self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_TRACK_GAIN"] = [tag] + + if self.tags.peak is not None: + peak = self.format_rg_peak(self.tags.peak) + logger.debug( + "%s: Adding ----:com.apple.iTunes:REPLAYGAIN_TRACK_PEAK=%s", + self.filename, + peak, + ) + tag = mutagen.mp4.MP4FreeForm(peak.encode(encoding="UTF-8")) + self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_TRACK_PEAK"] = [tag] + + if self.tags.album_loudness is not None: + gain = self.format_rg_gain(self.tags.album_loudness) + logger.debug( + "%s: Adding ----:com.apple.iTunes:REPLAYGAIN_ALBUM_GAIN=%s", + self.filename, + gain, + ) + tag = mutagen.mp4.MP4FreeForm(gain.encode(encoding="UTF-8")) + self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_ALBUM_GAIN"] = [tag] + + if self.tags.album_peak is not None: + peak = self.format_rg_peak(self.tags.album_peak) + logger.debug( + "%s: Adding ----:com.apple.iTunes:REPLAYGAIN_ALBUM_PEAK=%s", + self.filename, + peak, + ) + tag = mutagen.mp4.MP4FreeForm(peak.encode(encoding="UTF-8")) + self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_ALBUM_PEAK"] = [tag] + + self.audio.save() + + def write_gain_generic_cleanup(self): + for tag in [ + # Standard tags + "REPLAYGAIN_TRACK_GAIN", + "REPLAYGAIN_TRACK_PEAK", + "REPLAYGAIN_ALBUM_GAIN", + "REPLAYGAIN_ALBUM_PEAK", + # Unusual/old tags + "REPLAYGAIN_REFERENCE_LOUDNESS", + # OggOpus R128 gain tags (shouldn't be in other formats) + "R128_TRACK_GAIN", + "R128_ALBUM_GAIN", + ]: + if tag in self.audio: + logger.debug("%s: Removing %s", self.filename, tag) + del self.audio[tag] + + def write_gain_generic(self): + logger.debug("%s: Writing Generic tags", self.filename) + + self.write_gain_generic_cleanup() + self.write_gain_generic_tags() + self.audio.save() + + def write_gain_generic_tags(self): + if self.tags.loudness is not None: + gain = self.format_rg_gain(self.tags.loudness) + logger.debug("%s: Adding REPLAYGAIN_TRACK_GAIN=%s", self.filename, gain) + self.audio["REPLAYGAIN_TRACK_GAIN"] = [gain] + + if self.tags.peak is not None: + peak = self.format_rg_peak(self.tags.peak) + logger.debug("%s: Adding REPLAYGAIN_TRACK_PEAK=%s", self.filename, peak) + self.audio["REPLAYGAIN_TRACK_PEAK"] = [peak] + + if self.tags.album_loudness is not None: + gain = self.format_rg_gain(self.tags.album_loudness) + logger.debug("%s: Adding REPLAYGAIN_ALBUM_GAIN=%s", self.filename, gain) + self.audio["REPLAYGAIN_ALBUM_GAIN"] = [gain] + + if self.tags.album_peak is not None: + peak = self.format_rg_peak(self.tags.album_peak) + logger.debug("%s: Adding REPLAYGAIN_ALBUM_PEAK=%s", self.filename, peak) + self.audio["REPLAYGAIN_ALBUM_PEAK"] = [peak] + + def write_gain(self, tags): + self.tags = tags + if self.audio is None: + raise Exception("write_gain called without previous read_gain") + if isinstance(self.audio.tags, mutagen.id3.ID3): + return self.write_gain_id3() + if isinstance(self.audio, mutagen.oggopus.OggOpus): + return self.write_gain_ogg_opus() + if isinstance(self.audio, mutagen.mp4.MP4): + return self.write_gain_mp4() + return self.write_gain_generic() + + +class GainScanner: + i_re = re.compile(r"^\s+I:\s+(-?\d+\.\d+) LUFS$", re.M) + peak_re = re.compile(r"^\s+Peak:\s+(-?(?:\d+\.\d+|inf)) dBFS$", re.M) + + async def ffmpeg_parse_ebur128(self, *ff_opts): + ff_args = [ + "ffmpeg", + "-nostats", + "-nostdin", + "-hide_banner", + "-vn", + "-loglevel", + "info", + ] + ff_args += ff_opts + ff_args += ["-f", "null", "-"] + logger.debug("GainScanner%d: ffmpeg command: %r", id(self), ff_args) + + ffmpeg = await asyncio.create_subprocess_exec( + *ff_args, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE + ) + + result = GainInfo() + while True: + line_bytes = await ffmpeg.stderr.readline() + if len(line_bytes) == 0: + break + + line_str = str(line_bytes, errors='replace').rstrip() + logger.debug("GainScanner%d: ffmpeg: %s", id(self), line_str) + + m = self.i_re.search(line_str) + if m: + result.loudness = float(m.group(1)) + logger.debug( + "GainScanner%d: Parsed loudness: %f", id(self), result.loudness + ) + m = self.peak_re.search(line_str) + if m: + result.peak = float(m.group(1)) + logger.debug("GainScanner%d: Parsed peak: %f", id(self), result.peak) + + await ffmpeg.wait() + if ffmpeg.returncode != 0: + raise RuntimeError("ffmpeg exited with code {}".format(ffmpeg.returncode)) + + logger.debug("GainScanner%d: Result: %r", id(self), result) + + return result + + async def scan_track(self, filename): + result = await self.ffmpeg_parse_ebur128( + "-i", + "file:" + filename, + "-filter_complex", + "ebur128=framelog=verbose:peak=true[out]", + "-map", + "[out]", + ) + logger.debug("%s: Calculated track gain: %r", filename, result) + return result + + async def scan_album(self, filenames): + if len(filenames) == 0: + raise ValueError("filenames is empty") + ff_args = [] + for filename in filenames: + ff_args += ["-i", "file:" + filename] + ff_args += [ + "-filter_complex", + "concat=n={}:v=0:a=1,ebur128=framelog=verbose:peak=none[out]".format( + len(filenames) + ), + "-map", + "[out]", + ] + result = await self.ffmpeg_parse_ebur128(*ff_args) + # Move the result into the "album" fields + result = GainInfo(album_loudness=result.loudness, album_peak=result.peak) + + logger.debug( + "Album (%d tracks): Calculated album gain: %r", len(filenames), result + ) + return result + + +class Track: + def __init__(self, filename, job_sem): + self.filename = filename + self.job_sem = job_sem + self.tagger = Tagger(filename) + self.gain = GainInfo() + + async def read_tags(self): + async with self.job_sem: + loop = asyncio.get_running_loop() + self.gain = await loop.run_in_executor(None, self.tagger.read_gain) + + async def scan_gain(self): + async with self.job_sem: + gain_scanner = GainScanner() + self.gain = await gain_scanner.scan_track(self.filename) + + async def write_tags(self): + async with self.job_sem: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self.tagger.write_gain, self.gain) + + async def scan(self, force=False, skip_save=False): + await self.read_tags() + + need_scan = False + if self.gain.loudness is None or self.gain.peak is None: + need_scan = True + if force: + need_scan = True + + need_save = self.tagger.need_track_update + + if need_scan: + await self.scan_gain() + need_save = True + + if need_save: + if not skip_save: + await self.write_tags() + + print(self.filename) + print(self.gain) + if need_scan: + print("Rescanned loudness") + if need_save: + if not skip_save: + print("Updated tags") + else: + print("Needs tag update") + print() + + +class AlbumTrack(Track): + def __init__(self, filename, job_sem, exclude): + super().__init__(filename, job_sem) + self.exclude = exclude + + +class Album: + def __init__(self, album_param, job_sem): + self.job_sem = job_sem + + self.gain = GainInfo() + self.tracks = [] + for filename in album_param["track"]: + self.tracks.append(AlbumTrack(filename, job_sem, exclude=False)) + for filename in album_param["exclude"]: + self.tracks.append(AlbumTrack(filename, job_sem, exclude=True)) + + async def read_tags(self): + track_tasks = [track.read_tags() for track in self.tracks] + await asyncio.gather(*track_tasks) + + async def scan_album_gain(self): + included = [t.filename for t in self.tracks if not t.exclude] + async with self.job_sem: + gain_scanner = GainScanner() + self.gain = await gain_scanner.scan_album(included) + + async def scan_gain(self): + album_task = asyncio.ensure_future(self.scan_album_gain()) + track_tasks = [track.scan_gain() for track in self.tracks] + + await asyncio.gather(album_task, *track_tasks) + + self.gain.album_peak = max([t.gain.peak for t in self.tracks]) + logger.debug( + "Album (%d tracks): Calculated album peak: %r", len(self.tracks), self.gain + ) + for track in self.tracks: + track.gain.album_loudness = self.gain.album_loudness + track.gain.album_peak = self.gain.album_peak + + async def write_tags(self): + track_tasks = [track.write_tags() for track in self.tracks] + await asyncio.gather(*track_tasks) + + async def scan(self, force=False, skip_save=False): + await self.read_tags() + + need_scan = False + for track in self.tracks: + if track.gain.loudness is None or track.gain.peak is None: + need_scan = True + if self.gain.album_loudness is None: + self.gain.album_loudness = track.gain.album_loudness + if self.gain.album_loudness != track.gain.album_loudness: + need_scan = True + if self.gain.album_peak is None: + self.gain.album_peak = track.gain.album_peak + if self.gain.album_peak != track.gain.album_peak: + need_scan = True + if self.gain.album_loudness is None or self.gain.album_peak is None: + need_scan = True + if force: + need_scan = True + + need_save = any([track.tagger.need_album_update for track in self.tracks]) + + if need_scan: + await self.scan_gain() + need_save = True + + if need_save: + if not skip_save: + await self.write_tags() + + print() + for track in self.tracks: + print(track.filename) + print(track.gain) + if need_scan: + print("Rescanned loudness") + if need_save: + if not skip_save: + print("Updated tags") + else: + print("Needs tag update") + + +async def main(argv=None): + parser = argparse.ArgumentParser( + description=""" + Add ReplayGain tags to files using the EBU R128 algorithm. + """, + epilog=""" + If neither --track or --album are specified, the mode used depends on + the number of files given as arguments. If a single file is given, it + will be processed in track mode. If multiple files are given, they + will be processed in album mode as a single album. + """, + ) + parser.add_argument( + "-n", + "--dry-run", + default=False, + action="store_true", + help=""" + Only calculate and display the ReplayGain values; do not actually + save the tags in the audio files. + """, + ) + parser.add_argument( + "-f", + "--force", + default=False, + action="store_true", + help=""" + Recalculate the ReplayGain values even if valid tags are already + present in the files. + """, + ) + parser.add_argument( + "--debug", + dest="log_level", + default=logging.WARNING, + action="store_const", + const=logging.DEBUG, + help=""" + Print a bunch of extra debugging output. + """, + ) + parser.add_argument( + "-j", + "--jobs", + type=int, + default=multiprocessing.cpu_count(), + help=""" + The number of operations to run in parallel. The default is + auto-detected, currently %(default)s. + """, + ) + parser.add_argument( + "-t", + "--track", + nargs="+", + default=list(), + metavar="FILE", + action=TrackAction, + help=""" + Treat the following audio files as individual tracks. + """, + ) + parser.add_argument( + "-a", + "--album", + nargs="+", + default=list(), + metavar="FILE", + action=AlbumAction, + help=""" + Treat the following audio files as part of the same album. + Each time the --album option is specified, it starts a new album. + """, + ) + parser.add_argument( + "-e", + "--exclude", + nargs="+", + default=list(), + metavar="FILE", + action=AlbumAction, + help=""" + Tag the following files as part of the current album, but do not + use their audio when calculating the value for the album ReplayGain + tag. + """, + ) + parser.add_argument("FILE", nargs="*", default=[], help=argparse.SUPPRESS) + args = parser.parse_args(argv) + + logging.basicConfig(format="%(levelname)s: %(message)s", level=args.log_level) + + logger.debug("Debug logging has been enabled") + logger.debug("Command line arguments: %r", args) + + # Handle the "loose" arguments, by turning them into tracks or albums + if len(args.FILE) + len(args.exclude) > 1 or len(args.exclude) > 0: + # Treat the initial arguments as an album + args.album.append({"track": list(args.FILE), "exclude": list(args.exclude)}) + args.FILE = None + args.exclude = None + elif len(args.FILE) > 0: + args.track.extend(args.FILE) + args.FILE = None + + if len(args.track) == 0 and len(args.album) == 0: + parser.print_usage() + sys.exit(2) + + job_sem = asyncio.BoundedSemaphore(args.jobs) + + tasks = [] + albums = [Album(album, job_sem) for album in args.album] + tasks += [album.scan(force=args.force, skip_save=args.dry_run) for album in albums] + tracks = [Track(track, job_sem) for track in args.track] + tasks += [track.scan(force=args.force, skip_save=args.dry_run) for track in tracks] + + await asyncio.gather(*tasks) + + +if __name__ == "__main__": + asyncio.run(main(sys.argv[1:])) \ No newline at end of file