kapsikkum-unmanic – Rev 1

Subversion Repositories:
Rev:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.workers.py

    Written by:               Josh.5 <jsunnex@gmail.com>
    Date:                     11 Aug 2021, (12:06 PM)

    Copyright:
           Copyright (C) Josh Sunnex - All Rights Reserved

           Permission is hereby granted, free of charge, to any person obtaining a copy
           of this software and associated documentation files (the "Software"), to deal
           in the Software without restriction, including without limitation the rights
           to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
           copies of the Software, and to permit persons to whom the Software is
           furnished to do so, subject to the following conditions:

           The above copyright notice and this permission notice shall be included in all
           copies or substantial portions of the Software.

           THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
           IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
           DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
           OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
           OR OTHER DEALINGS IN THE SOFTWARE.

"""
import hashlib
import os
import queue
import shutil
import subprocess
import threading
import time

import psutil

from unmanic.libs import common, unlogger
from unmanic.libs.plugins import PluginsHandler


def default_progress_parser(line_text):
    return {
        'percent': ''
    }


class WorkerCommandError(Exception):
    def __init___(self, command):
        Exception.__init__(self, "Worker command returned non 0 status. Command: {}".format(command))
        self.command = command


class Worker(threading.Thread):
    idle = True
    paused = False

    current_task = None
    worker_log = None
    start_time = None
    finish_time = None

    worker_subprocess = None
    worker_subprocess_pid = None
    worker_subprocess_percent = None
    worker_subprocess_elapsed = None

    worker_runners_info = {}

    def __init__(self, thread_id, name, worker_group_id, pending_queue, complete_queue, event):
        super(Worker, self).__init__(name=name)
        self.thread_id = thread_id
        self.name = name
        self.worker_group_id = worker_group_id
        self.event = event

        self.current_task = None
        self.pending_queue = pending_queue
        self.complete_queue = complete_queue

        # Create 'redundancy' flag. When this is set, the worker should die
        self.redundant_flag = threading.Event()
        self.redundant_flag.clear()

        # Create 'paused' flag. When this is set, the worker should be paused
        self.paused_flag = threading.Event()
        self.paused_flag.clear()

        # Create logger for this worker
        unmanic_logging = unlogger.UnmanicLogger.__call__()
        self.logger = unmanic_logging.get_logger(self.name)

    def _log(self, message, message2='', level="info"):
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def run(self):
        self._log("Starting worker")
        while not self.redundant_flag.is_set():
            self.event.wait(1)  # Add delay for preventing loop maxing compute resources

            # If the Foreman has paused this worker, then don't do anything
            if self.paused_flag.is_set():
                self.paused = True
                # If the worker is paused, wait for 5 seconds before continuing the loop
                self.event.wait(5)
                continue
            self.paused = False

            # Set the worker as Idle - This will announce to the Foreman that it's ready for a task
            self.idle = True

            # Wait for task
            while not self.redundant_flag.is_set() and self.current_task:
                self.event.wait(.5)  # Add delay for preventing loop maxing compute resources

                try:
                    # Process the set task
                    self.__process_task_queue_item()
                except queue.Empty:
                    continue
                except Exception as e:
                    self._log("Exception in processing job with {}:".format(self.name), message2=str(e),
                              level="exception")

        self._log("Stopping worker")

    def set_task(self, new_task):
        """Sets the given task to the worker class"""
        # Ensure only one task can be set for a worker
        if self.current_task:
            return
        # Set the task
        self.current_task = new_task
        self.worker_log = []
        self.idle = False

    def get_status(self):
        """
        Fetch the status of this worker.

        TODO: Fetch subprocess pid

        :return:
        """
        status = {
            'id':              str(self.thread_id),
            'name':            self.name,
            'idle':            self.idle,
            'paused':          self.paused,
            'start_time':      None if not self.start_time else str(self.start_time),
            'current_task':    None,
            'current_file':    "",
            'worker_log_tail': [],
            'runners_info':    {},
            'subprocess':      {
                'pid':     self.ident,
                'percent': str(self.worker_subprocess_percent),
                'elapsed': str(self.worker_subprocess_elapsed),
            },
        }
        if self.current_task:
            # Fetch the current file
            try:
                status['current_task'] = self.current_task.get_task_id()
            except Exception as e:
                self._log("Exception in fetching the current task ID for worker {}:".format(self.name), message2=str(e),
                          level="exception")

            # Fetch the current file
            try:
                status['current_file'] = self.current_task.get_source_basename()
            except Exception as e:
                self._log("Exception in fetching the current file of worker {}:".format(self.name), message2=str(e),
                          level="exception")

            # Append the worker log tail
            try:
                if self.worker_log and len(self.worker_log) > 20:
                    status['worker_log_tail'] = self.worker_log[-19:]
                else:
                    status['worker_log_tail'] = self.worker_log
            except Exception as e:
                self._log("Exception in fetching log tail of worker: ", message2=str(e),
                          level="exception")

            # Append the runners info
            try:
                status['runners_info'] = self.worker_runners_info
            except Exception as e:
                self._log("Exception in runners info of worker {}:".format(self.name), message2=str(e),
                          level="exception")
        return status

    def __unset_current_task(self):
        self.current_task = None
        self.worker_runners_info = {}
        self.worker_log = []

    def __process_task_queue_item(self):
        """
        Processes the set task.

        :return:
        """
        # Mark worker as not idle now that it is processing a task
        self.idle = False

        # Set the progress to an empty string
        self.worker_subprocess_percent = ''
        self.worker_subprocess_elapsed = '0'

        # Log the start of the job
        self._log("Picked up job - {}".format(self.current_task.get_source_abspath()))

        # Mark as being "in progress"
        self.current_task.set_status('in_progress')

        # Start current task stats
        self.__set_start_task_stats()

        # Process the file. Will return true if success, otherwise false
        success = self.__exec_worker_runners_on_set_task()
        # Mark the task as either success or not
        self.current_task.set_success(success)

        # Mark task completion statistics
        self.__set_finish_task_stats()

        # Log completion of job
        self._log("Finished job - {}".format(self.current_task.get_source_abspath()))

        # Place the task into the completed queue
        self.complete_queue.put(self.current_task)

        # Reset the current file info for the next task
        self.__unset_current_task()

    def __set_start_task_stats(self):
        """Sets the initial stats for the start of a task"""
        # Set the start time to now
        self.start_time = time.time()

        # Clear the finish time
        self.finish_time = None

        # Format our starting statistics data
        self.current_task.task.processed_by_worker = self.name
        self.current_task.task.start_time = self.start_time
        self.current_task.task.finish_time = self.finish_time

    def __set_finish_task_stats(self):
        """Sets the final stats for the end of a task"""
        # Set the finish time to now
        self.finish_time = time.time()

        # Set the finish time in the statistics data
        self.current_task.task.finish_time = self.finish_time

    def __exec_worker_runners_on_set_task(self):
        """
        Executes the configured plugin runners against the set task.

        :return:
        """
        # Init plugins
        library_id = self.current_task.get_task_library_id()
        plugin_handler = PluginsHandler()
        plugin_modules = plugin_handler.get_enabled_plugin_modules_by_type('worker.process_item', library_id=library_id)

        # Create dictionary of runners info for the frontend
        self.worker_runners_info = {}
        for plugin_module in plugin_modules:
            self.worker_runners_info[plugin_module.get('plugin_id')] = {
                'plugin_id':   plugin_module.get('plugin_id'),
                'status':      'pending',
                "name":        plugin_module.get('name'),
                "author":      plugin_module.get('author'),
                "version":     plugin_module.get('version'),
                "icon":        plugin_module.get('icon'),
                "description": plugin_module.get('description'),
            }

        # Set the absolute path to the original file
        original_abspath = self.current_task.get_source_abspath()

        # Process item in loop.
        # First process the item for for each plugin that configures it, then run the default Unmanic configuration
        task_cache_path = self.current_task.get_cache_path()
        # Set the current input file to the original file path
        file_in = original_abspath
        # Mark the overall success of all runners. This will be set to False if any of the runners fails.
        overall_success = True
        # Set the current file out to nothing.
        # This will be configured by each runner.
        # If no runners are configured, then nothing needs to be done.
        current_file_out = original_abspath
        # The number of runners that have been run
        runner_count = 0
        # Flag if a task has run a command
        no_exec_command_run = True

        # Generate default data object for the runner functions
        data = {
            "worker_log":              self.worker_log,
            "library_id":              library_id,
            "exec_command":            [],
            "command_progress_parser": default_progress_parser,
            "file_in":                 file_in,
            "file_out":                None,
            "original_file_path":      original_abspath,
            "repeat":                  False,
        }

        for plugin_module in plugin_modules:
            # Increment the runners count (first runner will be set as #1)
            runner_count += 1

            if not overall_success:
                # If one of the Plugins fails, don't continue.
                # The Plugins could be co-dependant and the final file will not go anywhere if 'overall_success' is False
                break

            # Mark the status of the worker for the frontend
            self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'in_progress'
            self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False

            # Loop over runner. This way we can repeat the function with the same data if requested by the repeat flag
            runner_pass_count = 0
            while not self.redundant_flag.is_set():
                runner_pass_count += 1

                # Fetch file out details
                # This creates a temp file labeled "WORKING" that will be moved to the cache_path on completion
                split_file_out = os.path.splitext(task_cache_path)
                split_file_in = os.path.splitext(file_in)
                file_out = "{}-{}-{}-{}{}".format(split_file_out[0], "WORKING", runner_count, runner_pass_count,
                                                  split_file_in[1])

                # Reset data object for this runner functions
                data['library_id'] = library_id
                data['exec_command'] = []
                data['command_progress_parser'] = default_progress_parser
                data['file_in'] = file_in
                data['file_out'] = file_out
                data['original_file_path'] = original_abspath
                data['repeat'] = False

                self.event.wait(.2)  # Add delay for preventing loop maxing compute resources
                self.worker_log.append("\n\nRUNNER: \n{} [Pass #{}]\n\n".format(plugin_module.get('name'), runner_pass_count))
                self.worker_log.append("\nExecuting plugin runner... Please wait\n")

                # Run plugin to update data
                if not plugin_handler.exec_plugin_runner(data, plugin_module.get('plugin_id'), 'worker.process_item'):
                    # Skip this plugin module's loop
                    self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'complete'
                    self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                    # Set overall success status to failed
                    overall_success = False
                    # Append long entry to say the worker was terminated
                    self.worker_log.append("\n\nPLUGIN FAILED!")
                    self.worker_log.append("\nFailed to execute Plugin '{}'".format(plugin_module.get('name')))
                    self.worker_log.append("\nCheck Unmanic logs for more information")
                    break

                # Log the in and out files returned by the plugin runner for debugging
                self._log("Worker process '{}' (in)".format(plugin_module.get('plugin_id')), data.get("file_in"),
                          level='debug')
                self._log("Worker process '{}' (out)".format(plugin_module.get('plugin_id')), data.get("file_out"),
                          level='debug')

                # Only run the conversion process if "exec_command" is not empty
                if data.get("exec_command"):
                    self.worker_log.append("\nPlugin runner requested for a command to be executed by Unmanic")

                    # Exec command as subprocess
                    success = self.__exec_command_subprocess(data)
                    no_exec_command_run = False

                    if self.redundant_flag.is_set():
                        # This worker has been marked as redundant. It is being terminated.
                        self._log("Worker has been terminated before a command was completed", level="warning")
                        # Mark runner as failed
                        self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                        # Set overall success status to failed
                        overall_success = False
                        # Append long entry to say the worker was terminated
                        self.worker_log.append("\n\nWORKER TERMINATED!")
                        # Don't continue
                        break

                    # Run command. Check if command exited successfully.
                    if success:
                        # If file conversion was successful
                        self._log("Successfully ran worker process '{}' on file '{}'".format(plugin_module.get('plugin_id'),
                                                                                             data.get("file_in")))
                        # Ensure the 'file_out' that was specified by the plugin to be created was actually created.
                        if os.path.exists(data.get('file_out')):
                            # The outfile exists...
                            # In order to clean up as we go and avoid unnecessary RAM/disk use in the cache directory,
                            #   we want to removed the 'file_in' file.
                            # We want to ensure that we do not accidentally remove any original files here.
                            # To avoid this, run x2 tests.
                            # First, check current 'file_in' is not the original file.
                            if os.path.abspath(data.get("file_in")) != os.path.abspath(original_abspath):
                                # Second, check that the 'file_in' is in cache directory.
                                if "unmanic_file_conversion" in os.path.abspath(data.get("file_in")):
                                    # Remove this file
                                    os.remove(os.path.abspath(data.get("file_in")))

                            # Set the new 'file_in' as the previous runner's 'file_out' for the next loop
                            file_in = data.get("file_out")
                    else:
                        # If file conversion was successful
                        self._log(
                            "Error while running worker process '{}' on file '{}'".format(
                                plugin_module.get('plugin_id'),
                                original_abspath
                            ),
                            level="error")
                        self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                        overall_success = False

                        # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                        file_in = data.get("file_in")
                else:
                    # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                    file_in = data.get("file_in")
                    # Log that this plugin did not request to execute anything
                    self.worker_log.append("\nRunner did not request for Unmanic to execute a command")
                    self._log(
                        "Worker process '{}' did not request to execute a command.".format(plugin_module.get('plugin_id')),
                        level='debug')

                if os.path.exists(data.get('file_out')):
                    # Set the current file out to the most recently completed cache file
                    # If the file out does not exist, it is likely never used by the plugin.
                    current_file_out = data.get('file_out')
                else:
                    # Ensure the current_file_out is set the currently set 'file_in'
                    current_file_out = data.get('file_in')

                if data.get("repeat"):
                    # The returned data contained the 'repeat'' flag.
                    # Run another pass against this same plugin
                    continue
                break

            self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = True
            self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'complete'

        # Log if no command was run by any Plugins
        if no_exec_command_run:
            # If no jobs were carried out on this task
            self._log("No Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath), level='warning')
            self.worker_log.append("\n\nNo Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath))

        # Save the completed command log
        self.current_task.save_command_log(self.worker_log)

        # If all plugins that were executed completed successfully, then this was overall a successful task.
        # At this point we need to move the final out file to the original task cache path so the postprocessor can collect it.
        if overall_success:
            # If jobs carried out on this task were all successful, we will get here
            self._log("Successfully completed Worker processing on file '{}'".format(original_abspath))

            # Attempt to move the final output file to the final cache file path for the postprocessor
            try:
                # Set the new file out as the extension may have changed
                split_file_name = os.path.splitext(current_file_out)
                file_extension = split_file_name[1].lstrip('.')
                cache_directory = os.path.dirname(os.path.abspath(task_cache_path))
                self.current_task.set_cache_path(cache_directory, file_extension)
                # Read the updated cache path
                task_cache_path = self.current_task.get_cache_path()

                # Move file to original cache path
                self._log("Moving final cache file from '{}' to '{}'".format(current_file_out, task_cache_path))
                current_file_out = os.path.abspath(current_file_out)

                # There is a really odd intermittent bug with the shutil module that is causing it to
                #   sometimes report that the file does not exist.
                # This section adds a small pause and logs the error if that is the case.
                # I have not yet figured out a solution as this is difficult to reproduce.
                if not os.path.exists(current_file_out):
                    self._log("Error - current_file_out path does not exist! '{}'".format(file_in), level="error")
                    self.event.wait(1)

                # Ensure the cache directory exists
                if not os.path.exists(cache_directory):
                    os.makedirs(cache_directory)

                # Check that the current file out is not the original source file
                if os.path.abspath(current_file_out) == os.path.abspath(original_abspath):
                    # The current file out is not a cache file, the file must have never been modified.
                    # This can happen if all Plugins failed to run, or a Plugin specifically reset the out
                    #   file to the original source in order to preserve it.
                    # In this circumstance, we want to create a cache copy and let the process continue.
                    self._log("Final cache file is the same path as the original source. Creating cache copy.", level='debug')
                    shutil.copyfile(current_file_out, task_cache_path)
                else:
                    # Use shutil module to move the file to the final task cache location
                    shutil.move(current_file_out, task_cache_path)
            except Exception as e:
                self._log("Exception in final move operation of file {} to {}:".format(current_file_out, task_cache_path),
                          message2=str(e), level="exception")
                return False

            # Return True
            return True

        # If the overall result of the jobs carried out on this task were not successful, we will get here.
        # Log the failure and return False
        self._log("Failed to process task for file '{}'".format(original_abspath), level='warning')
        return False

    def __log_proc_terminated(self, proc):
        self._log("Process {} terminated with exit code {}".format(proc, proc.returncode))

    def __terminate_proc_tree(self, proc: psutil.Process):
        """
        Terminate the process tree (including grandchildren).
        Processes that fail to stop with SIGTERM will be sent a SIGKILL.

        :param proc:
        :return:
        """

        children = proc.children(recursive=True)
        children.append(proc)
        for p in children:
            try:
                p.terminate()
            except psutil.NoSuchProcess:
                pass
        gone, alive = psutil.wait_procs(children, timeout=3, callback=self.__log_proc_terminated)
        for p in alive:
            try:
                p.kill()
            except psutil.NoSuchProcess:
                pass
        psutil.wait_procs(alive, timeout=3, callback=self.__log_proc_terminated)

    def __exec_command_subprocess(self, data):
        """
        Executes a command as a shell subprocess.
        Uses the given parser to record progress data from the shell STDOUT.

        :param data:
        :return:
        """
        # Fetch command to execute.
        exec_command = data.get("exec_command", [])

        # Fetch the command progress parser function
        command_progress_parser = data.get("command_progress_parser", default_progress_parser)

        # Log the command for debugging
        command_string = exec_command
        if isinstance(exec_command, list):
            command_string = ' '.join(exec_command)
        self._log("Executing: {}".format(command_string), level='debug')

        # Append start of command to worker subprocess stdout
        self.worker_log += [
            '\n\n',
            'COMMAND:\n',
            command_string,
            '\n\n',
            'LOG:\n',
        ]

        # Create output path if not exists
        common.ensure_dir(data.get("file_out"))

        # Convert file
        try:
            proc_pause_time = 0
            proc_start_time = time.time()
            # Execute command
            if isinstance(exec_command, list):
                sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                            universal_newlines=True, errors='replace')
            elif isinstance(exec_command, str):
                sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                            universal_newlines=True, errors='replace', shell=True)
            else:
                raise Exception(
                    "Plugin's returned 'exec_command' object must be either a list or a string. Received type {}.".format(
                        type(exec_command)))

            # Fetch process using psutil for control (sending SIGSTOP on windows will not work)
            proc = psutil.Process(pid=sub_proc.pid)

            # Set process priority on posix systems
            # TODO: Test how this will work on Windows
            if os.name == "posix":
                try:
                    parent_proc = psutil.Process(os.getpid())
                    parent_proc_nice = parent_proc.nice()
                    proc.nice(parent_proc_nice + 1)
                except Exception as e:
                    self._log("Unable to lower priority of subprocess. Subprocess should continue to run at normal priority",
                              str(e), level='warning')

            # Record PID and PROC
            self.worker_subprocess = sub_proc
            self.worker_subprocess_pid = sub_proc.pid

            # Poll process for new output until finished
            while not self.redundant_flag.is_set():
                line_text = sub_proc.stdout.readline()

                # Fetch command stdout and append it to the current task object (to be saved during post process)
                self.worker_log.append(line_text)

                # Check if the command has completed. If it has, exit the loop
                if line_text == '' and sub_proc.poll() is not None:
                    self._log("Subprocess task completed!", level='debug')
                    break

                # Parse the progress
                try:
                    progress_dict = command_progress_parser(line_text)
                    self.worker_subprocess_percent = progress_dict.get('percent', '0')
                    self.worker_subprocess_elapsed = str(time.time() - proc_start_time - proc_pause_time)
                except Exception as e:
                    # Only need to show any sort of exception if we have debugging enabled.
                    # So we should log it as a debug rather than an exception.
                    self._log("Exception while parsing command progress", str(e), level='debug')

                # Stop the process if the worker is paused
                # Then resume it when the worker is resumed
                if self.paused_flag.is_set():
                    self._log("Pausing PID {}".format(sub_proc.pid), level='debug')
                    proc.suspend()
                    self.paused = True
                    start_pause = time.time()
                    while not self.redundant_flag.is_set():
                        self.event.wait(1)
                        if not self.paused_flag.is_set():
                            self._log("Resuming PID {}".format(sub_proc.pid), level='debug')
                            proc.resume()
                            self.paused = False
                            # Elapsed time is used for calculating etc.
                            # We account for this by counting the time we are paused also.
                            # This is then subtracted from the elapsed time in the calculation above.
                            proc_pause_time = int(proc_pause_time + time.time() - start_pause)
                            break
                        continue

            # Get the final output and the exit status
            if not self.redundant_flag.is_set():
                communicate = sub_proc.communicate()[0]

            # If the process is still running, kill it
            if proc.is_running():
                self._log("Found worker subprocess is still running. Killing it.", level='warning')
                self.__terminate_proc_tree(proc)

            if sub_proc.returncode == 0:
                return True
            else:
                self._log("Command run against '{}' exited with non-zero status. "
                          "Download command dump from history for more information.".format(data.get("file_in")),
                          message2=str(exec_command), level="error")
                return False

        except Exception as e:
            self._log("Error while executing the command against file{}.".format(data.get("file_in")), message2=str(e),
                      level="error")

        return False