#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.common.py
 
    Written by:               Josh.5 <jsunnex@gmail.com>
    Date:                     06 Dec 2018, (7:21 AM)
 
    Copyright:
           Copyright (C) Josh Sunnex - All Rights Reserved
 
           Permission is hereby granted, free of charge, to any person obtaining a copy
           of this software and associated documentation files (the "Software"), to deal
           in the Software without restriction, including without limitation the rights
           to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
           copies of the Software, and to permit persons to whom the Software is
           furnished to do so, subject to the following conditions:
  
           The above copyright notice and this permission notice shall be included in all
           copies or substantial portions of the Software.
  
           THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
           EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
           IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
           DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
           OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
           OR OTHER DEALINGS IN THE SOFTWARE.

"""
import datetime
import hashlib
import os
import random
import string
import shutil


def get_home_dir():
    # Attempt to get the HOME_DIR environment variable
    home_dir = os.environ.get('HOME_DIR')
    # If HOME_DIR is unset or empty, fall back to the current user's home directory
    if not home_dir:
        # Expand '~' to the current user's home directory
        home_dir = os.path.expanduser("~")
    else:
        # For any value of HOME_DIR, ensure tilde and relative paths are correctly handled
        # os.path.expanduser will handle tilde but won't affect absolute paths
        # os.path.abspath will convert a relative path to an absolute path
        home_dir = os.path.abspath(os.path.expanduser(home_dir))
    return home_dir
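
# Example behaviour of get_home_dir() (illustrative; actual paths depend on the environment):
#   HOME_DIR unset or ''   -> the current user's home directory (the expanded '~')
#   HOME_DIR='~/.unmanic'  -> '<home>/.unmanic'
#   HOME_DIR='./config'    -> the absolute path of './config' relative to the current working directory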


def get_default_root_path():
    root = os.path.join(os.sep)
    if os.name == "nt":
        root = os.path.join('c:', os.sep)
    return root


def get_default_library_path():
    library_path = os.path.join(get_default_root_path(), 'library')
    # On Windows, default the library directory to %USERPROFILE%\Documents
    if os.name == "nt":
        library_path = os.path.join(os.path.expandvars(r'%USERPROFILE%'), 'Documents')
    return library_path


def get_default_cache_path():
    cache_path = os.path.join(get_default_root_path(), 'tmp', 'unmanic')
    # On Windows, default the cache directory to %LOCALAPPDATA%\Temp\Unmanic
    if os.name == "nt":
        cache_path = os.path.join(os.path.expandvars(r'%LOCALAPPDATA%\Temp'), 'Unmanic')
    return cache_path


def format_message(message, message2=''):
    message = str(message)
    if message2:
        # message2 may be a string, dict, list, or any other object:
        if isinstance(message2, str):
            message = "%s - %s" % (message, message2)
        elif isinstance(message2, (dict, list)):
            import pprint
            message2 = pprint.pformat(message2, indent=1)
            message = "%s \n%s" % (message, message2)
        else:
            message = "%s - %s" % (message, str(message2))
    message = "[FORMATTED] - %s" % message
    return message
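
# Example usage of format_message() (output shown is illustrative):
#   format_message("Task complete", {'file': 'video.mkv', 'success': True})
#   -> "[FORMATTED] - Task complete \n{'file': 'video.mkv', 'success': True}"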


def make_timestamp_human_readable(ts):
    """
    Accept a unix timestamp, return a human readable timedelta string.

    :param ts: a datetime, timedelta, or timestamp (integer / float) object
    :returns: Human readable timedelta string (Str)
    """
    units = ("year", "day", "hour", "minute", "second", "millisecond", "microsecond")
    precision = 1
    past_tense = "{} ago"
    future_tense = "in {}"

    # Convert the unix timestamp to a datetime object
    dt = datetime.datetime.fromtimestamp(ts)
    delta = datetime.datetime.now(tz=dt.tzinfo) - dt

    # Determine if this is past or future tense
    the_tense = future_tense if delta < datetime.timedelta(0) else past_tense

    # Create a dictionary of units
    delta = abs(delta)
    d = {
        "year":        int(delta.days / 365),
        "day":         int(delta.days % 365),
        "hour":        int(delta.seconds / 3600),
        "minute":      int(delta.seconds / 60) % 60,
        "second":      delta.seconds % 60,
        "millisecond": delta.microseconds / 1000,
        "microsecond": delta.microseconds % 1000,
    }

    human_readable_list = []
    count = 0

    # Start building up the output in the human readable list.
    for unit in units:
        if count >= precision:
            break  # met precision
        if d[unit] == 0:
            continue  # skip 0's
        s = "" if d[unit] == 1 else "s"  # handle plurals
        human_readable_list.append("{} {}{}".format(d[unit], unit, s))
        count += 1

    return the_tense.format(", ".join(human_readable_list))
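
# Example usage of make_timestamp_human_readable() (illustrative; output depends on the current time):
#   import time
#   make_timestamp_human_readable(time.time() - 90)          # -> '1 minute ago'
#   make_timestamp_human_readable(time.time() - 3 * 86400)   # -> '3 days ago'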


def ensure_dir(file_path):
    """Ensure that the parent directory of the given file path exists."""
    directory = os.path.dirname(file_path)
    if directory and not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
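
# Example usage of ensure_dir() (hypothetical path):
#   ensure_dir('/tmp/unmanic/cache/output.mkv')  # creates '/tmp/unmanic/cache' if it does not exist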


def time_string_to_seconds(time_string):
    pt = datetime.datetime.strptime(time_string, '%H:%M:%S.%f')
    return pt.second + pt.minute * 60 + pt.hour * 3600
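
# Example usage of time_string_to_seconds() (the fractional part is required by the format and discarded):
#   time_string_to_seconds('01:02:03.500')  # -> 3723 (1 hour + 2 minutes + 3 seconds)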


def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 153
    to_read = n + offset
    while True:
        try:
            # Seek backwards from the end of the file by an estimated number of bytes
            f.seek(-(avg_line_length * to_read), 2)
            # Then scan backwards byte by byte until the start of a line is found
            while f.read(1) != b'\n':
                f.seek(-2, os.SEEK_CUR)
        except IOError:
            # The seek went past the start of the file; read from the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines
        avg_line_length *= 1.3
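
# Example usage of tail() (hypothetical log path; the file must be opened in binary mode):
#   with open('/tmp/unmanic.log', 'rb') as f:
#       last_lines = tail(f, 20)  # trailing lines of the log as byte strings (may contain more than 20)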


def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """Touch a file. If it does not exist, create it."""
    flags = os.O_CREAT | os.O_APPEND
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if os.utime in os.supports_fd else fname,
                 dir_fd=None if os.supports_fd else dir_fd, **kwargs)
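
# Example usage of touch() (hypothetical path):
#   touch('/tmp/example.txt')  # creates the file if missing, otherwise updates its access/modification times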


def clean_files_in_cache_dir(cache_directory):
    """This will completely wipe all contents from a directory"""
    if os.path.exists(cache_directory):
        for root, subFolders, files in os.walk(cache_directory):
            root_bn = os.path.basename(root)
            if root_bn.startswith("unmanic_file_conversion-"):
                try:
                    print("Clearing cache path - {}".format(root))
                    shutil.rmtree(root)
                except Exception as e:
                    print("Exception while clearing cache path - {}".format(str(e)))
            elif root_bn.startswith("unmanic_remote_pending_library-"):
                try:
                    print("Clearing remote library cache path - {}".format(root))
                    shutil.rmtree(root)
                except Exception as e:
                    print("Exception while clearing remote library cache path - {}".format(str(e)))


def random_string(string_length=5):
    """Generate a random string of fixed length"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(string_length))


def json_dump_to_file(json_data, out_file, check=True, rollback_on_fail=True):
    """Dump json data to a file. Optionally checks that the output json data is valid"""
    import json
    import time
    import tempfile

    result = {
        'errors':  [],
        'success': False
    }

    # If the rollback_on_fail param is flagged and the out file already exists, create a temporary backup
    if rollback_on_fail and os.path.exists(out_file):
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, 'json_dump_to_file_backup-{}'.format(time.time()))
        try:
            shutil.copy2(out_file, temp_path)
            result['temp_path'] = temp_path
        except Exception as e:
            result['success'] = False
            result['errors'].append("Failed to create temporary file - {}".format(str(e)))

    # Write data to out_file
    try:
        with open(out_file, 'w') as outfile:
            json.dump(json_data, outfile, sort_keys=True, indent=4)
        result['success'] = True
    except Exception as e:
        result['success'] = False
        result['errors'].append("Exception in writing to file: {}".format(str(e)))

    # If the check param is flagged, ensure the output file contains valid JSON data
    if check:
        try:
            with open(out_file) as infile:
                json.load(infile)
        except Exception as e:
            result['success'] = False
            result['errors'].append("JSON file invalid - {}".format(e))

    # If data save was unsuccessful and the rollback_on_fail param is flagged
    #   and there is a temp file set, roll back to old file
    if not result.get('success') and result.get('temp_path') and rollback_on_fail:
        try:
            os.remove(out_file)
            shutil.copy2(result.get('temp_path'), out_file)
            os.remove(result.get('temp_path'))
        except Exception as e:
            result['success'] = False
            result['errors'].append("Exception while restoring original file file: {}".format(str(e)))

    return result
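
# Example usage of json_dump_to_file() (hypothetical path):
#   result = json_dump_to_file({'debugging': False}, '/tmp/unmanic/settings.json')
#   if not result['success']:
#       for error in result['errors']:
#           print(error)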


def extract_video_codecs_from_file_properties(file_properties: dict):
    """
    Read a dictionary of file properties and extract a list of codec names
    from its video streams.

    :param file_properties: dict of file properties containing a 'streams' list
    :return: list of video codec names
    """
    codecs = []
    for stream in file_properties['streams']:
        if stream['codec_type'] == 'video':
            codecs.append(stream['codec_name'])
    return codecs
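
# Example usage of extract_video_codecs_from_file_properties() (minimal probe-style dict for illustration):
#   probe = {'streams': [{'codec_type': 'video', 'codec_name': 'h264'},
#                        {'codec_type': 'audio', 'codec_name': 'aac'}]}
#   extract_video_codecs_from_file_properties(probe)  # -> ['h264']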


def get_file_checksum(path):
    """
    Read a checksum of a file.

    Rather than opening the whole file in memory, open it in chunks.
    This is slightly slower, but allows working on systems with limited memory.

    :param path:
    :return:
    """
    file_hash = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b''):
            file_hash.update(chunk)
    return file_hash.hexdigest()
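
# Example usage of get_file_checksum() (hypothetical path):
#   get_file_checksum('/library/video.mkv')  # -> 32-character MD5 hex digest string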