Commit 38a569a5 authored by Hermann Krumrey
Browse files

Cleaned up available scripts

parent 2e3aa6b5
Loading
Loading
Loading
Loading

bin/anitheme-dl

deleted100644 → 0
+0 −690
Original line number Diff line number Diff line
#!/usr/bin/env python

"""LICENSE
Copyright 2015 Hermann Krumrey <hermann@krumreyh.com>

This file is part of toktokkie.

toktokkie is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

toktokkie is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with toktokkie.  If not, see <http://www.gnu.org/licenses/>.
LICENSE"""

import os
import json
import time
import shutil
import requests
import argparse
from typing import Tuple, Dict, List, Any
from bs4 import BeautifulSoup
from subprocess import check_call, STDOUT, CalledProcessError
from mutagen.easyid3 import EasyID3
# noinspection PyProtectedMember
from mutagen.id3 import ID3, APIC, TPE2
from colorama import Fore, Style


def main():
    """
    Entry point of the script: runs the whole pipeline, from parsing the
    CLI arguments up to generating the artist/album folder structure
    :return: None
    """
    year, season, destination = parse_args()
    prepare_dirs(destination)

    # The user first picks the series, afterwards the songs of those series
    # are resolved and individual songs may be excluded again
    chosen_series = prompt_selection(load_titles(year, season), destination)

    print("Loading data")
    songs = handle_excludes(
        load_data(year, season, destination, chosen_series),
        destination
    )

    print("Downloading Openings")
    download_webms(songs)

    print("Downloading Cover Images")
    download_covers(songs)

    print("Converting to MP3")
    convert_to_mp3(songs)

    print("Setting MP3 metadata")
    set_mp3_metadata(year, songs)

    print("Generating Artist/Album Structure")
    generate_artist_album_structure(destination, songs)

    print("Done")


def parse_args() -> Tuple[int, str, str]:
    """
    Reads the command line arguments of the script
    :return: A tuple consisting of:
             the year to download,
             the season to download,
             the directory in which the files should be stored
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "year",
        type=int,
        help="The year for which to download songs"
    )
    cli.add_argument(
        "season",
        type=str,
        choices={"Spring", "Winter", "Summer", "Fall"},
        help="The season for which to download songs."
    )
    cli.add_argument(
        "--out", "-o",
        default="dl",
        help="The destination directory"
    )
    parsed = cli.parse_args()

    return parsed.year, parsed.season, parsed.out


def prepare_dirs(destination: str):
    """
    Prepares the directories in which to store the files.
    Creates the "webm", "mp3" and "covers" subdirectories inside the
    destination directory (including the destination itself if needed).
    Directories that already exist are left untouched.
    :param destination: The destination to prepare
    :return: None
    """
    for subdir in ("webm", "mp3", "covers"):
        # exist_ok avoids the check-then-create race of a separate
        # isdir() test and makes repeated calls a no-op
        os.makedirs(os.path.join(destination, subdir), exist_ok=True)


def execute_command(command: List[str]) -> int:
    """
    Executes a command while discarding its stdout and stderr output.
    The command is echoed to the terminal before it is run.
    :param command: The command to execute, as a list of arguments
    :return: 0 if the command succeeded,
             1 if it exited with a non-zero status
    """
    print(Fore.LIGHTYELLOW_EX + " ".join(command) + Style.RESET_ALL)
    with open(os.devnull, "w") as devnull:
        try:
            # check_call can only ever return 0: every non-zero exit status
            # is raised as a CalledProcessError, so there is no return
            # value that would need to be inspected here
            check_call(command, stdout=devnull, stderr=STDOUT)
        except CalledProcessError as e:
            print(Fore.LIGHTRED_EX + "Called Process Error: " + str(e)
                  + Style.RESET_ALL)
            return 1
    return 0


def aggresive_request(url: str) -> str:
    """
    Performs a GET request and keeps repeating it until the server
    answers with status code 200
    :param url: The URL to get
    :return: The text of the first response with status code 200
    """
    request_headers = {"User-Agent": "Mozilla/5.0"}

    # Short pause before every initial request
    time.sleep(1)

    while True:
        response = requests.get(url, headers=request_headers)
        if response.status_code == 200:
            return response.text

        # Dump what the server sent back, then wait a minute and try again
        print("{}HTTP Error Code: {}{}"
              .format(Fore.RED, response.status_code, Style.RESET_ALL))
        print(response.headers)
        print(response.text)
        time.sleep(60)


def load_titles(
        year: int,
        season: str,
        include_previous_season: bool = True
) -> List[str]:
    """
    Fetches the series titles of a season from the AnimeThemes wiki, so
    that the user can pick from them
    :param year: The year for which to fetch titles
    :param season: The season for which to fetch titles
    :param include_previous_season: Whether the titles of the season before
                                    the requested one are prepended
    :return: The list of titles
    """
    wiki_url = \
        "https://old.reddit.com/r/AnimeThemes/wiki/{}#wiki_{}_{}_season" \
        .format(year, year, season)
    page = BeautifulSoup(aggresive_request(wiki_url), "html.parser")
    wiki_body = page.find("div", {"class": "md wiki"})

    headings = [heading.text for heading in wiki_body.find_all("h3")]
    segments = segmentize(headings)

    # Segments are addressed from the end of the list: Winter is the last
    # segment, Fall the fourth-to-last one
    offset = {"Winter": 1, "Spring": 2, "Summer": 3, "Fall": 4}[season]
    current = segments[-offset]

    if not include_previous_season:
        return current

    if season == "Winter":
        # The season before Winter is listed on the previous year's page
        previous = load_titles(year - 1, "Fall", False)
    else:
        previous = segments[-offset + 1]
    return previous + current


def segmentize(titles: List[str]) -> List[List[str]]:
    """
    Splits a list of titles into consecutive segments.
    A new segment starts whenever a title sorts before its predecessor
    and both titles start with a different letter (ignoring case).
    :param titles: The titles to segmentize
    :return: The segments; always contains at least one (possibly empty)
             segment
    """
    segments = [[]]
    previous = None

    for title in titles:
        starts_new_segment = (
            previous is not None
            and previous > title
            and previous[0].lower() != title[0].lower()
        )
        if starts_new_segment:
            segments.append([])
        segments[-1].append(title)
        previous = title

    return segments


def load_data(
        year: int,
        season: str,
        destination: str,
        selected_series: List[str],
        include_previous_season: bool = True
) -> List[Dict[str, Any]]:
    """
    Loads the Opening/Ending information from a combination of sources.
    The song listings are scraped from the AnimeThemes wiki page of the
    given year, the remaining information is added using resolve_mal_id,
    load_mal_data and resolve_song_info.
    :param year: The year to check
    :param season: The season to check
    :param destination: The destination at which to store the downloaded files
    :param selected_series: The series to consider
    :param include_previous_season: Whether to load data from previous seasons
                                    (only has an effect for the Winter season)
    :return: The information in the following format:
                [
                    {show, type, song, link, filename,
                    mp3_file, webm_file, cover_file,
                    mal_id, mal_title, mal_cover, mal_openings, mal_endings,
                    song_info}
                ]
    """
    url = "https://old.reddit.com/r/AnimeThemes/wiki/{}#wiki_{}_{}_season" \
        .format(year, year, season)
    response = aggresive_request(url)

    soup = BeautifulSoup(response, "html.parser")
    listings = soup.find("div", {"class": "md wiki"})

    entries = listings.find_all("h3")
    tables = listings.find_all("tbody")

    # Pair every heading with the next table in document order.
    # NOTE(review): this assumes exactly one table per heading; a heading
    # without a table would shift all following pairs - confirm.
    # Headings with identical text overwrite each other in the mapping.
    tablemap = {}
    for entry in entries:
        tablemap[entry.text] = tables.pop(0)

    data = []
    seasonal_mal_ids = get_seasonal_mal_ids(year, season)

    for title, table in tablemap.items():

        # Only series picked by the user are processed any further
        if title not in selected_series:
            continue

        print("Loading data for {}...".format(title))
        mal_id = resolve_mal_id(title, seasonal_mal_ids)
        mal_data = load_mal_data(mal_id)

        rows = table.find_all("tr")

        for row in rows:
            columns = row.find_all("td")
            description = columns[0].text

            try:
                link = columns[1].find("a")["href"]
            except TypeError:  # No <a> tag in the cell: find() returned None
                continue

            # Rows without a description in the first column are skipped
            if not description:
                continue

            # The description is split at its quotes: the text before the
            # first quote is the type, the text between the first and the
            # last quote is the song name
            entry = {
                "show": title,
                "type": description.split("\"", 1)[0].strip(),
                "song": description.split("\"", 1)[1].rsplit("\"", 1)[0],
                "link": link
            }
            entry["filename"] = "{} {} - {}".format(
                title, entry["type"], entry["song"]
            )
            entry["webm_file"] = \
                os.path.join(destination, "webm", entry["filename"] + ".webm")
            entry["mp3_file"] = \
                os.path.join(destination, "mp3", entry["filename"] + ".mp3")
            entry["cover_file"] = \
                os.path.join(destination, "covers", entry["filename"] + ".jpg")

            # Copy all values returned by load_mal_data into the entry
            for key, value in mal_data.items():
                entry[key] = value

            song_info = resolve_song_info(entry)
            entry["song_info"] = song_info

            data.append(entry)

    # For Winter, the Fall season of the previous year is loaded as well
    # and put in front of the results. The recursive call disables
    # include_previous_season, so it does not recurse any further
    if include_previous_season and season == "Winter":
        previous_season_data = \
            load_data(year - 1, "Fall", destination, selected_series, False)
        return previous_season_data + data
    else:
        return data


def get_seasonal_mal_ids(year: int, season: str) -> Dict[str, int]:
    """
    Fetches the myanimelist IDs of all shows of a season from the jikan API
    :param year: The year of the season to check
    :param season: The season to check
    :return: A dictionary mapping series titles to myanimelist IDs
    """
    season_url = "https://api.jikan.moe/v3/season/{}/{}" \
        .format(year, season.lower())
    shows = json.loads(aggresive_request(season_url))["anime"]

    title_to_id = {show["title"]: show["mal_id"] for show in shows}

    # Special Cases:
    if year >= 2019:
        title_to_id["Fruits Basket"] = 38680

    return title_to_id


def resolve_mal_id(series: str, seasonal_mal_ids: Dict[str, int]) -> int:
    """
    Finds out the myanimelist ID of a series.
    The previously fetched seasonal IDs are consulted first; only if the
    series is not part of them, the jikan search API is queried and the
    ID of the first search result is used.
    :param series: The series for which to get the myanimelist ID for
    :param seasonal_mal_ids: The previously fetched seasonal MAL IDs
    :return: The myanimelist ID
    :raises IndexError: If the search does not yield any result
    """
    # Imported locally to keep this function self-contained
    from urllib.parse import quote

    mal_id = seasonal_mal_ids.get(series)
    if mal_id is not None:
        return mal_id

    # The title must be percent-encoded: characters like '&', '#' or '+'
    # would otherwise be interpreted as URL syntax and cut off or alter
    # the search query
    url = "https://api.jikan.moe/v3/search/anime/?q={}&page=1" \
        .format(quote(series, safe=""))
    resp = aggresive_request(url)

    return json.loads(resp)["results"][0]["mal_id"]


def _parse_theme(song: str) -> Tuple[str, str, str]:
    """
    Splits a theme description as delivered by the jikan API into its parts
    :param song: The description, which is expected to look like
                 '"<title>" by <artist> (<episodes>)'
    :return: The song title, artist, episodes
    :raises IndexError: If the description contains less than two quotes
    """
    parts = song.split("\"", 2)
    title = parts[1]
    remainder = parts[2]

    # Only strip the leading "by " credit prefix. Removing every "by "
    # occurrence would mangle artist names that contain it themselves,
    # e.g. "Ruby Rose" would turn into "RuRose"
    credit = remainder.lstrip()
    if credit.startswith("by "):
        credit = credit[len("by "):]
    artist = credit.split("(")[0].strip()

    # The episodes are the content of the first pair of parentheses, if any
    bracketed = remainder.split("(")
    if len(bracketed) > 1:
        episodes = bracketed[1].split(")")[0].strip()
    else:
        episodes = ""

    return title, artist, episodes


def load_mal_data(mal_id: int) -> Dict[str, Any]:
    """
    Loads information about a myanimelist ID using the jikan API
    :param mal_id: The myanimelist ID to check
    :return: The information fetched from myanimelist:
             mal_id, mal_title, mal_cover as well as mal_openings and
             mal_endings, which are lists of (title, artist, episodes) tuples
    """
    url = "https://api.jikan.moe/v3/anime/{}".format(mal_id)
    resp = aggresive_request(url)
    info = json.loads(resp)

    song_info = {"opening_themes": [], "ending_themes": []}
    for song_type, themes in song_info.items():
        for song in info[song_type]:
            themes.append(_parse_theme(song))

    return {
        "mal_id": mal_id,
        "mal_title": info["title"],
        "mal_cover": info["image_url"],
        "mal_openings": song_info["opening_themes"],
        "mal_endings": song_info["ending_themes"]
    }


def resolve_song_info(song: Dict[str, Any]) -> Tuple[str, str, str]:
    """
    Resolves the song information for a song.
    The first word of the song's type (e.g. "OP2" in "OP2 V2") decides
    whether the openings or endings are consulted and which entry of
    that list is used. A type without a number refers to the first entry.
    :param song: The song to get the info for. Must contain the keys
                 "type", "mal_openings" and "mal_endings"
    :return: The song title, artist, episodes.
             All three are "Unknown" if the type is neither an opening nor
             an ending, if its number can not be parsed or if there is no
             theme with that number.
    """
    unknown = ("Unknown", "Unknown", "Unknown")

    song_type = song["type"].upper().split(" ")[0]
    if "OP" in song_type:
        theme_list = song["mal_openings"]
    elif "ED" in song_type:
        theme_list = song["mal_endings"]
    else:
        return unknown

    suffix = song_type.replace("OP", "").replace("ED", "")
    try:
        number = int(suffix) if suffix else 1
    except ValueError:
        # Suffixes like "1V2" are not plain numbers
        return unknown

    # The lower bound matters: a number of 0 or below would otherwise
    # silently wrap around and pick a theme from the end of the list
    if 1 <= number <= len(theme_list):
        return theme_list[number - 1]
    return unknown


def prompt_selection(shows: List[str], destination: str) -> List[str]:
    """
    Prompts the user for a selection of series for which to download songs.
    If a selection was stored during a previous run, the user is asked
    whether it should be reused. A new selection is entered as a
    comma-separated list of the displayed numbers and is stored in the
    destination directory afterwards.
    :param shows: All series that are up for selection
    :param destination: The destination directory, which may contain data
                        about previous selections
    :return: A list of series names that were selected
    """
    selection_file = os.path.join(destination, "selection.json")
    if os.path.isfile(selection_file):
        with open(selection_file, "r") as f:
            old_selection = json.loads(f.read())

        # Keep asking until the user answers with either y or n
        while True:
            resp = input("Use previous selection? {} (y|n)"
                         .format(old_selection)).lower()
            if resp == "y":
                return old_selection
            if resp == "n":
                break

    # The numbering continues across segments, so that every displayed
    # number is the 1-based position of the show in the list of shows
    counter = 0
    for segment in segmentize(shows):
        print("-" * 80)
        for show in segment:
            counter += 1
            print("[{}]: {}".format(counter, show))

    while True:

        raw = input(
            "Please select the series for which to download songs: "
        ).strip()

        try:
            # An empty input ends up here as well, as int("") fails
            indices = [int(item) for item in raw.split(",")]
        except ValueError:
            print("Invalid Selection")
            continue

        # Explicit range check: 0 and negative numbers would otherwise be
        # accepted and silently select shows from the end of the list
        if not all(1 <= index <= len(shows) for index in indices):
            print("Invalid Selection")
            continue

        selection = [shows[index - 1] for index in indices]

        with open(selection_file, "w") as f:
            f.write(json.dumps(selection))

        return selection


def handle_excludes(selected_songs: List[Dict[str, Any]], destination: str) \
        -> List[Dict[str, Any]]:
    """
    Allows the user to exclude certain songs from being downloaded.
    Deletes any files that may already exist for excluded songs.
    If exclusions were stored during a previous run, the user is asked
    whether they should be reused. New exclusions are entered as a
    comma-separated list of the displayed numbers; an empty input excludes
    nothing. The exclusions are stored in the destination directory.
    :param selected_songs: All currently selected songs
    :param destination: The destination directory
    :return: The selected songs minus any excluded songs
    """
    excludes_file = os.path.join(destination, "excludes.json")

    use_old = False
    excludes = []

    if os.path.isfile(excludes_file):
        with open(excludes_file, "r") as f:
            old_selection = json.loads(f.read())

        # Keep asking until the user answers with either y or n
        while True:
            resp = input("Use previous exclusion? {} (y|n)"
                         .format(old_selection)).lower()
            if resp in ["y", "n"]:
                if resp == "y":
                    excludes = old_selection
                    use_old = True
                break

    if not use_old:
        for i, song in enumerate(selected_songs):
            print("[{}]: {} ({})"
                  .format(i + 1, song["filename"], song["song_info"][2]))

        while True:

            raw = input("Please select the songs to exclude: ").strip()

            if raw == "":
                excludes = []
                break

            try:
                indices = [int(item) for item in raw.split(",")]
            except ValueError:
                print("Invalid Selection")
                continue

            # Explicit range check: 0 and negative numbers would otherwise
            # wrap around to the end of the list, which would exclude the
            # wrong song and delete its files
            if not all(1 <= index <= len(selected_songs) for index in indices):
                print("Invalid Selection")
                continue

            excludes = [
                selected_songs[index - 1]["filename"] for index in indices
            ]
            break

    with open(excludes_file, "w") as f:
        f.write(json.dumps(excludes))

    new_selection = []
    for song in selected_songs:
        if song["filename"] not in excludes:
            new_selection.append(song)
            continue

        # Remove everything that was already downloaded for excluded songs
        for _file in ["webm_file", "mp3_file", "cover_file"]:
            if os.path.isfile(song[_file]):
                print("{}Deleting {}{}".format(
                    Fore.MAGENTA, song[_file], Style.RESET_ALL))
                os.remove(song[_file])

    return new_selection


def resolve_selected_songs(
        selected_series: List[str],
        data: Dict[str, List[Dict[str, str]]]
) -> List[Dict[str, str]]:
    """
    Collects the songs of all selected series in one flat list.
    The songs appear in the order of the series selection.
    :param selected_series: The selection of series
    :param data: The song data from reddit, mapping series to their songs
    :return: The list of selected songs
    """
    return [
        song
        for series in selected_series
        for song in data[series]
    ]


def download_webms(selected_songs: List[Dict[str, Any]]):
    """
    Downloads the webm files of a selection of songs using curl.
    Songs whose download failed are retried until all of them succeeded.
    :param selected_songs: The selection of songs to download
    :return: None
    """
    pending = selected_songs

    while len(pending) > 0:

        failed = []

        for song in pending:
            target = song["webm_file"]
            link = song["link"]

            # Files that exist and are larger than 1000 bytes are not
            # downloaded again
            if os.path.exists(target) and os.path.getsize(target) > 1000:
                continue

            # command = ["wget", link, "-O", target]
            curl_command = ["curl", "-o", target, link]

            time.sleep(1)
            if execute_command(curl_command) == 0:
                continue

            # We can circumvent 520 errors by requesting the videos in
            # firefox.
            # I have no clue why this is, I'm guessing this is due to
            # caching on the host side
            time.sleep(5)
            execute_command(["firefox", link])
            time.sleep(5)
            if execute_command(curl_command) != 0:
                failed.append(song)

        pending = failed

        if len(failed) > 0:
            print("Waiting 15s")
            time.sleep(15)


def convert_to_mp3(selected_songs: List[Dict[str, Any]]):
    """
    Converts the webm files of a selection of songs to mp3 using ffmpeg.
    Songs for which the mp3 file already exists are skipped.
    :param selected_songs: The selection of songs to convert
    :return: None
    """
    for song in selected_songs:
        source = song["webm_file"]
        target = song["mp3_file"]

        if os.path.exists(target):
            continue

        # -vn drops the video stream, -y overwrites without asking
        execute_command([
            "ffmpeg",
            "-i", source,
            "-vn",
            "-ab", "160k",
            "-ar", "44100",
            "-y", target
        ])


def download_covers(selected_songs: List[Dict[str, Any]]):
    """
    Downloads the cover images for a selection of songs using wget.
    Songs for which the cover file already exists are skipped.
    :param selected_songs: The songs for which to download cover images
    :return: None
    """
    for song in selected_songs:
        cover_url = song["mal_cover"]
        cover_path = song["cover_file"]

        if os.path.isfile(cover_path):
            continue

        execute_command(["wget", cover_url, "-O", cover_path])


def set_mp3_metadata(
        year: int,
        selected_songs: List[Dict[str, Any]]
):
    """
    Tags the MP3 files of the songs with their metadata and cover image
    :param year: The year for which to generate metadata
    :param selected_songs: The songs for which to set the metadata
    :return: None
    """
    for song in selected_songs:
        artist = song["song_info"][1]
        mp3_path = song["mp3_file"]

        # Plain text tags first
        tags = EasyID3(mp3_path)
        text_fields = (
            ("title", song["filename"]),
            ("artist", artist),
            ("album", song["song"]),
            ("date", str(year)),
            ("genre", "Anime"),
        )
        for field, value in text_fields:
            tags[field] = value
        tags.save()

        with open(song["cover_file"], "rb") as cover:
            image = cover.read()

        # Afterwards the cover image and the TPE2 frame are added
        frames = ID3(mp3_path)
        frames.add(APIC(3, "image/jpeg", 3, "Front cover", image))
        frames.add(TPE2(encoding=3, text=artist))
        frames.save()


def generate_artist_album_structure(
        destination: str,
        selected_songs: List[Dict[str, Any]]
):
    """
    Generates a folder structure for OPs and EDs following the scheme:
        Artist
        - Album
          - Song
    The structure is rebuilt from scratch on every call, the songs are
    copied into it from the mp3 directory.
    :param destination: The destination in which to store the structure
    :param selected_songs: The song data
    :return: None
    """
    structure_dir = os.path.join(destination, "structured")

    # Checked in this order, OP therefore takes precedence over ED
    type_dirs = (
        ("OP", os.path.join(structure_dir, "OP")),
        ("ED", os.path.join(structure_dir, "ED")),
    )

    if os.path.isdir(structure_dir):
        shutil.rmtree(structure_dir)
    os.makedirs(structure_dir)
    for _, type_dir in type_dirs:
        os.makedirs(type_dir)

    for song in selected_songs:
        mp3_file = song["mp3_file"]
        artist = song["song_info"][1]
        album = song["song"]
        title = song["filename"]
        oped = song["type"]

        base_dir = next(
            (type_dir for marker, type_dir in type_dirs if marker in oped),
            None
        )
        if base_dir is None:
            print("No OP/ED type for {}".format(title))
            continue

        artist_dir = os.path.join(base_dir, artist)
        album_dir = os.path.join(artist_dir, album)
        song_path = os.path.join(album_dir, title + ".mp3")

        for directory in (artist_dir, album_dir):
            if not os.path.isdir(directory):
                os.makedirs(directory)
        if not os.path.isfile(song_path):
            shutil.copyfile(mp3_file, song_path)


# Only run the script when it is executed directly, not when it is imported
if __name__ == "__main__":
    main()

bin/combine-cbz.py

deleted100644 → 0
+0 −68
Original line number Diff line number Diff line
#!/usr/bin/env python

"""LICENSE
Copyright 2015 Hermann Krumrey <hermann@krumreyh.com>

This file is part of toktokkie.

toktokkie is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

toktokkie is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with toktokkie.  If not, see <http://www.gnu.org/licenses/>.
LICENSE"""

import os
import shutil
import argparse
from subprocess import Popen
from puffotter.os import listdir


def main():
    """
    Combines multiple .cbz archives into a single one.
    Every archive is extracted using unzip, its files are prefixed with
    the zero-padded position of the archive in the argument list and all
    files are then packed into "<first file>_combined.cbz" using zip.
    If --delete-original is passed, the source archives are deleted, but
    only once the combined archive was created successfully.
    :return: None
    :raises SystemExit: If an archive could not be extracted or the
                        combined archive could not be created
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("files", nargs="+")
    parser.add_argument("--delete-original", action="store_true")
    args = parser.parse_args()

    # Width of the numeric prefix, so that the prefixes sort correctly
    fill = len(str(len(args.files)))

    dest_dir = args.files[0] + "_combined_dir"
    dest_file = args.files[0] + "_combined.cbz"
    tempdir = "combine_temp"
    src_images = []

    if os.path.isdir(dest_dir):
        shutil.rmtree(dest_dir)
    os.makedirs(dest_dir)

    try:
        for i, cbz in enumerate(args.files):

            if os.path.isdir(tempdir):
                shutil.rmtree(tempdir)

            # unzip reports mere warnings with exit status 1, the archive
            # has been extracted successfully in that case
            if Popen(["unzip", cbz, "-d", tempdir]).wait() not in (0, 1):
                raise SystemExit("Failed to extract {}".format(cbz))

            for name, path in listdir(tempdir):
                new_name = str(i).zfill(fill) + " - " + name
                new_path = os.path.join(dest_dir, new_name)
                src_images.append(new_path)
                os.rename(path, new_path)

            shutil.rmtree(tempdir)

        if Popen(["zip", "-j", dest_file] + src_images).wait() != 0:
            raise SystemExit("Failed to create {}".format(dest_file))
    finally:
        # The working directories are removed on failure as well
        for leftover in (tempdir, dest_dir):
            if os.path.isdir(leftover):
                shutil.rmtree(leftover)

    # The originals are only deleted at this point: deleting them right
    # after extraction would lose data whenever a later step fails
    if args.delete_original:
        for cbz in args.files:
            if os.path.isfile(cbz):
                os.remove(cbz)


# Only run the script when it is executed directly, not when it is imported
if __name__ == "__main__":
    main()

bin/manga-progress.py

deleted100644 → 0
+0 −121

File deleted.

Preview size limit exceeded, changes collapsed.

bin/musicorg

deleted100644 → 0
+0 −97

File deleted.

Preview size limit exceeded, changes collapsed.

bin/toktokkie-metadata-add

deleted100644 → 0
+0 −86

File deleted.

Preview size limit exceeded, changes collapsed.

Loading