kapsikkum-unmanic – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | #!/usr/bin/env python3 |
2 | # -*- coding: utf-8 -*- |
||
3 | |||
4 | """ |
||
5 | unmanic.scheduler.py |
||
6 | |||
7 | Written by: Josh.5 <jsunnex@gmail.com> |
||
8 | Date: 11 Sep 2021, (11:15 AM) |
||
9 | |||
10 | Copyright: |
||
11 | Copyright (C) Josh Sunnex - All Rights Reserved |
||
12 | |||
13 | Permission is hereby granted, free of charge, to any person obtaining a copy |
||
14 | of this software and associated documentation files (the "Software"), to deal |
||
15 | in the Software without restriction, including without limitation the rights |
||
16 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||
17 | copies of the Software, and to permit persons to whom the Software is |
||
18 | furnished to do so, subject to the following conditions: |
||
19 | |||
20 | The above copyright notice and this permission notice shall be included in all |
||
21 | copies or substantial portions of the Software. |
||
22 | |||
23 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
||
24 | EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
||
25 | MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
||
26 | IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, |
||
27 | DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
||
28 | OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE |
||
29 | OR OTHER DEALINGS IN THE SOFTWARE. |
||
30 | |||
31 | """ |
||
32 | import random |
||
33 | import threading |
||
34 | import time |
||
35 | from datetime import datetime, timedelta |
||
36 | |||
37 | import schedule |
||
38 | |||
39 | from unmanic import config |
||
40 | from unmanic.libs import common, task, unlogger |
||
41 | from unmanic.libs.installation_link import Links |
||
42 | from unmanic.libs.plugins import PluginsHandler |
||
43 | from unmanic.libs.session import Session |
||
44 | |||
45 | |||
46 | class ScheduledTasksManager(threading.Thread): |
||
47 | """ |
||
48 | Manage any tasks that Unmanic needs to execute at regular intervals |
||
49 | """ |
||
50 | |||
51 | def __init__(self, event): |
||
52 | super(ScheduledTasksManager, self).__init__(name='ScheduledTasksManager') |
||
53 | self.logger = None |
||
54 | self.event = event |
||
55 | self.abort_flag = threading.Event() |
||
56 | self.abort_flag.clear() |
||
57 | self.scheduler = schedule.Scheduler() |
||
58 | self.force_local_worker_timer = 0 |
||
59 | |||
60 | def _log(self, message, message2='', level="info"): |
||
61 | if not self.logger: |
||
62 | unmanic_logging = unlogger.UnmanicLogger.__call__() |
||
63 | self.logger = unmanic_logging.get_logger(self.name) |
||
64 | message = common.format_message(message, message2) |
||
65 | getattr(self.logger, level)(message) |
||
66 | |||
67 | def stop(self): |
||
68 | self.abort_flag.set() |
||
69 | |||
70 | def run(self): |
||
71 | self._log("Starting ScheduledTasks Monitor loop") |
||
72 | |||
73 | # Create scheduled tasks |
||
74 | # Check the session every 60 minutes |
||
75 | self.scheduler.every(60).minutes.do(self.register_unmanic) |
||
76 | # Run the plugin repo update every 3 hours |
||
77 | self.scheduler.every(3).hours.do(self.plugin_repo_update) |
||
78 | # Run the remote installation link update every 10 seconds |
||
79 | self.scheduler.every(10).seconds.do(self.update_remote_installation_links) |
||
80 | # Run the remote installation distributed worker counter sync every minute |
||
81 | self.scheduler.every(1).minutes.do(self.set_worker_count_based_on_remote_installation_links) |
||
82 | # Run a completed task cleanup every 60 minutes and on startup |
||
83 | self.scheduler.every(12).hours.do(self.manage_completed_tasks) |
||
84 | self.manage_completed_tasks() |
||
85 | |||
86 | # Loop every 2 seconds to check if a task is due to be run |
||
87 | while not self.abort_flag.is_set(): |
||
88 | self.event.wait(2) |
||
89 | # Check if scheduled task is due |
||
90 | self.scheduler.run_pending() |
||
91 | |||
92 | # Clear any tasks and exit |
||
93 | self.scheduler.clear() |
||
94 | self._log("Leaving ScheduledTasks Monitor loop...") |
||
95 | |||
96 | def register_unmanic(self): |
||
97 | self._log("Updating session data") |
||
98 | s = Session() |
||
99 | s.register_unmanic(force=True) |
||
100 | |||
101 | def plugin_repo_update(self): |
||
102 | self._log("Checking for updates to plugin repos") |
||
103 | plugin_handler = PluginsHandler() |
||
104 | plugin_handler.update_plugin_repos() |
||
105 | |||
106 | def update_remote_installation_links(self): |
||
107 | # Don't log this as it will happen often |
||
108 | links = Links() |
||
109 | links.update_all_remote_installation_links() |
||
110 | |||
111 | def set_worker_count_based_on_remote_installation_links(self): |
||
112 | settings = config.Config() |
||
113 | |||
114 | # Get local task count as int |
||
115 | task_handler = task.Task() |
||
116 | local_task_count = int(task_handler.get_total_task_list_count()) |
||
117 | |||
118 | # Get target count |
||
119 | target_count = int(settings.get_distributed_worker_count_target()) |
||
120 | # # TODO: Check if we should be aiming for one less than the target |
||
121 | # if target_count > 1: |
||
122 | # target_count -= 1 |
||
123 | |||
124 | linked_configs = [] |
||
125 | for local_config in settings.get_remote_installations(): |
||
126 | if local_config.get('enable_distributed_worker_count'): |
||
127 | linked_configs.append(local_config) |
||
128 | |||
129 | # If no remote links are configured, then return here |
||
130 | if not linked_configs: |
||
131 | return |
||
132 | |||
133 | # There is a link config with distributed worker counts enabled |
||
134 | self._log("Syncing distributed worker count for this installation") |
||
135 | |||
136 | # Get total tasks count of pending tasks across all linked_configs |
||
137 | total_tasks = local_task_count |
||
138 | for linked_config in linked_configs: |
||
139 | total_tasks += int(linked_config.get('task_count', 0)) |
||
140 | |||
141 | # From the counts fetched from all linked_configs, balance out the target count (including this installation) |
||
142 | allocated_worker_count = 0 |
||
143 | for linked_config in linked_configs: |
||
144 | if linked_config.get('task_count', 0) == 0: |
||
145 | continue |
||
146 | allocated_worker_count += round((int(linked_config.get('task_count', 0)) / total_tasks) * target_count) |
||
147 | |||
148 | # Calculate worker count for local |
||
149 | target_workers_for_this_installation = 0 |
||
150 | if local_task_count > 0: |
||
151 | target_workers_for_this_installation = round((local_task_count / total_tasks) * target_count) |
||
152 | |||
153 | # If the total allocated worker count is now above our target, set this installation back to 0 |
||
154 | if allocated_worker_count > target_count: |
||
155 | target_workers_for_this_installation = 0 |
||
156 | |||
157 | # Every 10-12 minutes (make it random), give this installation at least 1 worker if it has pending tasks. |
||
158 | # This should cause the pending task queue to sit idle if there is only one task in the queue and it will provide |
||
159 | # rotation of workers when the pending task queue is close to the same. |
||
160 | # EG. If time now (seconds) > time last checked (seconds) + 10mins (600 seconds) + random seconds within 2mins |
||
161 | time_now = time.time() |
||
162 | time_to_next_force_local_worker = int(self.force_local_worker_timer + 600 + random.randrange(120)) |
||
163 | if time_now > time_to_next_force_local_worker: |
||
164 | if (local_task_count > 1) and (target_workers_for_this_installation < 1): |
||
165 | target_workers_for_this_installation = 1 |
||
166 | self.force_local_worker_timer = time_now |
||
167 | |||
168 | self._log("Configuring worker count as {} for this installation".format(target_workers_for_this_installation)) |
||
169 | settings.set_config_item('number_of_workers', target_workers_for_this_installation, save_settings=True) |
||
170 | |||
171 | def manage_completed_tasks(self): |
||
172 | settings = config.Config() |
||
173 | # Only run if configured to auto manage completed tasks |
||
174 | if not settings.get_auto_manage_completed_tasks(): |
||
175 | return |
||
176 | |||
177 | self._log("Running completed task cleanup for this installation") |
||
178 | max_age_in_days = settings.get_max_age_of_completed_tasks() |
||
179 | date_x_days_ago = datetime.now() - timedelta(days=int(max_age_in_days)) |
||
180 | before_time = date_x_days_ago.timestamp() |
||
181 | |||
182 | task_success = True |
||
183 | inc_status = 'successfully' |
||
184 | if not settings.get_always_keep_failed_tasks(): |
||
185 | inc_status = 'successfully or failed' |
||
186 | task_success = None |
||
187 | |||
188 | # Fetch completed tasks |
||
189 | from unmanic.libs import history |
||
190 | history_logging = history.History() |
||
191 | count = history_logging.get_historic_task_list_filtered_and_sorted(task_success=task_success, |
||
192 | before_time=before_time).count() |
||
193 | results = history_logging.get_historic_task_list_filtered_and_sorted(task_success=task_success, |
||
194 | before_time=before_time) |
||
195 | |||
196 | if count == 0: |
||
197 | self._log("Found no {} completed tasks older than {} days".format(inc_status, max_age_in_days)) |
||
198 | return |
||
199 | |||
200 | self._log( |
||
201 | "Found {} {} completed tasks older than {} days that should be removed".format(count, inc_status, max_age_in_days)) |
||
202 | if not history_logging.delete_historic_tasks_recursively(results): |
||
203 | self._log("Failed to delete {} {} completed tasks".format(count, inc_status), level='error') |
||
204 | return |
||
205 | |||
206 | self._log("Deleted {} {} completed tasks".format(count, inc_status)) |