#!/usr/bin/env python3
# regainer - advanced ReplayGain tagging
# Copyright 2016 Calvin Walton
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Advanced ReplayGain scanner and tagger"""

__version__ = "1.0.0"

import argparse
import asyncio
import decimal
import logging
import multiprocessing
import re
import subprocess
import sys
from enum import Enum
from math import isnan, log10

import mutagen

logger = logging.getLogger(__name__)


def _gain_values_differ(a, b):
    """Return True when two optional gain values disagree.

    Behaves like ``a != b`` except that two NaN values are treated as equal.
    NaN is used by Tagger.read_gain_ogg_opus as a "peak not stored"
    placeholder, and a plain ``!=`` would report it as different from itself.
    """
    if a is None or b is None:
        return a is not b
    if isnan(a) and isnan(b):
        return False
    return a != b


class AlbumAction(argparse.Action):
    """argparse action shared by the --album and --exclude options.

    Each --album starts a new ``{"track": [...], "exclude": [...]}`` entry in
    ``namespace.album``.  --exclude adds files to the most recent album, or to
    ``namespace.exclude`` when no album has been started yet.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if namespace.album is None:
            namespace.album = []

        if option_string in ("-a", "--album"):
            namespace.album.append({"track": list(values), "exclude": []})

        if option_string in ("-e", "--exclude"):
            if namespace.album:
                # Excludes belong to the album most recently opened.
                namespace.album[-1]["exclude"].extend(values)
            elif namespace.exclude is None:
                namespace.exclude = values
            else:
                namespace.exclude.extend(values)


class TrackAction(argparse.Action):
    """argparse action for --track: accumulate files in ``namespace.track``."""

    def __call__(self, parser, namespace, values, option_string=None):
        if namespace.track is None:
            namespace.track = list(values)
        else:
            namespace.track.extend(values)


class GainInfo:
    """Track and album loudness/peak values; ``None`` means "not known".

    Loudness values are displayed in LUFS and peaks in dBFS (see __str__).
    """

    def __init__(self, loudness=None, album_loudness=None, peak=None, album_peak=None):
        self.loudness = loudness
        self.album_loudness = album_loudness
        self.peak = peak
        self.album_peak = album_peak

    def __repr__(self):
        return (
            "GainInfo(loudness={}, peak={}, album_loudness={}, album_peak={})".format(
                repr(self.loudness),
                repr(self.peak),
                repr(self.album_loudness),
                repr(self.album_peak),
            )
        )

    def __str__(self):
        def show(value, unit):
            # Unknown values are printed as a bare "None", without a unit.
            if value is None:
                return "None"
            return "{:.2f} {}".format(value, unit)

        return "Track: I: {}, Peak: {}; Album: I: {}, Peak: {}".format(
            show(self.loudness, "LUFS"),
            show(self.peak, "dBFS"),
            show(self.album_loudness, "LUFS"),
            show(self.album_peak, "dBFS"),
        )


class OggOpusMode(Enum):
    R128 = 1
    """
    Write R128 gain tags as specified in the Ogg Opus encapsulation doc.

    This writes the tags R128_TRACK_GAIN and R128_ALBUM_GAIN to the file,
    and removes any REPLAYGAIN tags that may be present. The R128 gain tags
    use the EBU R128 -23 LUFS reference level.

    This method is the most standards compliant, but has limited application
    compatibility.
    """

    REPLAYGAIN = 2
    """
    Write REPLAYGAIN tags compatible with FLAC, Vorbis, etc.

    This writes the tags REPLAYGAIN_{TRACK,ALBUM}_{GAIN,PEAK} to the file,
    and removes any R128 gain tags that may be present. The REPLAYGAIN tags
    use the -18 LUFS reference level from ReplayGain 2.0.

    This method is against the spirit of the specifications, but has good
    application compatibility (most applications share parsing code between
    Opus tags and other formats).
    """

    COMPATIBLE = 3
    """
    Write both R128 gain and REPLAYGAIN tags.

    This writes the tags from both the "standard" and "replaygain" modes at
    the same time.

    This method is against the spirit of the specifications, but ensures
    maximum application compatibility. Since the same computed gain value is
    used to generate both sets of tags (adjusted to the appropriate different
    reference levels), it doesn't matter which one the application uses - the
    result will be the same.

    This is the default tagging method.
    """


class ID3Mode(Enum):
    REPLAYGAIN = 1
    """Write REPLAYGAIN tags according to the ReplayGain 2.0 spec."""

    RVA2 = 2
    """Use the ID3v2 RVA2 frames to store ReplayGain information."""

    COMPATIBLE = 3
    """
    Write both REPLAYGAIN tags and RVA2 frames.

    This is the default tagging method.
    """


class Tagger:
    """Read and write gain tags for one audio file through mutagen.

    read_gain() must be called before write_gain(); it opens the file and
    records in need_track_update / need_album_update whether the tags found
    need rewriting for the configured tagging modes.
    """

    REPLAYGAIN_REF = -18.0  # LUFS
    R128_REF = -23.0  # LUFS

    ogg_opus_mode = OggOpusMode.COMPATIBLE
    id3_mode = ID3Mode.COMPATIBLE

    rg_gain_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)")
    rg_peak_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)")
    opus_gain_re = re.compile(r"^\s*([+-]?\d{1,5})")

    # (canonical tag name, GainInfo attribute, value kind), in write order.
    _RG_FIELDS = (
        ("REPLAYGAIN_TRACK_GAIN", "loudness", "gain"),
        ("REPLAYGAIN_TRACK_PEAK", "peak", "peak"),
        ("REPLAYGAIN_ALBUM_GAIN", "album_loudness", "gain"),
        ("REPLAYGAIN_ALBUM_PEAK", "album_peak", "peak"),
    )

    # MP4 freeform atom "mean" namespaces that are searched for gain tags.
    _MP4_MEANS = ("com.apple.iTunes", "org.hydrogenaudio.replaygain")

    def __init__(self, filename):
        self.filename = filename
        self.tags = GainInfo()
        # Set by read_gain(); initialised here so write_gain() can detect a
        # missing read_gain() call instead of failing with AttributeError.
        self.audio = None
        self.need_album_update = False
        self.need_track_update = False

    # -- value conversion --------------------------------------------------

    @staticmethod
    def _linear_to_db(peak):
        """Convert a linear peak to dB; non-positive values map to -inf."""
        if peak > 0.0:
            return 20.0 * log10(peak)
        return float("-inf")

    def parse_rg_gain(self, value):
        """Parse a REPLAYGAIN gain string into a loudness, or None."""
        m = self.rg_gain_re.match(value)
        if m:
            return self.REPLAYGAIN_REF - float(m.group(1))
        return None

    def format_rg_gain(self, loudness):
        """Format a loudness as a REPLAYGAIN gain string ("x.xx dB")."""
        return "{:.2f} dB".format(self.REPLAYGAIN_REF - loudness)

    def parse_rg_peak(self, value):
        """Parse a linear REPLAYGAIN peak string into dB, or None."""
        m = self.rg_peak_re.match(value)
        if m:
            return self._linear_to_db(float(m.group(1)))
        return None

    def format_rg_peak(self, peak):
        """Format a dB peak as a linear REPLAYGAIN peak string."""
        return "{:.6f}".format(10.0 ** (peak / 20.0))

    def parse_opus_gain(self, value):
        """Parse an R128 gain string (1/256 dB steps) into a loudness, or None."""
        m = self.opus_gain_re.match(value)
        if m:
            return self.R128_REF - float(m.group(1)) / 256.0
        return None

    def format_opus_gain(self, loudness, context):
        """Format a loudness as an R128 gain string, clipped to 16 bits.

        ``context`` ("track"/"album") is only used in the clipping warning.
        """
        gain = int((self.R128_REF - loudness) * 256.0)
        clipped_gain = max(-32768, min(gain, 32767))
        if gain != clipped_gain:
            logger.warning(
                "%s: Clipping OggOpus R128 %s gain adjustment %.2f dB to %.2f dB",
                self.filename,
                context,
                float(gain) / 256,
                float(clipped_gain) / 256,
            )
            gain = clipped_gain
        return "{:d}".format(gain)

    def format_rva2_gain(self, loudness, context):
        """Return the gain in dB, rounded to 1/512 dB steps and clipped to 16 bits."""
        int_gain = decimal.Decimal.from_float(
            (self.REPLAYGAIN_REF - loudness) * 512
        ).to_integral_value(decimal.ROUND_HALF_EVEN)
        clipped_int_gain = max(-32768, min(int_gain, 32767))
        if int_gain != clipped_int_gain:
            logger.warning(
                "%s: Clipping ID3 RVA2 %s gain adjustment %.2f dB to %.2f dB",
                self.filename,
                context,
                float(int_gain) / 512,
                float(clipped_int_gain) / 512,
            )
            int_gain = clipped_int_gain
        return float(int_gain) / 512

    def format_rva2_peak(self, peak, context):
        """Return the linear peak, rounded to 1/32768 steps and clipped to 65535 steps."""
        int_peak = decimal.Decimal.from_float(
            (10.0 ** (peak / 20.0)) * 32768
        ).to_integral_value(decimal.ROUND_HALF_EVEN)
        clipped_int_peak = min(int_peak, 65535)
        if int_peak != clipped_int_peak:
            logger.warning(
                "%s: Clipping ID3 RVA2 %s peak %.6f to %.6f",
                self.filename,
                context,
                float(int_peak) / 32768,
                float(clipped_int_peak) / 32768,
            )
            int_peak = clipped_int_peak
        return float(int_peak) / 32768

    def _parse_rg(self, kind, text):
        """Parse ``text`` as a REPLAYGAIN "gain" or "peak" value."""
        if kind == "gain":
            return self.parse_rg_gain(text)
        return self.parse_rg_peak(text)

    def _format_rg(self, kind, value):
        """Format ``value`` as a REPLAYGAIN "gain" or "peak" string."""
        if kind == "gain":
            return self.format_rg_gain(value)
        return self.format_rg_peak(value)

    def _set_if_unset(self, attr, kind, text):
        """Store the parsed ``text`` in self.tags.<attr> unless already set."""
        if getattr(self.tags, attr) is None:
            setattr(self.tags, attr, self._parse_rg(kind, text))

    # -- reading -----------------------------------------------------------

    def read_gain_id3(self):
        """Load gain values from ID3 TXXX and RVA2 frames."""
        need_update = False
        have_replaygain = False
        have_rva2 = False

        # Load the standard REPLAYGAIN tags first
        # Case-insensitive matching...
        fields = {
            name.lower(): (name, attr, kind) for name, attr, kind in self._RG_FIELDS
        }
        for tag in self.audio.tags.getall("TXXX"):
            field = fields.get(tag.desc.lower())
            if field is None:
                continue
            canonical, attr, kind = field
            self._set_if_unset(attr, kind, tag.text[0])
            have_replaygain = True
            # A tag stored under non-canonical casing gets rewritten.
            if tag.desc != canonical:
                need_update = True

        # Try loading the legacy RVA2 tags if information is missing
        for frame_id, loudness_attr, peak_attr in (
            ("RVA2:track", "loudness", "peak"),
            ("RVA2:album", "album_loudness", "album_peak"),
        ):
            rva2 = self.audio.tags.get(frame_id)
            if rva2 is None or rva2.channel != 1:
                continue
            if (
                getattr(self.tags, loudness_attr) is None
                or getattr(self.tags, peak_attr) is None
            ):
                setattr(self.tags, loudness_attr, self.REPLAYGAIN_REF - rva2.gain)
                # A stored peak of 0 maps to -inf instead of raising.
                setattr(self.tags, peak_attr, self._linear_to_db(rva2.peak))
            have_rva2 = True

        # The file needs rewriting whenever the set of tag families present
        # differs from the set the configured mode asks for.
        want_rva2 = self.id3_mode in (ID3Mode.RVA2, ID3Mode.COMPATIBLE)
        want_replaygain = self.id3_mode in (ID3Mode.REPLAYGAIN, ID3Mode.COMPATIBLE)
        if have_rva2 != want_rva2 or have_replaygain != want_replaygain:
            need_update = True

        self.need_track_update = need_update
        self.need_album_update = need_update

    def read_gain_ogg_opus(self):
        """Load gain values from Ogg Opus R128 and REPLAYGAIN comments."""
        need_update = False
        have_r128 = False

        # Read the opus-specific 'R128' tags
        for name, attr in (
            ("R128_TRACK_GAIN", "loudness"),
            ("R128_ALBUM_GAIN", "album_loudness"),
        ):
            values = self.audio.get(name)
            if values is None:
                continue
            if getattr(self.tags, attr) is None:
                setattr(self.tags, attr, self.parse_opus_gain(values[0]))
            have_r128 = True

        # For compatibility, also read the generic replaygain tags
        have_replaygain = self._read_generic_tags()

        # This is a hack, R128 gain tags don't store peak, but
        # we want to mark it as valid tag even without the peak
        if have_r128 and self.ogg_opus_mode is OggOpusMode.R128:
            if self.tags.loudness is not None and self.tags.peak is None:
                self.tags.peak = float("nan")
            if self.tags.album_loudness is not None and self.tags.album_peak is None:
                self.tags.album_peak = float("nan")

        want_r128 = self.ogg_opus_mode in (OggOpusMode.R128, OggOpusMode.COMPATIBLE)
        want_replaygain = self.ogg_opus_mode in (
            OggOpusMode.REPLAYGAIN,
            OggOpusMode.COMPATIBLE,
        )
        if have_r128 != want_r128 or have_replaygain != want_replaygain:
            need_update = True

        self.need_track_update = need_update
        self.need_album_update = need_update

    def _mp4_freeform_text(self):
        """Yield (key, lowercase name, text) for UTF-8 freeform gain-namespace atoms."""
        for key, value in self.audio.tags.items():
            if key[:4] != "----":
                continue
            _, mean, name = key.split(":", 2)
            if mean not in self._MP4_MEANS:
                continue
            if value[0].dataformat != mutagen.mp4.AtomDataType.UTF8:
                continue
            yield key, name.lower(), value[0].decode(encoding="UTF-8")

    def read_gain_mp4(self):
        """Load gain values from MP4 freeform atoms and return them."""
        # These are the tags used by foobar2000, and are compatible with
        # rockbox.
        fields = {name.lower(): (attr, kind) for name, attr, kind in self._RG_FIELDS}
        for _key, name, text in self._mp4_freeform_text():
            field = fields.get(name)
            if field is not None:
                attr, kind = field
                self._set_if_unset(attr, kind, text)
        return self.tags

    def _read_generic_tags(self):
        """Load the four REPLAYGAIN_* tags; return True if any was present."""
        found = False
        for name, attr, kind in self._RG_FIELDS:
            values = self.audio.get(name)
            if values is None:
                continue
            self._set_if_unset(attr, kind, values[0])
            found = True
        return found

    def read_gain_generic(self):
        """Load gain values from plain REPLAYGAIN_* tags."""
        self._read_generic_tags()

    def read_gain(self):
        """Open the file, load its gain tags and return them.

        Raises:
            Exception: if mutagen cannot identify the file or it has no tags.
        """
        self.need_track_update = False
        self.need_album_update = False

        self.audio = mutagen.File(self.filename)
        if isinstance(self.audio, mutagen.id3.ID3FileType) and self.audio.tags is None:
            self.audio.add_tags()
        if self.audio is None or self.audio.tags is None:
            raise Exception(
                "Unable to determine tag format for file: {}".format(self.filename)
            )

        if isinstance(self.audio.tags, mutagen.id3.ID3):
            self.read_gain_id3()
        elif isinstance(self.audio, mutagen.oggopus.OggOpus):
            self.read_gain_ogg_opus()
        elif isinstance(self.audio, mutagen.mp4.MP4):
            self.read_gain_mp4()
        else:
            self.read_gain_generic()

        if self.tags.album_loudness is not None or self.tags.album_peak is not None:
            self.need_track_update = True

        return self.tags

    # -- writing -----------------------------------------------------------

    def write_gain_id3(self):
        """Replace all ID3 gain frames according to id3_mode and save."""
        logger.debug(
            "%s: Writing ID3 tags using mode %s", self.filename, self.id3_mode.name
        )

        # Delete standard ReplayGain tags
        stale_names = {name.lower() for name, _attr, _kind in self._RG_FIELDS}
        stale_names.add("replaygain_reference_loudness")
        to_delete = [
            tag.HashKey
            for tag in self.audio.tags.getall("TXXX")
            if tag.desc.lower() in stale_names
        ]
        for key in to_delete:
            logger.debug("%s: Removing %s", self.filename, key)
            del self.audio.tags[key]

        # Delete RVA2 frames
        for frame_id in ("RVA2:track", "RVA2:album"):
            if frame_id in self.audio:
                logger.debug("%s: Removing %s", self.filename, frame_id)
                del self.audio[frame_id]

        if self.id3_mode in (ID3Mode.REPLAYGAIN, ID3Mode.COMPATIBLE):
            for name, attr, kind in self._RG_FIELDS:
                value = getattr(self.tags, attr)
                if value is None:
                    continue
                text = self._format_rg(kind, value)
                logger.debug("%s: Adding TXXX:%s=%s", self.filename, name, text)
                self.audio.tags.add(
                    mutagen.id3.TXXX(encoding=0, desc=name, text=[text])
                )

        if self.id3_mode in (ID3Mode.RVA2, ID3Mode.COMPATIBLE):
            for desc, loudness, peak_db in (
                ("track", self.tags.loudness, self.tags.peak),
                ("album", self.tags.album_loudness, self.tags.album_peak),
            ):
                # An RVA2 frame is only written when both values are known.
                if loudness is None or peak_db is None:
                    continue
                gain = self.REPLAYGAIN_REF - loudness
                peak = self.format_rva2_peak(peak_db, desc)
                logger.debug(
                    "%s: Adding RVA2:%s={channel=1, gain=%f, peak=%f}",
                    self.filename,
                    desc,
                    gain,
                    peak,
                )
                self.audio.tags.add(
                    mutagen.id3.RVA2(desc=desc, channel=1, gain=gain, peak=peak)
                )

        self.audio.tags.update_to_v24()
        self.audio.save()

    def write_gain_ogg_opus(self):
        """Replace all Ogg Opus gain comments according to ogg_opus_mode and save."""
        logger.debug(
            "%s: Writing OggOpus tags using mode %s",
            self.filename,
            self.ogg_opus_mode.name,
        )

        self.write_gain_generic_cleanup()

        if self.ogg_opus_mode in (OggOpusMode.R128, OggOpusMode.COMPATIBLE):
            for name, loudness, context in (
                ("R128_TRACK_GAIN", self.tags.loudness, "track"),
                ("R128_ALBUM_GAIN", self.tags.album_loudness, "album"),
            ):
                if loudness is None:
                    continue
                gain = self.format_opus_gain(loudness, context)
                logger.debug("%s: Adding %s=%s", self.filename, name, gain)
                self.audio[name] = [gain]

        if self.ogg_opus_mode in (OggOpusMode.REPLAYGAIN, OggOpusMode.COMPATIBLE):
            self.write_gain_generic_tags()

        self.audio.save()

    def write_gain_mp4(self):
        """Replace the MP4 freeform gain atoms and save."""
        logger.debug("%s: Writing MP4 tags", self.filename)

        stale_names = {name.lower() for name, _attr, _kind in self._RG_FIELDS}
        # Collect first: the tag mapping must not change while it is iterated.
        to_delete = [
            key
            for key, name, _text in self._mp4_freeform_text()
            if name in stale_names
        ]
        for key in to_delete:
            logger.debug("%s: Removing %s", self.filename, key)
            del self.audio.tags[key]

        # These are the tags used by foobar2000, and are compatible with
        # rockbox.
        for name, attr, kind in self._RG_FIELDS:
            value = getattr(self.tags, attr)
            if value is None:
                continue
            text = self._format_rg(kind, value)
            key = "----:com.apple.iTunes:" + name
            logger.debug("%s: Adding %s=%s", self.filename, key, text)
            self.audio.tags[key] = [mutagen.mp4.MP4FreeForm(text.encode(encoding="UTF-8"))]

        self.audio.save()

    def write_gain_generic_cleanup(self):
        """Remove every known gain tag from a generic (key/value) tag set."""
        for tag in [
            # Standard tags
            "REPLAYGAIN_TRACK_GAIN",
            "REPLAYGAIN_TRACK_PEAK",
            "REPLAYGAIN_ALBUM_GAIN",
            "REPLAYGAIN_ALBUM_PEAK",
            # Unusual/old tags
            "REPLAYGAIN_REFERENCE_LOUDNESS",
            # OggOpus R128 gain tags (shouldn't be in other formats)
            "R128_TRACK_GAIN",
            "R128_ALBUM_GAIN",
        ]:
            if tag in self.audio:
                logger.debug("%s: Removing %s", self.filename, tag)
                del self.audio[tag]

    def write_gain_generic(self):
        """Replace the plain REPLAYGAIN_* tags and save."""
        logger.debug("%s: Writing Generic tags", self.filename)
        self.write_gain_generic_cleanup()
        self.write_gain_generic_tags()
        self.audio.save()

    def write_gain_generic_tags(self):
        """Set a REPLAYGAIN_* tag for every known value (does not save)."""
        for name, attr, kind in self._RG_FIELDS:
            value = getattr(self.tags, attr)
            if value is None:
                continue
            text = self._format_rg(kind, value)
            logger.debug("%s: Adding %s=%s", self.filename, name, text)
            self.audio[name] = [text]

    def write_gain(self, tags):
        """Store ``tags`` in the file opened by a previous read_gain().

        Raises:
            Exception: if read_gain() has not been called first.
        """
        self.tags = tags

        if self.audio is None:
            raise Exception("write_gain called without previous read_gain")

        if isinstance(self.audio.tags, mutagen.id3.ID3):
            return self.write_gain_id3()
        if isinstance(self.audio, mutagen.oggopus.OggOpus):
            return self.write_gain_ogg_opus()
        if isinstance(self.audio, mutagen.mp4.MP4):
            return self.write_gain_mp4()
        return self.write_gain_generic()


class GainScanner:
    """Measure loudness and peak by running ffmpeg's ebur128 filter."""

    i_re = re.compile(r"^\s+I:\s+(-?\d+\.\d+) LUFS$", re.M)
    peak_re = re.compile(r"^\s+Peak:\s+(-?(?:\d+\.\d+|inf)) dBFS$", re.M)

    async def ffmpeg_parse_ebur128(self, *ff_opts):
        """Run ffmpeg with ``ff_opts`` and parse its stderr into a GainInfo.

        The last "I:" and "Peak:" lines seen are the ones kept.

        Raises:
            RuntimeError: if ffmpeg exits with a non-zero status.
        """
        ff_args = [
            "ffmpeg",
            "-nostats",
            "-nostdin",
            "-hide_banner",
            "-vn",
            "-loglevel",
            "info",
        ]
        ff_args.extend(ff_opts)
        ff_args.extend(["-f", "null", "-"])
        logger.debug("GainScanner%d: ffmpeg command: %r", id(self), ff_args)

        ffmpeg = await asyncio.create_subprocess_exec(
            *ff_args,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE
        )

        result = GainInfo()
        while True:
            line_bytes = await ffmpeg.stderr.readline()
            if not line_bytes:
                break  # EOF
            line_str = line_bytes.decode(errors="replace").rstrip()
            logger.debug("GainScanner%d: ffmpeg: %s", id(self), line_str)

            m = self.i_re.search(line_str)
            if m:
                result.loudness = float(m.group(1))
                logger.debug(
                    "GainScanner%d: Parsed loudness: %f", id(self), result.loudness
                )

            m = self.peak_re.search(line_str)
            if m:
                result.peak = float(m.group(1))
                logger.debug("GainScanner%d: Parsed peak: %f", id(self), result.peak)

        await ffmpeg.wait()
        if ffmpeg.returncode != 0:
            raise RuntimeError("ffmpeg exited with code {}".format(ffmpeg.returncode))

        logger.debug("GainScanner%d: Result: %r", id(self), result)
        return result

    async def scan_track(self, filename):
        """Return a GainInfo with the measured track loudness and peak."""
        result = await self.ffmpeg_parse_ebur128(
            "-i",
            "file:" + filename,
            "-filter_complex",
            "ebur128=framelog=verbose:peak=true[out]",
            "-map",
            "[out]",
        )
        logger.debug("%s: Calculated track gain: %r", filename, result)
        return result

    async def scan_album(self, filenames):
        """Return a GainInfo with the loudness of ``filenames`` concatenated.

        Raises:
            ValueError: if ``filenames`` is empty.
        """
        if len(filenames) == 0:
            raise ValueError("filenames is empty")

        ff_args = []
        for filename in filenames:
            ff_args += ["-i", "file:" + filename]
        ff_args += [
            "-filter_complex",
            "concat=n={}:v=0:a=1,ebur128=framelog=verbose:peak=none[out]".format(
                len(filenames)
            ),
            "-map",
            "[out]",
        ]

        result = await self.ffmpeg_parse_ebur128(*ff_args)
        # Move the result into the "album" fields
        result = GainInfo(album_loudness=result.loudness, album_peak=result.peak)
        logger.debug(
            "Album (%d tracks): Calculated album gain: %r", len(filenames), result
        )
        return result


class Track:
    """One audio file processed in track mode.

    Every operation holds ``job_sem`` while it runs, which bounds the number
    of operations in flight.
    """

    def __init__(self, filename, job_sem):
        self.filename = filename
        self.job_sem = job_sem
        self.tagger = Tagger(filename)
        self.gain = GainInfo()

    async def read_tags(self):
        """Load the existing tags into self.gain (in the default executor)."""
        async with self.job_sem:
            loop = asyncio.get_running_loop()
            self.gain = await loop.run_in_executor(None, self.tagger.read_gain)

    async def scan_gain(self):
        """Replace self.gain with freshly measured track values."""
        async with self.job_sem:
            gain_scanner = GainScanner()
            self.gain = await gain_scanner.scan_track(self.filename)

    async def write_tags(self):
        """Write self.gain to the file (in the default executor)."""
        async with self.job_sem:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.tagger.write_gain, self.gain)

    async def scan(self, force=False, skip_save=False):
        """Read tags, rescan and save as needed, then print a report.

        Args:
            force: rescan even when loudness and peak are already tagged.
            skip_save: never write to the file; only report what is needed.
        """
        await self.read_tags()

        need_scan = force or self.gain.loudness is None or self.gain.peak is None
        need_save = self.tagger.need_track_update

        if need_scan:
            await self.scan_gain()
            need_save = True

        if need_save and not skip_save:
            await self.write_tags()

        print(self.filename)
        print(self.gain)
        if need_scan:
            print("Rescanned loudness")
        if need_save:
            print("Needs tag update" if skip_save else "Updated tags")
        print()


class AlbumTrack(Track):
    """A Track inside an Album; ``exclude`` keeps it out of the album loudness scan."""

    def __init__(self, filename, job_sem, exclude):
        super().__init__(filename, job_sem)
        self.exclude = exclude


class Album:
    """A group of files sharing one album gain.

    ``album_param`` is a dict with "track" and "exclude" filename lists, as
    built by AlbumAction / main.
    """

    def __init__(self, album_param, job_sem):
        self.job_sem = job_sem
        self.gain = GainInfo()
        self.tracks = [
            AlbumTrack(filename, job_sem, exclude=False)
            for filename in album_param["track"]
        ]
        self.tracks += [
            AlbumTrack(filename, job_sem, exclude=True)
            for filename in album_param["exclude"]
        ]

    async def read_tags(self):
        """Load the existing tags of every track concurrently."""
        await asyncio.gather(*[track.read_tags() for track in self.tracks])

    async def scan_album_gain(self):
        """Measure the album loudness over the non-excluded tracks."""
        included = [t.filename for t in self.tracks if not t.exclude]
        async with self.job_sem:
            gain_scanner = GainScanner()
            self.gain = await gain_scanner.scan_album(included)

    async def scan_gain(self):
        """Rescan the album and all tracks, then copy album values to each track."""
        album_task = asyncio.ensure_future(self.scan_album_gain())
        track_tasks = [track.scan_gain() for track in self.tracks]
        await asyncio.gather(album_task, *track_tasks)

        # The album scan runs with peak measurement off; the album peak is
        # the highest track peak (excluded tracks count too).
        self.gain.album_peak = max(t.gain.peak for t in self.tracks)
        logger.debug(
            "Album (%d tracks): Calculated album peak: %r", len(self.tracks), self.gain
        )

        for track in self.tracks:
            track.gain.album_loudness = self.gain.album_loudness
            track.gain.album_peak = self.gain.album_peak

    async def write_tags(self):
        """Write the tags of every track concurrently."""
        await asyncio.gather(*[track.write_tags() for track in self.tracks])

    async def scan(self, force=False, skip_save=False):
        """Read tags, rescan and save as needed, then print a report.

        A rescan happens when any track lacks values, when the tracks
        disagree about the album values, or when ``force`` is set.

        Args:
            force: rescan even when all tags are present and consistent.
            skip_save: never write to the files; only report what is needed.
        """
        await self.read_tags()

        need_scan = False
        for track in self.tracks:
            if track.gain.loudness is None or track.gain.peak is None:
                need_scan = True

            # The first album value seen becomes the reference that all
            # other tracks must agree with.
            if self.gain.album_loudness is None:
                self.gain.album_loudness = track.gain.album_loudness
            if _gain_values_differ(
                self.gain.album_loudness, track.gain.album_loudness
            ):
                need_scan = True

            if self.gain.album_peak is None:
                self.gain.album_peak = track.gain.album_peak
            # NaN-aware: the "no peak stored" placeholder used for Opus R128
            # tags must not look like a disagreement between tracks.
            if _gain_values_differ(self.gain.album_peak, track.gain.album_peak):
                need_scan = True

        if self.gain.album_loudness is None or self.gain.album_peak is None:
            need_scan = True
        if force:
            need_scan = True

        need_save = any(track.tagger.need_album_update for track in self.tracks)

        if need_scan:
            await self.scan_gain()
            need_save = True

        if need_save and not skip_save:
            await self.write_tags()

        print()
        for track in self.tracks:
            print(track.filename)
            print(track.gain)
        if need_scan:
            print("Rescanned loudness")
        if need_save:
            print("Needs tag update" if skip_save else "Updated tags")


async def main(argv=None):
    """Parse ``argv`` and scan/tag the requested tracks and albums.

    Exits with status 2 when no files are given or --jobs is less than 1.
    """
    parser = argparse.ArgumentParser(
        description="""
            Add ReplayGain tags to files using the EBU R128 algorithm.
        """,
        epilog="""
            If neither --track or --album are specified, the mode used depends
            on the number of files given as arguments. If a single file is
            given, it will be processed in track mode. If multiple files are
            given, they will be processed in album mode as a single album.
        """,
    )
    parser.add_argument(
        "-n",
        "--dry-run",
        default=False,
        action="store_true",
        help="""
            Only calculate and display the ReplayGain values; do not actually
            save the tags in the audio files.
        """,
    )
    parser.add_argument(
        "-f",
        "--force",
        default=False,
        action="store_true",
        help="""
            Recalculate the ReplayGain values even if valid tags are already
            present in the files.
        """,
    )
    parser.add_argument(
        "--debug",
        dest="log_level",
        default=logging.WARNING,
        action="store_const",
        const=logging.DEBUG,
        help="""
            Print a bunch of extra debugging output.
        """,
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=multiprocessing.cpu_count(),
        help="""
            The number of operations to run in parallel. The default is
            auto-detected, currently %(default)s.
        """,
    )
    parser.add_argument(
        "-t",
        "--track",
        nargs="+",
        default=list(),
        metavar="FILE",
        action=TrackAction,
        help="""
            Treat the following audio files as individual tracks.
        """,
    )
    parser.add_argument(
        "-a",
        "--album",
        nargs="+",
        default=list(),
        metavar="FILE",
        action=AlbumAction,
        help="""
            Treat the following audio files as part of the same album. Each
            time the --album option is specified, it starts a new album.
        """,
    )
    parser.add_argument(
        "-e",
        "--exclude",
        nargs="+",
        default=list(),
        metavar="FILE",
        action=AlbumAction,
        help="""
            Tag the following files as part of the current album, but do not
            use their audio when calculating the value for the album
            ReplayGain tag.
        """,
    )
    parser.add_argument("FILE", nargs="*", default=[], help=argparse.SUPPRESS)

    args = parser.parse_args(argv)

    logging.basicConfig(format="%(levelname)s: %(message)s", level=args.log_level)
    logger.debug("Debug logging has been enabled")
    logger.debug("Command line arguments: %r", args)

    # A semaphore of size 0 could never be acquired, so the run would hang.
    if args.jobs < 1:
        parser.error("--jobs must be at least 1")

    # Handle the "loose" arguments, by turning them into tracks or albums
    if len(args.FILE) + len(args.exclude) > 1 or len(args.exclude) > 0:
        # Treat the initial arguments as an album
        args.album.append({"track": list(args.FILE), "exclude": list(args.exclude)})
        args.FILE = None
        args.exclude = None
    elif len(args.FILE) > 0:
        args.track.extend(args.FILE)
        args.FILE = None

    if len(args.track) == 0 and len(args.album) == 0:
        parser.print_usage()
        sys.exit(2)

    job_sem = asyncio.BoundedSemaphore(args.jobs)

    albums = [Album(album, job_sem) for album in args.album]
    tracks = [Track(track, job_sem) for track in args.track]

    tasks = [album.scan(force=args.force, skip_save=args.dry_run) for album in albums]
    tasks += [track.scan(force=args.force, skip_save=args.dry_run) for track in tracks]

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))