kapsikkum-unmanic – unmanic/scheduler.py (Subversion rev 1, author: office)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.scheduler.py

    Written by: Josh.5 <jsunnex@gmail.com>
    Date: 11 Sep 2021, (11:15 AM)

    Copyright:
        Copyright (C) Josh Sunnex - All Rights Reserved

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
    DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
    OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
    OR OTHER DEALINGS IN THE SOFTWARE.

"""
import random
import threading
import time
from datetime import datetime, timedelta

import schedule

from unmanic import config
from unmanic.libs import common, task, unlogger
from unmanic.libs.installation_link import Links
from unmanic.libs.plugins import PluginsHandler
from unmanic.libs.session import Session


class ScheduledTasksManager(threading.Thread):
    """
    Manage any tasks that Unmanic needs to execute at regular intervals
    """

    def __init__(self, event):
        super(ScheduledTasksManager, self).__init__(name='ScheduledTasksManager')
        self.logger = None
        self.event = event
        self.abort_flag = threading.Event()
        self.abort_flag.clear()
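        # Note: a dedicated Scheduler instance is used here so the jobs registered by this thread
        # are kept separate from the schedule module's default (module-level) scheduler.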
        self.scheduler = schedule.Scheduler()
        self.force_local_worker_timer = 0

    def _log(self, message, message2='', level="info"):
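        """
        Log a message via the UnmanicLogger, lazily creating the logger on first use.
        """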
        if not self.logger:
            unmanic_logging = unlogger.UnmanicLogger.__call__()
            self.logger = unmanic_logging.get_logger(self.name)
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def stop(self):
        self.abort_flag.set()

    def run(self):
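        """
        Main thread loop: register the scheduled jobs, then poll the scheduler until the abort flag is set.
        """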
        self._log("Starting ScheduledTasks Monitor loop")

        # Create scheduled tasks
        # Check the session every 60 minutes
        self.scheduler.every(60).minutes.do(self.register_unmanic)
        # Run the plugin repo update every 3 hours
        self.scheduler.every(3).hours.do(self.plugin_repo_update)
        # Run the remote installation link update every 10 seconds
        self.scheduler.every(10).seconds.do(self.update_remote_installation_links)
        # Run the remote installation distributed worker counter sync every minute
        self.scheduler.every(1).minutes.do(self.set_worker_count_based_on_remote_installation_links)
        # Run a completed task cleanup every 12 hours and once on startup
        self.scheduler.every(12).hours.do(self.manage_completed_tasks)
        self.manage_completed_tasks()

        # Loop every 2 seconds to check if a task is due to be run
        while not self.abort_flag.is_set():
            self.event.wait(2)
            # Check if scheduled task is due
            self.scheduler.run_pending()

        # Clear any tasks and exit
        self.scheduler.clear()
        self._log("Leaving ScheduledTasks Monitor loop...")

    def register_unmanic(self):
        self._log("Updating session data")
        s = Session()
        s.register_unmanic(force=True)

    def plugin_repo_update(self):
        self._log("Checking for updates to plugin repos")
        plugin_handler = PluginsHandler()
        plugin_handler.update_plugin_repos()

    def update_remote_installation_links(self):
        # Don't log this as it will happen often
        links = Links()
        links.update_all_remote_installation_links()

    def set_worker_count_based_on_remote_installation_links(self):
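        """
        Balance this installation's worker count against any linked remote installations.

        The configured distributed worker count target is split between installations in
        proportion to each installation's pending task count.
        """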
        settings = config.Config()

        # Get local task count as int
        task_handler = task.Task()
        local_task_count = int(task_handler.get_total_task_list_count())

        # Get target count
        target_count = int(settings.get_distributed_worker_count_target())
        # # TODO: Check if we should be aiming for one less than the target
        # if target_count > 1:
        #     target_count -= 1

        linked_configs = []
        for local_config in settings.get_remote_installations():
            if local_config.get('enable_distributed_worker_count'):
                linked_configs.append(local_config)

        # If no remote links are configured, then return here
        if not linked_configs:
            return

        # There is a link config with distributed worker counts enabled
        self._log("Syncing distributed worker count for this installation")

        # Get total tasks count of pending tasks across all linked_configs
        total_tasks = local_task_count
        for linked_config in linked_configs:
            total_tasks += int(linked_config.get('task_count', 0))

        # From the counts fetched from all linked_configs, balance out the target count (including this installation)
        allocated_worker_count = 0
        for linked_config in linked_configs:
            if linked_config.get('task_count', 0) == 0:
                continue
            allocated_worker_count += round((int(linked_config.get('task_count', 0)) / total_tasks) * target_count)

        # Calculate worker count for local
        target_workers_for_this_installation = 0
        if local_task_count > 0:
            target_workers_for_this_installation = round((local_task_count / total_tasks) * target_count)
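        # Illustrative example (hypothetical numbers): with target_count = 6 and pending task counts of
        # 10 locally plus 20 and 30 on two linked installations (total_tasks = 60), the remotes are
        # allocated round(20/60 * 6) = 2 and round(30/60 * 6) = 3 workers, and this installation
        # targets round(10/60 * 6) = 1 worker.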

        # If the total allocated worker count is now above our target, set this installation back to 0
        if allocated_worker_count > target_count:
            target_workers_for_this_installation = 0

        # Every 10-12 minutes (randomised), give this installation at least 1 worker if it has more than one
        # pending task. This leaves the pending task queue idle when only a single task is queued, and rotates
        # workers between installations when their pending task queues are roughly the same size.
        # EG. If time now (seconds) > time last checked (seconds) + 10 mins (600 seconds) + a random offset of up to 2 mins
        time_now = time.time()
        time_to_next_force_local_worker = int(self.force_local_worker_timer + 600 + random.randrange(120))
        if time_now > time_to_next_force_local_worker:
            if (local_task_count > 1) and (target_workers_for_this_installation < 1):
                target_workers_for_this_installation = 1
            self.force_local_worker_timer = time_now

        self._log("Configuring worker count as {} for this installation".format(target_workers_for_this_installation))
        settings.set_config_item('number_of_workers', target_workers_for_this_installation, save_settings=True)

    def manage_completed_tasks(self):
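        """
        Remove historic completed task records older than the configured maximum age.

        Only runs when auto-management of completed tasks is enabled. Failed task records are
        retained when the "always keep failed tasks" setting is enabled.
        """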
        settings = config.Config()
        # Only run if configured to auto manage completed tasks
        if not settings.get_auto_manage_completed_tasks():
            return

        self._log("Running completed task cleanup for this installation")
        max_age_in_days = settings.get_max_age_of_completed_tasks()
        date_x_days_ago = datetime.now() - timedelta(days=int(max_age_in_days))
        before_time = date_x_days_ago.timestamp()

        task_success = True
        inc_status = 'successfully'
        if not settings.get_always_keep_failed_tasks():
            inc_status = 'successfully or failed'
            task_success = None

        # Fetch completed tasks
        from unmanic.libs import history
        history_logging = history.History()
        count = history_logging.get_historic_task_list_filtered_and_sorted(task_success=task_success,
                                                                           before_time=before_time).count()
        results = history_logging.get_historic_task_list_filtered_and_sorted(task_success=task_success,
                                                                             before_time=before_time)

        if count == 0:
            self._log("Found no {} completed tasks older than {} days".format(inc_status, max_age_in_days))
            return

        self._log(
            "Found {} {} completed tasks older than {} days that should be removed".format(count, inc_status, max_age_in_days))
        if not history_logging.delete_historic_tasks_recursively(results):
            self._log("Failed to delete {} {} completed tasks".format(count, inc_status), level='error')
            return

        self._log("Deleted {} {} completed tasks".format(count, inc_status))