#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.scheduler.py

    Written by:               Josh.5 <jsunnex@gmail.com>
    Date:                     11 Sep 2021, (11:15 AM)

    Copyright:
           Copyright (C) Josh Sunnex - All Rights Reserved

           Permission is hereby granted, free of charge, to any person obtaining a copy
           of this software and associated documentation files (the "Software"), to deal
           in the Software without restriction, including without limitation the rights
           to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
           copies of the Software, and to permit persons to whom the Software is
           furnished to do so, subject to the following conditions:

           The above copyright notice and this permission notice shall be included in all
           copies or substantial portions of the Software.

           THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
           IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
           DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
           OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
           OR OTHER DEALINGS IN THE SOFTWARE.

"""
import random
import threading
import time
from datetime import datetime, timedelta

import schedule

from unmanic import config
from unmanic.libs import common, task, unlogger
from unmanic.libs.installation_link import Links
from unmanic.libs.plugins import PluginsHandler
from unmanic.libs.session import Session


class ScheduledTasksManager(threading.Thread):
    """
    Manage any tasks that Unmanic needs to execute at regular intervals
    """

    def __init__(self, event):
        super(ScheduledTasksManager, self).__init__(name='ScheduledTasksManager')
        self.logger = None
        self.event = event
        self.abort_flag = threading.Event()
        self.abort_flag.clear()
        self.scheduler = schedule.Scheduler()
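        # Epoch time when a local worker was last force-allocated (see set_worker_count_based_on_remote_installation_links)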
        self.force_local_worker_timer = 0

    def _log(self, message, message2='', level="info"):
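        """
        Format and write a message to the Unmanic logger at the given level,
        lazily initialising the logger on first use.
        """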
        if not self.logger:
            unmanic_logging = unlogger.UnmanicLogger.__call__()
            self.logger = unmanic_logging.get_logger(self.name)
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def stop(self):
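        """
        Flag the thread to exit its monitor loop on the next iteration.
        """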
        self.abort_flag.set()

    def run(self):
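        """
        Main thread loop. Registers the scheduled tasks, then polls the scheduler
        every couple of seconds until the abort flag is set.
        """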
        self._log("Starting ScheduledTasks Monitor loop")

        # Create scheduled tasks
        # Check the session every 60 minutes
        self.scheduler.every(60).minutes.do(self.register_unmanic)
        # Run the plugin repo update every 3 hours
        self.scheduler.every(3).hours.do(self.plugin_repo_update)
        # Run the remote installation link update every 10 seconds
        self.scheduler.every(10).seconds.do(self.update_remote_installation_links)
        # Run the remote installation distributed worker counter sync every minute
        self.scheduler.every(1).minutes.do(self.set_worker_count_based_on_remote_installation_links)
        # Run a completed task cleanup every 12 hours and on startup
        self.scheduler.every(12).hours.do(self.manage_completed_tasks)
        self.manage_completed_tasks()

        # Loop every 2 seconds to check if a task is due to be run
        while not self.abort_flag.is_set():
            self.event.wait(2)
            # Check if scheduled task is due
            self.scheduler.run_pending()

        # Clear any tasks and exit
        self.scheduler.clear()
        self._log("Leaving ScheduledTasks Monitor loop...")

    def register_unmanic(self):
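        """
        Force a re-registration of this installation's session data.
        """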
        self._log("Updating session data")
        s = Session()
        s.register_unmanic(force=True)

    def plugin_repo_update(self):
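        """
        Update the cached data for all configured plugin repos.
        """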
        self._log("Checking for updates to plugin repos")
        plugin_handler = PluginsHandler()
        plugin_handler.update_plugin_repos()

    def update_remote_installation_links(self):
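        """
        Refresh the status of all configured remote installation links.
        """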
        # Don't log this as it will happen often
        links = Links()
        links.update_all_remote_installation_links()

    def set_worker_count_based_on_remote_installation_links(self):
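        """
        Balance this installation's worker count against all linked remote installations
        that have the distributed worker count option enabled.

        Workers are shared out from the configured target count in proportion to each
        installation's share of the combined pending task count.
        E.g. with a target of 4 workers, 10 local pending tasks and 30 remote pending
        tasks, this installation is allocated round((10 / 40) * 4) = 1 worker.

        If nothing is allocated locally, a single worker is still forced roughly every
        10-12 minutes while more than one task is pending, so local tasks are not
        starved indefinitely.
        """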
        settings = config.Config()

        # Get local task count as int
        task_handler = task.Task()
        local_task_count = int(task_handler.get_total_task_list_count())

        # Get target count
        target_count = int(settings.get_distributed_worker_count_target())
        # # TODO: Check if we should be aiming for one less than the target
        # if target_count > 1:
        #     target_count -= 1

        linked_configs = []
        for local_config in settings.get_remote_installations():
            if local_config.get('enable_distributed_worker_count'):
                linked_configs.append(local_config)

        # If no remote links are configured, then return here
        if not linked_configs:
            return

        # There is a link config with distributed worker counts enabled
        self._log("Syncing distributed worker count for this installation")

        # Get total tasks count of pending tasks across all linked_configs
        total_tasks = local_task_count
        for linked_config in linked_configs:
            total_tasks += int(linked_config.get('task_count', 0))

        # From the counts fetched from all linked_configs, balance out the target count (including this installation)
        allocated_worker_count = 0
        for linked_config in linked_configs:
            if linked_config.get('task_count', 0) == 0:
                continue
            allocated_worker_count += round((int(linked_config.get('task_count', 0)) / total_tasks) * target_count)

        # Calculate worker count for local
        target_workers_for_this_installation = 0
        if local_task_count > 0:
            target_workers_for_this_installation = round((local_task_count / total_tasks) * target_count)

        # If the total allocated worker count is now above our target, set this installation back to 0
        if allocated_worker_count > target_count:
            target_workers_for_this_installation = 0

        # Every 10-12 minutes (randomised), give this installation at least 1 worker if it has more than one pending task.
        #       This lets the pending task queue sit idle when only a single task is queued, and it provides a rotation
        #           of workers between installations whose pending task queues are roughly the same size.
        #       EG. Run if time now (seconds) > time last forced (seconds) + 10 mins (600 seconds) + a random 0-2 mins
        time_now = time.time()
        time_to_next_force_local_worker = int(self.force_local_worker_timer + 600 + random.randrange(120))
        if time_now > time_to_next_force_local_worker:
            if (local_task_count > 1) and (target_workers_for_this_installation < 1):
                target_workers_for_this_installation = 1
                self.force_local_worker_timer = time_now

        self._log("Configuring worker count as {} for this installation".format(target_workers_for_this_installation))
        settings.set_config_item('number_of_workers', target_workers_for_this_installation, save_settings=True)

    def manage_completed_tasks(self):
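        """
        Remove completed task history entries older than the configured maximum age.
        Only runs when auto-management of completed tasks is enabled.
        """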
        settings = config.Config()
        # Only run if configured to auto manage completed tasks
        if not settings.get_auto_manage_completed_tasks():
            return

        self._log("Running completed task cleanup for this installation")
        max_age_in_days = settings.get_max_age_of_completed_tasks()
        date_x_days_ago = datetime.now() - timedelta(days=int(max_age_in_days))
        before_time = date_x_days_ago.timestamp()

        task_success = True
        inc_status = 'successfully'
        if not settings.get_always_keep_failed_tasks():
            inc_status = 'successfully or unsuccessfully'
            task_success = None

        # Fetch completed tasks
        from unmanic.libs import history
        history_logging = history.History()
        results = history_logging.get_historic_task_list_filtered_and_sorted(task_success=task_success,
                                                                             before_time=before_time)
        count = results.count()

        if count == 0:
            self._log("Found no {} completed tasks older than {} days".format(inc_status, max_age_in_days))
            return

        self._log(
            "Found {} {} completed tasks older than {} days that should be removed".format(count, inc_status, max_age_in_days))
        if not history_logging.delete_historic_tasks_recursively(results):
            self._log("Failed to delete {} {} completed tasks".format(count, inc_status), level='error')
            return

        self._log("Deleted {} {} completed tasks".format(count, inc_status))