kapsikkum-unmanic – Rev 1

Subversion Repositories:
Rev:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.executor.py

    Written by:               Josh.5 <jsunnex@gmail.com>
    Date:                     05 Mar 2021, (6:55 PM)

    Copyright:
           Copyright (C) Josh Sunnex - All Rights Reserved

           Permission is hereby granted, free of charge, to any person obtaining a copy
           of this software and associated documentation files (the "Software"), to deal
           in the Software without restriction, including without limitation the rights
           to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
           copies of the Software, and to permit persons to whom the Software is
           furnished to do so, subject to the following conditions:

           The above copyright notice and this permission notice shall be included in all
           copies or substantial portions of the Software.

           THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
           IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
           DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
           OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
           OR OTHER DEALINGS IN THE SOFTWARE.

"""
import copy
import gc
import os
import importlib.util
import importlib
import sys

from . import plugin_types
from unmanic.libs import unlogger, common
from ..unmodels import LibraryPluginFlow


class PluginExecutor(object):

    def __init__(self, plugins_directory=None):
        # Set plugins directory
        if not plugins_directory:
            home_directory = common.get_home_dir()
            plugins_directory = os.path.join(home_directory, '.unmanic', 'plugins')
        self.plugins_directory = plugins_directory
        # List plugin types in order that they are run
        # Listing them in order helps for the frontend
        self.plugin_types = [
            {
                'id':       'frontend.panel',
                'has_flow': False,
            },
            {
                'id':       'frontend.plugin_api',
                'has_flow': False,
            },
            {
                'id':       'library_management.file_test',
                'has_flow': True,
            },
            {
                'id':       'worker.process_item',
                'has_flow': True,
            },
            {
                'id':       'postprocessor.file_move',
                'has_flow': True,
            },
            {
                'id':       'postprocessor.task_result',
                'has_flow': True,
            },
        ]
        unmanic_logging = unlogger.UnmanicLogger.__call__()
        self.logger = unmanic_logging.get_logger(__class__.__name__)

    def _log(self, message, message2='', level="info"):
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def __get_plugin_directory(self, plugin_id):
        """
        Returns the path of the plugin by it's plugin ID

        :param plugin_id:
        :return:
        """
        return os.path.join(self.plugins_directory, plugin_id)

    @staticmethod
    def __include_plugin_site_packages(path):
        plugin_site_packages_dir = os.path.join(path, 'site-packages')
        if os.path.exists(plugin_site_packages_dir) and plugin_site_packages_dir not in sys.path:
            sys.path.append(plugin_site_packages_dir)

    @staticmethod
    def __include_plugin_directory(path):
        if os.path.exists(path) and path not in sys.path:
            sys.path.append(path)

    def __load_plugin_module(self, plugin_id, path):
        """
        Loads and returns the python module from a given plugin path.
            All plugins should have a file called "plugin.py".

        :param plugin_id:
        :param path:
        :return:
        """
        # Set the module name
        module_name = '{}.plugin'.format(plugin_id)

        # Get main module file
        plugin_module_path = os.path.join(path, 'plugin.py')

        # Ensure the Unmanic plugins directory to sys path prior to loading it
        self.__include_plugin_directory(self.plugins_directory)

        # Add site-packages directory to sys path prior to loading the module
        self.__include_plugin_site_packages(path)

        # Don't re-import the module if it is already loaded.
        if module_name in sys.modules:
            return sys.modules[module_name]

        try:
            # First import the module namespace
            # Without this we are unable to reload the plugin in reload_plugin_module()
            importlib.import_module(plugin_id)

            # Import the module for this plugin
            module_spec = importlib.util.spec_from_file_location(module_name, plugin_module_path)
            plugin_import = importlib.util.module_from_spec(module_spec)

            # Adding the module to sys.modules is optional but it gives us visibility if we need it elsewhere.
            sys.modules[module_name] = plugin_import

            module_spec.loader.exec_module(plugin_import)

            return plugin_import
        except Exception as e:
            self._log("Exception encountered while importing module '{}'".format(plugin_id), message2=str(e),
                      level="exception")
            return None

    def reload_plugin_module(self, plugin_id):
        """
        Reload a plugin module

        :param plugin_id:
        :return:
        """
        # Set the module name
        module_name = '{}.plugin'.format(plugin_id)
        # self._log("Reloading module '{}'".format(module_name), level="debug")

        if module_name in sys.modules:
            # Get all submodules
            module_names = [module_name]
            for m in sys.modules:
                if plugin_id in m and m not in [plugin_id, module_name]:
                    # Add to removal list
                    module_names.append(m)
            # Reload all imported modules or remove them if that fails
            for mn in module_names:
                try:
                    importlib.reload(sys.modules[mn])
                except ImportError:
                    # The module's parent was probably not imported.
                    # Delete it from sys.modules and carry on.
                    # This will force it to be reloaded again
                    self._log("Exception encountered while trying to reload module '{}'".format(module_name),
                              level="exception")
                    del sys.modules[module_name]

    @staticmethod
    def unload_plugin_module(plugin_id):
        """
        Remove plugin module from sys.modules

        This does not really clean up memory. Things are still getting really messy behind the scenes.
        This just makes it remove the module so that it will need to be re-imported above.

        :param plugin_id:
        :return:
        """
        # Set the module name
        module_name = '{}.plugin'.format(plugin_id)

        if module_name in sys.modules:
            del sys.modules[module_name]

    @staticmethod
    def get_plugin_type_meta(plugin_type):
        return plugin_types.grab_module(plugin_type)

    def get_all_plugin_types(self):
        return self.plugin_types

    def get_all_plugin_types_in_plugin(self, plugin_id):
        return_plugin_types = []

        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)

        # Load this plugin module
        plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

        for plugin_type in self.get_all_plugin_types():
            # Get the called runner function for the given plugin type
            plugin_type_meta = self.get_plugin_type_meta(plugin_type.get('id'))
            plugin_runner = plugin_type_meta.plugin_runner()

            # Check if this module contains the given plugin type runner function
            if hasattr(plugin_module, plugin_runner):
                # If it does, add it to the plugin_modules list
                return_plugin_types.append(plugin_type.get('id'))

        return return_plugin_types

    def execute_plugin_runner(self, data, plugin_id, plugin_type):
        """
        Given a data, a plugin ID, and a plugin type
        Load that plugin module and execute the runner
        Return the modified data

        :param data:
        :param plugin_id:
        :param plugin_type:
        :return:
        """
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)

        # Load this plugin module
        plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

        # Get the called runner function for the given plugin type
        plugin_type_meta = self.get_plugin_type_meta(plugin_type)
        plugin_runner = plugin_type_meta.plugin_runner()

        # Check if this module contains the given plugin type runner
        run_successfully = False
        if hasattr(plugin_module, plugin_runner):

            # If it does, get the runner function
            runner = getattr(plugin_module, plugin_runner)

            try:
                runner(data)
                run_successfully = True
            except Exception:
                self._log("Exception while carrying out '{}' plugin runner '{}'".format(plugin_type, plugin_id),
                          level="exception")

            del runner
            # gc.collect()

        return run_successfully

    def build_plugin_data_from_plugin_list_filtered_by_plugin_type(self, plugins_list, plugin_type):
        """
        Given a list of plugins and a plugin type,
        Return a filtered list of dictionaries containing:
            - the plugin module
            - the runner function to execute
            - the metadata for that plugin

        :param plugins_list:
        :param plugin_type:
        :return:
        """
        plugin_modules = []

        # Ensure called runner type exists
        if not plugin_type in plugin_types.get_all_plugin_types():
            self._log("Provided plugin type does not exist!", plugin_type, level="error")
            return plugin_modules

        # Get the called runner function for the given plugin type
        plugin_type_meta = self.get_plugin_type_meta(plugin_type)
        plugin_runner = plugin_type_meta.plugin_runner()

        for plugin_data in plugins_list:
            # Get plugin ID
            plugin_id = plugin_data.get('plugin_id')

            # Get plugin metadata
            plugin_name = plugin_data.get('name')
            plugin_author = plugin_data.get('author')
            plugin_version = plugin_data.get('version')
            plugin_icon = plugin_data.get('icon')
            plugin_description = plugin_data.get('description')

            # Get the path for this plugin
            plugin_path = self.__get_plugin_directory(plugin_id)

            # Load this plugin module
            plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

            # Check if this module contains the given plugin type runner function
            if hasattr(plugin_module, plugin_runner):
                # If it does, add it to the plugin_modules list
                plugin_runner_data = {
                    "plugin_id":     plugin_id,
                    "name":          plugin_name,
                    "author":        plugin_author,
                    "version":       plugin_version,
                    "icon":          plugin_icon,
                    "description":   plugin_description,
                    "plugin_module": plugin_module,
                    "plugin_path":   plugin_path,
                }
                plugin_modules.append(plugin_runner_data)

        return plugin_modules

    def get_plugin_data_by_type(self, enabled_plugins, plugin_type):
        """
        Given a list of enabled plugins and a plugin type
        Returns a list of dictionaries containing plugin data including
            - the plugin module
            - the runner function to execute
            - the metadata for that plugin

        :param enabled_plugins:
        :param plugin_type:
        :return:
        """
        # Filter out only plugins that have runners of this type
        plugin_data = self.build_plugin_data_from_plugin_list_filtered_by_plugin_type(enabled_plugins, plugin_type)

        # Return runners
        return plugin_data

    def get_plugin_settings(self, plugin_id, library_id=None):
        """
        Returns a dictionary of a given plugin's settings

        :param plugin_id:
        :param library_id:
        :return:
        """
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)

        # Load this plugin module
        plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

        if not hasattr(plugin_module, 'Settings'):
            # This plugin does not have a settings class
            return {}, {}

        try:
            # Settings plugin_settings
            plugin_settings = plugin_module.Settings(library_id=library_id)

            all_plugin_settings = copy.deepcopy(plugin_settings.get_setting())
            plugin_form_settings = copy.deepcopy(plugin_settings.get_form_settings())
        except Exception as e:
            self._log("Exception while fetching settings for plugin '{}'".format(plugin_id), str(e), level='exception')
            all_plugin_settings = {}
            plugin_form_settings = {}

        return all_plugin_settings, plugin_form_settings

    def save_plugin_settings(self, plugin_id, settings, library_id=None):
        """
        Saves a collection of a given plugin's settings.
        Returns a boolean result for the overall success
        of saving all values.

        :param plugin_id:
        :param settings:
        :param library_id:
        :return:
        """
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)

        # Load this plugin module
        plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

        try:
            plugin_settings = plugin_module.Settings(library_id=library_id)

            save_result = True
            for key in settings:
                value = settings.get(key)
                if not plugin_settings.set_setting(key, value):
                    save_result = False

            del plugin_settings, plugin_module

            if save_result:
                self.reload_plugin_module(plugin_id)

            return save_result
        except Exception as e:
            self._log("Exception while saving settings for plugin '{}'".format(plugin_id), str(e), level='exception')
            self._log(str(e), level='exception')
            return False

    def reset_plugin_settings(self, plugin_id, library_id=None):
        """
        Reset a plugin settings by removing the config file

        :param plugin_id:
        :param library_id:
        :return:
        """
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)

        # Load this plugin module
        plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

        try:
            plugin_settings = plugin_module.Settings(library_id=library_id)
            return plugin_settings.reset_settings_to_defaults()
        except Exception as e:
            self._log("Exception while resetting settings for plugin '{}'".format(plugin_id), str(e), level='exception')
            return False

    def get_plugin_changelog(self, plugin_id):
        """
        Returns a list of lines from the plugin's changelog

        :param plugin_id:
        :return:
        """
        changelog = []
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)
        plugin_changelog = os.path.join(plugin_path, 'changelog.md')
        if os.path.exists(plugin_changelog):
            with open(plugin_changelog, 'r') as f:
                changelog = f.readlines()

        return changelog

    def get_plugin_long_description(self, plugin_id):
        """
        Returns a list of lines from the plugin's additional description file

        :param plugin_id:
        :return:
        """
        description = []
        # Get the path for this plugin
        plugin_path = self.__get_plugin_directory(plugin_id)
        plugin_description = os.path.join(plugin_path, 'description.md')
        if os.path.exists(plugin_description):
            with open(plugin_description, 'r') as f:
                description = f.readlines()

        return description

    def test_plugin_runner(self, plugin_id, plugin_type, test_data=None, test_data_modifiers=None):
        if test_data is None:
            test_data = {}
        if test_data_modifiers is None:
            test_data_modifiers = {}
        try:
            # Get the path for this plugin
            plugin_path = self.__get_plugin_directory(plugin_id)

            # Load this plugin module
            plugin_module = self.__load_plugin_module(plugin_id, plugin_path)

            # Get the called runner function for the given plugin type
            plugin_type_meta = self.get_plugin_type_meta(plugin_type)
            if not test_data:
                test_data = plugin_type_meta.get_test_data()
                test_data = plugin_type_meta.modify_test_data(test_data, test_data_modifiers)
            errors = plugin_type_meta.run_data_schema_tests(plugin_id, plugin_module, test_data=test_data)
        except Exception as e:
            self._log("Exception while testing plugin runner for plugin '{}'".format(plugin_id), message2=str(e),
                      level="exception")
            errors = ["Exception encountered while testing runner - {}".format(str(e))]

        return errors

    def test_plugin_settings(self, plugin_id):
        errors = []

        # Get the called runner function for the given plugin type
        plugin_settings = {}
        try:
            plugin_settings, plugin_settings_meta = self.get_plugin_settings(plugin_id)
        except Exception as e:
            errors.append(str(e))

        return errors, plugin_settings