scripts/regainer.py

1113 lines
39 KiB
Python

#!/usr/bin/env python3
# regainer - advanced ReplayGain tagging
# Copyright 2016 Calvin Walton <calvin.walton@kepstin.ca>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Advanced ReplayGain scanner and tagger"""
__version__ = "1.0.0"
import subprocess
import argparse
import asyncio
import multiprocessing
import re
import mutagen
from math import log10
from enum import Enum
import sys
import logging
import decimal
logger = logging.getLogger(__name__)
class AlbumAction(argparse.Action):
def __init__(self, option_strings, dest, **kwargs):
super(AlbumAction, self).__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if namespace.album is None:
namespace.album = list()
if option_string == "-a" or option_string == "--album":
namespace.album.append({"track": list(values), "exclude": list()})
if option_string == "-e" or option_string == "--exclude":
if len(namespace.album) == 0:
if namespace.exclude is None:
namespace.exclude = values
else:
namespace.exclude.extend(values)
else:
namespace.album[-1]["exclude"].extend(values)
class TrackAction(argparse.Action):
def __init__(self, option_strings, dest, **kwargs):
super(TrackAction, self).__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if namespace.track is None:
namespace.track = list(values)
else:
namespace.track.extend(values)
class GainInfo:
def __init__(self, loudness=None, album_loudness=None, peak=None, album_peak=None):
self.loudness = loudness
self.album_loudness = album_loudness
self.peak = peak
self.album_peak = album_peak
def __repr__(self):
return (
"GainInfo(loudness={}, peak={}, album_loudness={}, album_peak={})".format(
repr(self.loudness),
repr(self.peak),
repr(self.album_loudness),
repr(self.album_peak),
)
)
def __str__(self):
str = ""
str += "Track: "
if self.loudness is None:
str += "I: None"
else:
str += "I: {:.2f} LUFS".format(self.loudness)
str += ", "
if self.peak is None:
str += "Peak: None"
else:
str += "Peak: {:.2f} dBFS".format(self.peak)
str += "; Album: "
if self.album_loudness is None:
str += "I: None"
else:
str += "I: {:.2f} LUFS".format(self.album_loudness)
str += ", "
if self.album_peak is None:
str += "Peak: None"
else:
str += "Peak: {:.2f} dBFS".format(self.album_peak)
return str
class OggOpusMode(Enum):
R128 = 1
"""
Write R128 gain tags as specified in the Ogg Opus encapsulation doc.
This writes the tags R128_TRACK_GAIN and R128_ALBUM_GAIN to the file,
and removes any REPLAYGAIN tags that may be present. The R128 gain tags
use the EBU R128 -23 LUFS reference level.
This method is the most standards compliant, but has limited application
compatibility.
"""
REPLAYGAIN = 2
"""
Write REPLAYGAIN tags compatible with FLAC, Vorbis, etc.
This writes the tags REPLAYGAIN_{TRACK,ALBUM}_{GAIN,PEAK} to the file,
and removes any R128 gain tags that may be present. The REPLAYGAIN tags
use the -18 LUFS reference level from ReplayGain 2.0.
This method is against the spirit of the specifications, but has good
application compatibility (most applications share parsing code between
Opus tags and other formats).
"""
COMPATIBLE = 3
"""
Write both R128 gain and REPLAYGAIN tags.
This writes the tags from both the "standard" and "replaygain" modes at
the same time.
This method is against the spirit of the specifications, but ensures
maximum application compatibility. Since the same computed gain value
is used to generate both sets of tags (adjusted the the appropriate
different reference levels), it doesn't matter which one the application
uses - the result will be the same.
This is the default tagging method.
"""
class ID3Mode(Enum):
REPLAYGAIN = 1
"""Write REPLAYGAIN tags according to the ReplayGain 2.0 spec."""
RVA2 = 2
"""Use the ID3v2 RVA2 frames to store ReplayGain information."""
COMPATIBLE = 3
"""
Write both REPLAYGAIN tags and RVA2 frames.
This is the default tagging method.
"""
class Tagger:
REPLAYGAIN_REF = -18.0 # LUFS
R128_REF = -23.0 # LUFS
ogg_opus_mode = OggOpusMode.COMPATIBLE
id3_mode = ID3Mode.COMPATIBLE
def __init__(self, filename):
self.filename = filename
self.tags = GainInfo()
self.need_album_update = False
self.need_track_update = False
rg_gain_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)")
def parse_rg_gain(self, value):
m = self.rg_gain_re.match(value)
if m:
return self.REPLAYGAIN_REF - float(m.group(1))
return None
def format_rg_gain(self, loudness):
return "{:.2f} dB".format(self.REPLAYGAIN_REF - loudness)
rg_peak_re = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)")
def parse_rg_peak(self, value):
m = self.rg_peak_re.match(value)
if m:
peak = float(m.group(1))
if peak > 0.0:
return 20.0 * log10(peak)
else:
return float("-inf")
return None
def format_rg_peak(self, peak):
return "{:.6f}".format(10.0 ** (peak / 20.0))
opus_gain_re = re.compile(r"^\s*([+-]?\d{1,5})")
def parse_opus_gain(self, value):
m = self.opus_gain_re.match(value)
if m:
return self.R128_REF - float(m.group(1)) / 256.0
def format_opus_gain(self, loudness, context):
gain = int((self.R128_REF - loudness) * 256.0)
clipped_gain = max(-32768, min(gain, 32767))
if gain != clipped_gain:
logger.warning(
"%s: Clipping OggOpus R128 %s gain adjustment %.2f dB to %.2f dB",
self.filename,
context,
float(gain) / 256,
float(clipped_gain) / 256,
)
gain = clipped_gain
return "{:d}".format(gain)
def format_rva2_gain(self, loudness, context):
int_gain = decimal.Decimal.from_float(
(self.REPLAYGAIN_REF - loudness) * 512
).to_integral_value(decimal.ROUND_HALF_EVEN)
clipped_int_gain = max(-32768, min(int_gain, 32767))
if int_gain != clipped_int_gain:
logger.warning(
"%s: Clipping ID3 RVA2 %s gain adjustment %.2 dB to %.2f dB",
self.filename,
context,
float(int_gain) / 512,
float(clipped_int_gain) / 512,
)
int_gain = clipped_int_gain
return float(int_gain) / 512
def format_rva2_peak(self, peak, context):
int_peak = decimal.Decimal.from_float(
(10.0 ** (peak / 20.0)) * 32768
).to_integral_value(decimal.ROUND_HALF_EVEN)
clipped_int_peak = min(int_peak, 65535)
if int_peak != clipped_int_peak:
logger.warning(
"%s: Clipping ID3 RVA2 %s peak %.6f to %.6f",
self.filename,
context,
float(int_peak) / 32768,
float(clipped_int_peak) / 32768,
)
int_peak = clipped_int_peak
return float(int_peak) / 32768
def read_gain_id3(self):
need_update = False
have_replaygain = False
have_rva2 = False
# Load the standard REPLAYGAIN tags first
# Case-insensitive matching...
for tag in self.audio.tags.getall("TXXX"):
if tag.desc.lower() == "replaygain_track_gain":
if self.tags.loudness is None:
self.tags.loudness = self.parse_rg_gain(tag.text[0])
have_replaygain = True
if tag.desc != "REPLAYGAIN_TRACK_GAIN":
need_update = True
elif tag.desc.lower() == "replaygain_track_peak":
if self.tags.peak is None:
self.tags.peak = self.parse_rg_peak(tag.text[0])
have_replaygain = True
if tag.desc != "REPLAYGAIN_TRACK_PEAK":
need_update = True
elif tag.desc.lower() == "replaygain_album_gain":
if self.tags.album_loudness is None:
self.tags.album_loudness = self.parse_rg_gain(tag.text[0])
have_replaygain = True
if tag.desc != "REPLAYGAIN_ALBUM_GAIN":
need_update = True
elif tag.desc.lower() == "replaygain_album_peak":
if self.tags.album_peak is None:
self.tags.album_peak = self.parse_rg_peak(tag.text[0])
have_replaygain = True
if tag.desc != "REPLAYGAIN_ALBUM_GAIN":
need_update = True
# Try loading the legacy RVA2 tags if information is missing
rva2_t = self.audio.tags.get("RVA2:track")
if rva2_t is not None and rva2_t.channel == 1:
if self.tags.loudness is None or self.tags.peak is None:
self.tags.loudness = self.REPLAYGAIN_REF - rva2_t.gain
self.tags.peak = 20.0 * log10(rva2_t.peak)
have_rva2 = True
rva2_a = self.audio.tags.get("RVA2:album")
if rva2_a is not None and rva2_a.channel == 1:
if self.tags.album_loudness is None or self.tags.album_peak is None:
self.tags.album_loudness = self.REPLAYGAIN_REF - rva2_a.gain
self.tags.album_peak = 20.0 * log10(rva2_a.peak)
have_rva2 = True
if have_rva2 and not (
self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE
):
need_update = True
if have_replaygain and not (
self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE
):
need_update = True
if not have_rva2 and (
self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE
):
need_update = True
if not have_replaygain and (
self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE
):
need_update = True
self.need_track_update = need_update
self.need_album_update = need_update
return
def read_gain_ogg_opus(self):
need_update = False
have_r128 = False
have_replaygain = False
# Read the opus-specific 'R128' tags
r128_tg = self.audio.get("R128_TRACK_GAIN")
if r128_tg is not None:
if self.tags.loudness is None:
self.tags.loudness = self.parse_opus_gain(r128_tg[0])
have_r128 = True
r128_ag = self.audio.get("R128_ALBUM_GAIN")
if r128_ag is not None:
if self.tags.album_loudness is None:
self.tags.album_loudness = self.parse_opus_gain(r128_ag[0])
have_r128 = True
# For compatibility, also read the generic replaygain tags
rg_tg = self.audio.get("REPLAYGAIN_TRACK_GAIN")
if rg_tg is not None:
if self.tags.loudness is None:
self.tags.loudness = self.parse_rg_gain(rg_tg[0])
have_replaygain = True
rg_tp = self.audio.get("REPLAYGAIN_TRACK_PEAK")
if rg_tp is not None:
if self.tags.peak is None:
self.tags.peak = self.parse_rg_peak(rg_tp[0])
have_replaygain = True
rg_ag = self.audio.get("REPLAYGAIN_ALBUM_GAIN")
if rg_ag is not None:
if self.tags.album_loudness is None:
self.tags.album_loudness = self.parse_rg_gain(rg_ag[0])
have_replaygain = True
rg_ap = self.audio.get("REPLAYGAIN_ALBUM_PEAK")
if rg_ap is not None:
if self.tags.album_peak is None:
self.tags.album_peak = self.parse_rg_peak(rg_ap[0])
have_replaygain = True
# This is a hack, R128 gain tags don't store peak, but
# we want to mark it as valid tag even without the peak
if have_r128 and self.ogg_opus_mode is OggOpusMode.R128:
if self.tags.loudness is not None and self.tags.peak is None:
self.tags.peak = float("nan")
if self.tags.album_loudness is not None and self.tags.album_peak is None:
self.tags.album_peak = float("nan")
if have_r128 and not (
self.ogg_opus_mode is OggOpusMode.R128
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
need_update = True
if have_replaygain and not (
self.ogg_opus_mode is OggOpusMode.REPLAYGAIN
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
need_update = True
if not have_r128 and (
self.ogg_opus_mode is OggOpusMode.R128
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
need_update = True
if not have_replaygain and (
self.ogg_opus_mode is OggOpusMode.REPLAYGAIN
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
need_update = True
self.need_track_update = need_update
self.need_album_update = need_update
def read_gain_mp4(self):
# These are the tags used by foobar2000, and are compatible with
# rockbox.
for key, value in self.audio.tags.items():
atom_name = key[:4]
if atom_name != "----":
continue
_, mean, name = key.split(":", 2)
if not (
mean == "com.apple.iTunes" or mean == "org.hydrogenaudio.replaygain"
):
continue
if value[0].dataformat == mutagen.mp4.AtomDataType.UTF8:
value = value[0].decode(encoding="UTF-8")
else:
continue
name = name.lower()
if name == "replaygain_track_gain":
if self.tags.loudness is None:
self.tags.loudness = self.parse_rg_gain(value)
continue
if name == "replaygain_track_peak":
if self.tags.peak is None:
self.tags.peak = self.parse_rg_peak(value)
continue
if name == "replaygain_album_gain":
if self.tags.album_loudness is None:
self.tags.album_loudness = self.parse_rg_gain(value)
continue
if name == "replaygain_album_peak":
if self.tags.album_peak is None:
self.tags.album_peak = self.parse_rg_peak(value)
continue
return self.tags
def read_gain_generic(self):
rg_tg = self.audio.get("REPLAYGAIN_TRACK_GAIN")
if rg_tg is not None:
if self.tags.loudness is None:
self.tags.loudness = self.parse_rg_gain(rg_tg[0])
rg_tp = self.audio.get("REPLAYGAIN_TRACK_PEAK")
if rg_tp is not None:
if self.tags.peak is None:
self.tags.peak = self.parse_rg_peak(rg_tp[0])
rg_ag = self.audio.get("REPLAYGAIN_ALBUM_GAIN")
if rg_ag is not None:
if self.tags.album_loudness is None:
self.tags.album_loudness = self.parse_rg_gain(rg_ag[0])
rg_ap = self.audio.get("REPLAYGAIN_ALBUM_PEAK")
if rg_ap is not None:
if self.tags.album_peak is None:
self.tags.album_peak = self.parse_rg_peak(rg_ap[0])
def read_gain(self):
self.need_track_update = False
self.need_album_update = False
self.audio = mutagen.File(self.filename)
if isinstance(self.audio, mutagen.id3.ID3FileType) and self.audio.tags is None:
self.audio.add_tags()
if self.audio is None or self.audio.tags is None:
raise Exception(
"Unable to determine tag format for file: {}".format(self.filename)
)
if isinstance(self.audio.tags, mutagen.id3.ID3):
self.read_gain_id3()
elif isinstance(self.audio, mutagen.oggopus.OggOpus):
self.read_gain_ogg_opus()
elif isinstance(self.audio, mutagen.mp4.MP4):
self.read_gain_mp4()
else:
self.read_gain_generic()
if self.tags.album_loudness is not None or self.tags.album_peak is not None:
self.need_track_update = True
return self.tags
def write_gain_id3(self):
logger.debug(
"%s: Writing ID3 tags using mode %s", self.filename, self.id3_mode.name
)
# Delete standard ReplayGain tags
to_delete = []
for tag in self.audio.tags.getall("TXXX"):
name = tag.desc.lower()
if (
name == "replaygain_track_gain"
or name == "replaygain_track_peak"
or name == "replaygain_album_gain"
or name == "replaygain_album_peak"
or name == "replaygain_reference_loudness"
):
to_delete.append(tag.HashKey)
for key in to_delete:
logger.debug("%s: Removing %s", self.filename, key)
del self.audio.tags[key]
# Delete RVA2 frames
if "RVA2:track" in self.audio:
logger.debug("%s: Removing RVA2:track", self.filename)
del self.audio["RVA2:track"]
if "RVA2:album" in self.audio:
logger.debug("%s: Removing RVA2:album", self.filename)
del self.audio["RVA2:album"]
if self.id3_mode is ID3Mode.REPLAYGAIN or self.id3_mode is ID3Mode.COMPATIBLE:
if self.tags.loudness is not None:
gain = self.format_rg_gain(self.tags.loudness)
logger.debug(
"%s: Adding TXXX:REPLAYGAIN_TRACK_GAIN=%s", self.filename, gain
)
tag = mutagen.id3.TXXX(
encoding=0, desc="REPLAYGAIN_TRACK_GAIN", text=[gain]
)
self.audio.tags.add(tag)
if self.tags.peak is not None:
peak = self.format_rg_peak(self.tags.peak)
logger.debug(
"%s: Adding TXXX:REPLAYGAIN_TRACK_PEAK=%s", self.filename, peak
)
tag = mutagen.id3.TXXX(
encoding=0, desc="REPLAYGAIN_TRACK_PEAK", text=[peak]
)
self.audio.tags.add(tag)
if self.tags.album_loudness is not None:
gain = self.format_rg_gain(self.tags.album_loudness)
logger.debug(
"%s: Adding TXXX:REPLAYGAIN_ALBUM_GAIN=%s", self.filename, gain
)
tag = mutagen.id3.TXXX(
encoding=0, desc="REPLAYGAIN_ALBUM_GAIN", text=[gain]
)
self.audio.tags.add(tag)
if self.tags.album_peak is not None:
peak = self.format_rg_peak(self.tags.album_peak)
logger.debug(
"%s: Adding TXXX:REPLAYGAIN_ALBUM_PEAK=%s", self.filename, peak
)
tag = mutagen.id3.TXXX(
encoding=0, desc="REPLAYGAIN_ALBUM_PEAK", text=[peak]
)
self.audio.tags.add(tag)
if self.id3_mode is ID3Mode.RVA2 or self.id3_mode is ID3Mode.COMPATIBLE:
if self.tags.loudness is not None and self.tags.peak is not None:
gain = self.REPLAYGAIN_REF - self.tags.loudness
peak = self.format_rva2_peak(self.tags.peak, "track")
logger.debug(
"%s: Adding RVA2:track={channel=1, gain=%f, peak=%f}",
self.filename,
gain,
peak,
)
tag = mutagen.id3.RVA2(desc="track", channel=1, gain=gain, peak=peak)
self.audio.tags.add(tag)
if (
self.tags.album_loudness is not None
and self.tags.album_peak is not None
):
gain = self.REPLAYGAIN_REF - self.tags.album_loudness
peak = self.format_rva2_peak(self.tags.album_peak, "album")
logger.debug(
"%s: Adding RVA2:album={channel=1, gain=%f, peak=%f}",
self.filename,
gain,
peak,
)
tag = mutagen.id3.RVA2(desc="album", channel=1, gain=gain, peak=peak)
self.audio.tags.add(tag)
self.audio.tags.update_to_v24()
self.audio.save()
def write_gain_ogg_opus(self):
logger.debug(
"%s: Writing OggOpus tags using mode %s",
self.filename,
self.ogg_opus_mode.name,
)
self.write_gain_generic_cleanup()
if (
self.ogg_opus_mode is OggOpusMode.R128
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
if self.tags.loudness is not None:
gain = self.format_opus_gain(self.tags.loudness, "track")
logger.debug("%s: Adding R128_TRACK_GAIN=%s", self.filename, gain)
self.audio["R128_TRACK_GAIN"] = [gain]
if self.tags.album_loudness is not None:
gain = self.format_opus_gain(self.tags.album_loudness, "album")
logger.debug("%s: Adding R128_ALBUM_GAIN=%s", self.filename, gain)
self.audio["R128_ALBUM_GAIN"] = [gain]
if (
self.ogg_opus_mode is OggOpusMode.REPLAYGAIN
or self.ogg_opus_mode is OggOpusMode.COMPATIBLE
):
self.write_gain_generic_tags()
self.audio.save()
def write_gain_mp4(self):
logger.debug("%s: Writing MP4 tags", self.filename)
to_delete = []
for key, value in self.audio.tags.items():
atom_name = key[:4]
if atom_name != "----":
continue
_, mean, name = key.split(":", 2)
if not (
mean == "com.apple.iTunes" or mean == "org.hydrogenaudio.replaygain"
):
continue
if value[0].dataformat == mutagen.mp4.AtomDataType.UTF8:
value = value[0].decode(encoding="UTF-8")
else:
continue
name = name.lower()
if (
name == "replaygain_track_gain"
or name == "replaygain_track_peak"
or name == "replaygain_album_gain"
or name == "replaygain_album_peak"
):
to_delete.append(key)
for key in to_delete:
logger.debug("%s: Removing %s", self.filename, key)
del self.audio.tags[key]
# These are the tags used by foobar2000, and are compatible with
# rockbox.
if self.tags.loudness is not None:
gain = self.format_rg_gain(self.tags.loudness)
logger.debug(
"%s: Adding ----:com.apple.iTunes:REPLAYGAIN_TRACK_GAIN=%s",
self.filename,
gain,
)
tag = mutagen.mp4.MP4FreeForm(gain.encode(encoding="UTF-8"))
self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_TRACK_GAIN"] = [tag]
if self.tags.peak is not None:
peak = self.format_rg_peak(self.tags.peak)
logger.debug(
"%s: Adding ----:com.apple.iTunes:REPLAYGAIN_TRACK_PEAK=%s",
self.filename,
peak,
)
tag = mutagen.mp4.MP4FreeForm(peak.encode(encoding="UTF-8"))
self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_TRACK_PEAK"] = [tag]
if self.tags.album_loudness is not None:
gain = self.format_rg_gain(self.tags.album_loudness)
logger.debug(
"%s: Adding ----:com.apple.iTunes:REPLAYGAIN_ALBUM_GAIN=%s",
self.filename,
gain,
)
tag = mutagen.mp4.MP4FreeForm(gain.encode(encoding="UTF-8"))
self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_ALBUM_GAIN"] = [tag]
if self.tags.album_peak is not None:
peak = self.format_rg_peak(self.tags.album_peak)
logger.debug(
"%s: Adding ----:com.apple.iTunes:REPLAYGAIN_ALBUM_PEAK=%s",
self.filename,
peak,
)
tag = mutagen.mp4.MP4FreeForm(peak.encode(encoding="UTF-8"))
self.audio.tags["----:com.apple.iTunes:REPLAYGAIN_ALBUM_PEAK"] = [tag]
self.audio.save()
def write_gain_generic_cleanup(self):
for tag in [
# Standard tags
"REPLAYGAIN_TRACK_GAIN",
"REPLAYGAIN_TRACK_PEAK",
"REPLAYGAIN_ALBUM_GAIN",
"REPLAYGAIN_ALBUM_PEAK",
# Unusual/old tags
"REPLAYGAIN_REFERENCE_LOUDNESS",
# OggOpus R128 gain tags (shouldn't be in other formats)
"R128_TRACK_GAIN",
"R128_ALBUM_GAIN",
]:
if tag in self.audio:
logger.debug("%s: Removing %s", self.filename, tag)
del self.audio[tag]
def write_gain_generic(self):
logger.debug("%s: Writing Generic tags", self.filename)
self.write_gain_generic_cleanup()
self.write_gain_generic_tags()
self.audio.save()
def write_gain_generic_tags(self):
if self.tags.loudness is not None:
gain = self.format_rg_gain(self.tags.loudness)
logger.debug("%s: Adding REPLAYGAIN_TRACK_GAIN=%s", self.filename, gain)
self.audio["REPLAYGAIN_TRACK_GAIN"] = [gain]
if self.tags.peak is not None:
peak = self.format_rg_peak(self.tags.peak)
logger.debug("%s: Adding REPLAYGAIN_TRACK_PEAK=%s", self.filename, peak)
self.audio["REPLAYGAIN_TRACK_PEAK"] = [peak]
if self.tags.album_loudness is not None:
gain = self.format_rg_gain(self.tags.album_loudness)
logger.debug("%s: Adding REPLAYGAIN_ALBUM_GAIN=%s", self.filename, gain)
self.audio["REPLAYGAIN_ALBUM_GAIN"] = [gain]
if self.tags.album_peak is not None:
peak = self.format_rg_peak(self.tags.album_peak)
logger.debug("%s: Adding REPLAYGAIN_ALBUM_PEAK=%s", self.filename, peak)
self.audio["REPLAYGAIN_ALBUM_PEAK"] = [peak]
def write_gain(self, tags):
self.tags = tags
if self.audio is None:
raise Exception("write_gain called without previous read_gain")
if isinstance(self.audio.tags, mutagen.id3.ID3):
return self.write_gain_id3()
if isinstance(self.audio, mutagen.oggopus.OggOpus):
return self.write_gain_ogg_opus()
if isinstance(self.audio, mutagen.mp4.MP4):
return self.write_gain_mp4()
return self.write_gain_generic()
class GainScanner:
i_re = re.compile(r"^\s+I:\s+(-?\d+\.\d+) LUFS$", re.M)
peak_re = re.compile(r"^\s+Peak:\s+(-?(?:\d+\.\d+|inf)) dBFS$", re.M)
async def ffmpeg_parse_ebur128(self, *ff_opts):
ff_args = [
"ffmpeg",
"-nostats",
"-nostdin",
"-hide_banner",
"-vn",
"-loglevel",
"info",
]
ff_args += ff_opts
ff_args += ["-f", "null", "-"]
logger.debug("GainScanner%d: ffmpeg command: %r", id(self), ff_args)
ffmpeg = await asyncio.create_subprocess_exec(
*ff_args,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE
)
result = GainInfo()
while True:
line_bytes = await ffmpeg.stderr.readline()
if len(line_bytes) == 0:
break
line_str = str(line_bytes, errors='replace').rstrip()
logger.debug("GainScanner%d: ffmpeg: %s", id(self), line_str)
m = self.i_re.search(line_str)
if m:
result.loudness = float(m.group(1))
logger.debug(
"GainScanner%d: Parsed loudness: %f", id(self), result.loudness
)
m = self.peak_re.search(line_str)
if m:
result.peak = float(m.group(1))
logger.debug("GainScanner%d: Parsed peak: %f", id(self), result.peak)
await ffmpeg.wait()
if ffmpeg.returncode != 0:
raise RuntimeError("ffmpeg exited with code {}".format(ffmpeg.returncode))
logger.debug("GainScanner%d: Result: %r", id(self), result)
return result
async def scan_track(self, filename):
result = await self.ffmpeg_parse_ebur128(
"-i",
"file:" + filename,
"-filter_complex",
"ebur128=framelog=verbose:peak=true[out]",
"-map",
"[out]",
)
logger.debug("%s: Calculated track gain: %r", filename, result)
return result
async def scan_album(self, filenames):
if len(filenames) == 0:
raise ValueError("filenames is empty")
ff_args = []
for filename in filenames:
ff_args += ["-i", "file:" + filename]
ff_args += [
"-filter_complex",
"concat=n={}:v=0:a=1,ebur128=framelog=verbose:peak=none[out]".format(
len(filenames)
),
"-map",
"[out]",
]
result = await self.ffmpeg_parse_ebur128(*ff_args)
# Move the result into the "album" fields
result = GainInfo(album_loudness=result.loudness, album_peak=result.peak)
logger.debug(
"Album (%d tracks): Calculated album gain: %r", len(filenames), result
)
return result
class Track:
def __init__(self, filename, job_sem):
self.filename = filename
self.job_sem = job_sem
self.tagger = Tagger(filename)
self.gain = GainInfo()
async def read_tags(self):
async with self.job_sem:
loop = asyncio.get_running_loop()
self.gain = await loop.run_in_executor(None, self.tagger.read_gain)
async def scan_gain(self):
async with self.job_sem:
gain_scanner = GainScanner()
self.gain = await gain_scanner.scan_track(self.filename)
async def write_tags(self):
async with self.job_sem:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.tagger.write_gain, self.gain)
async def scan(self, force=False, skip_save=False):
await self.read_tags()
need_scan = False
if self.gain.loudness is None or self.gain.peak is None:
need_scan = True
if force:
need_scan = True
need_save = self.tagger.need_track_update
if need_scan:
await self.scan_gain()
need_save = True
if need_save:
if not skip_save:
await self.write_tags()
print(self.filename)
print(self.gain)
if need_scan:
print("Rescanned loudness")
if need_save:
if not skip_save:
print("Updated tags")
else:
print("Needs tag update")
print()
class AlbumTrack(Track):
def __init__(self, filename, job_sem, exclude):
super().__init__(filename, job_sem)
self.exclude = exclude
class Album:
def __init__(self, album_param, job_sem):
self.job_sem = job_sem
self.gain = GainInfo()
self.tracks = []
for filename in album_param["track"]:
self.tracks.append(AlbumTrack(filename, job_sem, exclude=False))
for filename in album_param["exclude"]:
self.tracks.append(AlbumTrack(filename, job_sem, exclude=True))
async def read_tags(self):
track_tasks = [track.read_tags() for track in self.tracks]
await asyncio.gather(*track_tasks)
async def scan_album_gain(self):
included = [t.filename for t in self.tracks if not t.exclude]
async with self.job_sem:
gain_scanner = GainScanner()
self.gain = await gain_scanner.scan_album(included)
async def scan_gain(self):
album_task = asyncio.ensure_future(self.scan_album_gain())
track_tasks = [track.scan_gain() for track in self.tracks]
await asyncio.gather(album_task, *track_tasks)
self.gain.album_peak = max([t.gain.peak for t in self.tracks])
logger.debug(
"Album (%d tracks): Calculated album peak: %r", len(self.tracks), self.gain
)
for track in self.tracks:
track.gain.album_loudness = self.gain.album_loudness
track.gain.album_peak = self.gain.album_peak
async def write_tags(self):
track_tasks = [track.write_tags() for track in self.tracks]
await asyncio.gather(*track_tasks)
async def scan(self, force=False, skip_save=False):
await self.read_tags()
need_scan = False
for track in self.tracks:
if track.gain.loudness is None or track.gain.peak is None:
need_scan = True
if self.gain.album_loudness is None:
self.gain.album_loudness = track.gain.album_loudness
if self.gain.album_loudness != track.gain.album_loudness:
need_scan = True
if self.gain.album_peak is None:
self.gain.album_peak = track.gain.album_peak
if self.gain.album_peak != track.gain.album_peak:
need_scan = True
if self.gain.album_loudness is None or self.gain.album_peak is None:
need_scan = True
if force:
need_scan = True
need_save = any([track.tagger.need_album_update for track in self.tracks])
if need_scan:
await self.scan_gain()
need_save = True
if need_save:
if not skip_save:
await self.write_tags()
print()
for track in self.tracks:
print(track.filename)
print(track.gain)
if need_scan:
print("Rescanned loudness")
if need_save:
if not skip_save:
print("Updated tags")
else:
print("Needs tag update")
async def main(argv=None):
parser = argparse.ArgumentParser(
description="""
Add ReplayGain tags to files using the EBU R128 algorithm.
""",
epilog="""
If neither --track or --album are specified, the mode used depends on
the number of files given as arguments. If a single file is given, it
will be processed in track mode. If multiple files are given, they
will be processed in album mode as a single album.
""",
)
parser.add_argument(
"-n",
"--dry-run",
default=False,
action="store_true",
help="""
Only calculate and display the ReplayGain values; do not actually
save the tags in the audio files.
""",
)
parser.add_argument(
"-f",
"--force",
default=False,
action="store_true",
help="""
Recalculate the ReplayGain values even if valid tags are already
present in the files.
""",
)
parser.add_argument(
"--debug",
dest="log_level",
default=logging.WARNING,
action="store_const",
const=logging.DEBUG,
help="""
Print a bunch of extra debugging output.
""",
)
parser.add_argument(
"-j",
"--jobs",
type=int,
default=multiprocessing.cpu_count(),
help="""
The number of operations to run in parallel. The default is
auto-detected, currently %(default)s.
""",
)
parser.add_argument(
"-t",
"--track",
nargs="+",
default=list(),
metavar="FILE",
action=TrackAction,
help="""
Treat the following audio files as individual tracks.
""",
)
parser.add_argument(
"-a",
"--album",
nargs="+",
default=list(),
metavar="FILE",
action=AlbumAction,
help="""
Treat the following audio files as part of the same album.
Each time the --album option is specified, it starts a new album.
""",
)
parser.add_argument(
"-e",
"--exclude",
nargs="+",
default=list(),
metavar="FILE",
action=AlbumAction,
help="""
Tag the following files as part of the current album, but do not
use their audio when calculating the value for the album ReplayGain
tag.
""",
)
parser.add_argument("FILE", nargs="*", default=[], help=argparse.SUPPRESS)
args = parser.parse_args(argv)
logging.basicConfig(format="%(levelname)s: %(message)s", level=args.log_level)
logger.debug("Debug logging has been enabled")
logger.debug("Command line arguments: %r", args)
# Handle the "loose" arguments, by turning them into tracks or albums
if len(args.FILE) + len(args.exclude) > 1 or len(args.exclude) > 0:
# Treat the initial arguments as an album
args.album.append({"track": list(args.FILE), "exclude": list(args.exclude)})
args.FILE = None
args.exclude = None
elif len(args.FILE) > 0:
args.track.extend(args.FILE)
args.FILE = None
if len(args.track) == 0 and len(args.album) == 0:
parser.print_usage()
sys.exit(2)
job_sem = asyncio.BoundedSemaphore(args.jobs)
tasks = []
albums = [Album(album, job_sem) for album in args.album]
tasks += [album.scan(force=args.force, skip_save=args.dry_run) for album in albums]
tracks = [Track(track, job_sem) for track in args.track]
tasks += [track.scan(force=args.force, skip_save=args.dry_run) for track in tracks]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main(sys.argv[1:]))