#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.foreman.py

    Written by: Josh.5 <jsunnex@gmail.com>
    Date: 02 Jan 2019, (7:21 AM)

    Copyright:
        Copyright (C) Josh Sunnex - All Rights Reserved

        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:

        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.

        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
        EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
        MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
        IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
        DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
        OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
        OR OTHER DEALINGS IN THE SOFTWARE.

"""
import hashlib
import json
import threading
import queue
import time
from datetime import datetime, timedelta

from unmanic.libs import common, installation_link
from unmanic.libs.library import Library
from unmanic.libs.plugins import PluginsHandler
from unmanic.libs.worker_group import WorkerGroup
from unmanic.libs.workers import Worker


class Foreman(threading.Thread):
    def __init__(self, data_queues, settings, task_queue, event):
        super(Foreman, self).__init__(name='Foreman')
        self.settings = settings
        self.event = event
        self.task_queue = task_queue
        self.data_queues = data_queues
        self.logger = data_queues["logging"].get_logger(self.name)
        self.workers_pending_task_queue = queue.Queue(maxsize=1)
        self.remote_workers_pending_task_queue = queue.Queue(maxsize=1)
        self.complete_queue = queue.Queue()
        self.worker_threads = {}
        self.remote_task_manager_threads = {}
        self.abort_flag = threading.Event()
        self.abort_flag.clear()

        # Set the current plugin config
        self.current_config = {
            'settings': {},
            'settings_hash': ''
        }
        self.configuration_changed()

        # Set the current time for scheduler
        self.last_schedule_run = datetime.today().strftime('%H:%M')

        self.links = installation_link.Links()
        self.link_heartbeat_last_run = 0
        self.available_remote_managers = {}

    def _log(self, message, message2=None, level="info"):
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def stop(self):
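        """
        Signal the foreman to shut down and mark every worker thread and
        remote task manager thread as redundant so they exit cleanly.
        """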
        self.abort_flag.set()
        # Stop all workers
        # To avoid having the dictionary change size during iteration,
        # we need to first get the thread_keys, then iterate through that
        thread_keys = [t for t in self.worker_threads]
        for thread in thread_keys:
            self.mark_worker_thread_as_redundant(thread)
        # Stop all remote link manager threads
        thread_keys = [t for t in self.remote_task_manager_threads]
        for thread in thread_keys:
            self.mark_remote_task_manager_thread_as_redundant(thread)

    def get_total_worker_count(self):
        """Returns the worker count as an integer"""
        worker_count = 0
        for worker_group in WorkerGroup.get_all_worker_groups():
            worker_count += worker_group.get('number_of_workers', 0)
        return int(worker_count)

    def save_current_config(self, settings=None, settings_hash=None):
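        """
        Record the given library/plugin settings and settings hash as the
        foreman's current known configuration.
        """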
        if settings:
            self.current_config['settings'] = settings
        if settings_hash:
            self.current_config['settings_hash'] = settings_hash
        self._log('Updated config. If this is modified, all workers will be paused', level='debug')

    def get_current_library_configuration(self):
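        """
        Build a dictionary of all libraries, keyed by library ID, containing each
        library's enabled plugins (with their settings) and its configured plugin flow.
        """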
        # Fetch all libraries
        all_plugin_settings = {}
        for library in Library.get_all_libraries():
            try:
                library_config = Library(library.get('id'))
            except Exception as e:
                self._log("Unable to fetch library config for ID {}".format(library.get('id')), level='exception')
                continue
            # Get list of enabled plugins with their settings
            enabled_plugins = []
            for enabled_plugin in library_config.get_enabled_plugins(include_settings=True):
                enabled_plugins.append({
                    'plugin_id': enabled_plugin.get('plugin_id'),
                    'settings': enabled_plugin.get('settings'),
                })

            # Get the plugin flow
            plugin_flow = library_config.get_plugin_flow()

            # Append this library's plugin config and flow to the dictionary
            all_plugin_settings[library.get('id')] = {
                'enabled_plugins': enabled_plugins,
                'plugin_flow': plugin_flow,
            }
        return all_plugin_settings

    def configuration_changed(self):
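        """
        Compare a hash of the current library configuration against the last
        recorded hash. Returns True (and records the new config) if it changed,
        otherwise False.
        """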
        current_settings = self.get_current_library_configuration()
        # Compare current settings with foreman recorded settings.
        json_encoded_settings = json.dumps(current_settings, sort_keys=True).encode()
        current_settings_hash = hashlib.md5(json_encoded_settings).hexdigest()
        if current_settings_hash == self.current_config.get('settings_hash', ''):
            return False
        # Record current settings
        self.save_current_config(settings=current_settings, settings_hash=current_settings_hash)
        # Settings have changed
        return True

    def validate_worker_config(self):
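        """
        Validate that workers are safe to run: enabled plugins are compatible,
        link and library limits are respected, and the plugin configuration has
        not changed since it was last recorded. Returns False if workers should
        be paused.
        """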
        valid = True
        frontend_messages = self.data_queues.get('frontend_messages')

        # Ensure that the enabled plugins are compatible with the PluginHandler version
        plugin_handler = PluginsHandler()
        if plugin_handler.get_incompatible_enabled_plugins(frontend_messages):
            valid = False
        if not self.links.within_enabled_link_limits(frontend_messages):
            valid = False

        # Check if the plugin configuration has been modified. If it has, stop the workers.
        # What we want to avoid here is someone partially modifying the plugin configuration
        # and having the workers pick up a job mid-configuration.
        if self.configuration_changed():
            # Generate a frontend message and falsify validation
            frontend_messages.put(
                {
                    'id': 'pluginSettingsChangeWorkersStopped',
                    'type': 'warning',
                    'code': 'pluginSettingsChangeWorkersStopped',
                    'message': '',
                    'timeout': 0
                }
            )
            valid = False

        # Ensure the library config is within limits
        if not Library.within_library_count_limits(frontend_messages):
            valid = False

        return valid

    def run_task(self, time_now, task, worker_count, worker_group):
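        """
        Execute a scheduled worker event ('pause', 'resume' or 'count') against
        the given worker group and record the time it was run.
        """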
        worker_group_id = worker_group.get_id()
        self.last_schedule_run = time_now
        if task == 'pause':
            # Pause all workers now
            self._log("Running scheduled event - Pause all worker threads", level='debug')
            self.pause_all_worker_threads(worker_group_id=worker_group_id)
        elif task == 'resume':
            # Resume all workers now
            self._log("Running scheduled event - Resume all worker threads", level='debug')
            self.resume_all_worker_threads(worker_group_id=worker_group_id)
        elif task == 'count':
            # Set the worker count value
            # Save the settings so this scheduled event will persist across an application restart
            self._log("Running scheduled event - Setting worker count to '{}'".format(worker_count), level='debug')
            worker_group.set_number_of_workers(worker_count)
            worker_group.save()

    def manage_event_schedules(self):
        """
        Manage all scheduled worker events.
        This function limits itself to run only once every 60 seconds.

        :return:
        """
        days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
        day_of_week = datetime.today().weekday()
        time_now = datetime.today().strftime('%H:%M')

        # Only run once a minute
        if time_now == self.last_schedule_run:
            return

        for wg in WorkerGroup.get_all_worker_groups():
            try:
                worker_group = WorkerGroup(group_id=wg.get('id'))
                event_schedules = worker_group.get_worker_event_schedules()
            except Exception as e:
                self._log("While iterating through the worker groups, the worker group disappeared", str(e), level='debug')
                continue

            for event_schedule in event_schedules:
                schedule_time = event_schedule.get('schedule_time')
                # Ensure we have a schedule time
                if not schedule_time:
                    continue
                # Ensure the schedule time is now
                if time_now != schedule_time:
                    continue

                repetition = event_schedule.get('repetition')
                # Ensure we have a repetition
                if not repetition:
                    continue

                # Check if it should run
                if repetition == 'daily':
                    self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
                                  worker_group)
                elif repetition == 'weekday' and days[day_of_week] not in ['saturday', 'sunday']:
                    self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
                                  worker_group)
                elif repetition == 'weekend' and days[day_of_week] in ['saturday', 'sunday']:
                    self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
                                  worker_group)
                elif repetition == days[day_of_week]:
                    self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
                                  worker_group)

    def init_worker_threads(self):
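        """
        Reap dead worker threads and ensure each worker group has the configured
        number of workers running, removing idle workers that no longer belong.
        """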
        # Remove any redundant idle workers from our list
        # To avoid having the dictionary change size during iteration,
        # we need to first get the thread_keys, then iterate through that
        thread_keys = [t for t in self.worker_threads]
        for thread in thread_keys:
            if thread in self.worker_threads:
                if not self.worker_threads[thread].is_alive():
                    del self.worker_threads[thread]

        # Check that we have enough workers running. Spawn new ones as required.
        worker_group_ids = []
        worker_group_names = []
        for worker_group in WorkerGroup.get_all_worker_groups():
            worker_group_ids.append(worker_group.get('id'))

            # Create threads as required
            for i in range(worker_group.get('number_of_workers')):
                worker_id = "{}-{}".format(worker_group.get('name'), i)
                worker_name = "{}-Worker-{}".format(worker_group.get('name'), (i + 1))
                # Add this name to a list. If the name changes, we can remove old incorrectly named workers
                worker_group_names.append(worker_name)
                if worker_id not in self.worker_threads:
                    # This worker does not yet exist, create it
                    self.start_worker_thread(worker_id, worker_name, worker_group.get('id'))

            # Remove any workers that do not belong. The max number of supported workers is 12
            for i in range(worker_group.get('number_of_workers'), 12):
                worker_id = "{}-{}".format(worker_group.get('name'), i)
                if worker_id in self.worker_threads:
                    # Only remove threads that are idle (never terminate a task just to reduce worker count)
                    if self.worker_threads[worker_id].idle:
                        self.mark_worker_thread_as_redundant(worker_id)

        # Remove workers for groups that no longer exist
        for thread in self.worker_threads:
            worker_group_id = self.worker_threads[thread].worker_group_id
            worker_name = self.worker_threads[thread].name
            if worker_group_id not in worker_group_ids or worker_name not in worker_group_names:
                # Only remove threads that are idle (never terminate a task just to reduce worker count)
                if self.worker_threads[thread].idle:
                    self.mark_worker_thread_as_redundant(thread)

    def fetch_available_remote_installation(self, library_name=None):
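        """
        Return the ID and info of the first available remote installation that is
        not already managed by a thread and (optionally) serves the given library.
        """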
        # Fetch the first matching remote worker from the list
        assigned_installation_id = None
        assigned_installation_info = {}
        installation_ids = [t for t in self.available_remote_managers]
        for installation_id in installation_ids:
            if installation_id not in self.remote_task_manager_threads:
                # Check that a remote worker is on an installation with a matching library name
                installation_library_names = self.available_remote_managers[installation_id].get('library_names', [])
                if library_name is not None and library_name not in installation_library_names:
                    continue
                assigned_installation_info = self.available_remote_managers[installation_id]
                assigned_installation_id = installation_id
                break
        return assigned_installation_id, assigned_installation_info

    def init_remote_task_manager_thread(self, library_name=None):
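        """
        Claim an available remote installation and start a RemoteTaskManager
        thread for it. Returns False if no suitable installation was available.
        """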
        # Fetch the installation ID and info
        installation_id, installation_info = self.fetch_available_remote_installation(library_name=library_name)

        # Ensure a worker was assigned
        if not installation_info:
            return False

        # Claim this installation by removing it from the list of available remote managers
        del self.available_remote_managers[installation_id]

        # Startup a thread
        thread = installation_link.RemoteTaskManager(installation_id,
                                                     "RemoteTaskManager-{}".format(installation_id),
                                                     installation_info,
                                                     self.remote_workers_pending_task_queue,
                                                     self.complete_queue,
                                                     self.event)
        thread.daemon = True
        thread.start()
        self.remote_task_manager_threads[installation_id] = thread
        return True

    def remove_stale_available_remote_managers(self):
        """
        Loop over the current list of available remote managers and remove any that were marked available over 30 seconds ago.
        This ensures that the data in these manager info lists is kept up-to-date if the remote installation config changes.

        :return:
        """
        installation_ids = [t for t in self.available_remote_managers]
        for installation_id in installation_ids:
            if installation_id not in self.remote_task_manager_threads:
                # Check whether this available manager entry has gone stale
                installation_info = self.available_remote_managers[installation_id]
                if installation_info.get('created') < datetime.now() - timedelta(seconds=30):
                    del self.available_remote_managers[installation_id]

    def remove_stopped_remote_task_manager_threads(self):
        """
        Remove any redundant link managers from our list.
        Remove any worker IDs from the remote_task_manager_threads list so they are freed up for another link manager thread.

        :return:
        """
        # Remove any redundant link managers from our list
        thread_keys = [t for t in self.remote_task_manager_threads]
        for thread in thread_keys:
            if thread in self.remote_task_manager_threads:
                if not self.remote_task_manager_threads[thread].is_alive():
                    self._log("Removing thread '{}'".format(thread), level='debug')
                    del self.remote_task_manager_threads[thread]
                    continue

    def terminate_unlinked_remote_task_manager_threads(self):
        """
        Mark a manager as redundant if the remote installation configuration has been removed.

        :return:
        """
        # Get a list of configured UUIDs
        configured_uuids = {}
        for configured_remote_installation in self.settings.get_remote_installations():
            if configured_remote_installation.get('uuid'):
                configured_uuids[configured_remote_installation.get('uuid')] = configured_remote_installation.get('address')
        # Find and remove any redundant link managers from our list
        term_log_msg = "Remote installation link with {} '{}' has been removed from settings. Marking thread for termination"
        for thread in self.remote_task_manager_threads:
            thread_info = self.remote_task_manager_threads[thread].get_info()
            thread_assigned_uuid = thread_info.get('installation_info', {}).get('uuid')
            thread_assigned_address = thread_info.get('installation_info', {}).get('address')
            # Ensure the UUID is still in our config
            if thread_assigned_uuid not in configured_uuids:
                self.mark_remote_task_manager_thread_as_redundant(thread)
                self._log(term_log_msg.format('UUID', thread_assigned_uuid))
                continue
            # Ensure the configured address has not changed
            configured_address = configured_uuids.get(thread_assigned_uuid)
            if thread_assigned_address not in configured_address:
                self.mark_remote_task_manager_thread_as_redundant(thread)
                self._log(term_log_msg.format('address', thread_assigned_address))
                continue

    def update_remote_worker_availability_status(self):
        """
        Updates the list of available remote managers that can be started.

        :return:
        """
        available_installations = self.links.check_remote_installation_for_available_workers()
        for installation_uuid in available_installations:
            remote_address = available_installations[installation_uuid].get('address', '')
            remote_auth = available_installations[installation_uuid].get('auth', 'None')
            remote_username = available_installations[installation_uuid].get('username', '')
            remote_password = available_installations[installation_uuid].get('password', '')
            remote_library_names = available_installations[installation_uuid].get('library_names', [])
            available_slots = available_installations[installation_uuid].get('available_slots', 0)
            for slot_number in range(available_slots):
                remote_manager_id = "{}|M{}".format(installation_uuid, slot_number)
                if remote_manager_id in self.available_remote_managers or remote_manager_id in self.remote_task_manager_threads:
                    # This worker is already managed by a link manager thread or is already in the list of available workers
                    continue
                # Add this remote worker ID to the list of available remote managers
                self.available_remote_managers[remote_manager_id] = {
                    'uuid': installation_uuid,
                    'address': remote_address,
                    'auth': remote_auth,
                    'username': remote_username,
                    'password': remote_password,
                    'library_names': remote_library_names,
                    'created': datetime.now(),
                }

    def start_worker_thread(self, worker_id, worker_name, worker_group):
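        """
        Create, start and register a new daemonised Worker thread for the given
        worker group.
        """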
        thread = Worker(worker_id, worker_name, worker_group, self.workers_pending_task_queue,
                        self.complete_queue, self.event)
        thread.daemon = True
        thread.start()
        self.worker_threads[worker_id] = thread

    def fetch_available_worker_ids(self):
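        """
        Return the thread IDs of all local workers that are alive, idle and not
        paused.
        """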
        thread_ids = []
        for thread in self.worker_threads:
            if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
                if not self.worker_threads[thread].paused:
                    thread_ids.append(self.worker_threads[thread].thread_id)
        return thread_ids

    def check_for_idle_workers(self):
        for thread in self.worker_threads:
            if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
                if not self.worker_threads[thread].paused:
                    return True
        return False

    def check_for_idle_remote_workers(self):
        if self.available_remote_managers:
            return True
        return False

    def get_available_remote_library_names(self):
        library_names = []
        for installation_id in self.available_remote_managers:
            for library_name in self.available_remote_managers[installation_id].get('library_names', []):
                if library_name not in library_names:
                    library_names.append(library_name)
        return library_names

    def get_tags_configured_for_worker(self, worker_id):
        """Fetch the tags for a given worker ID"""
        assigned_worker_group_id = self.worker_threads[worker_id].worker_group_id
        worker_group = WorkerGroup(group_id=assigned_worker_group_id)
        return worker_group.get_tags()

    def postprocessor_queue_full(self):
        """
        Check if the post-processor queue is greater than the number of workers enabled.
        If it is, return True. Else False.

        :return:
        """
        frontend_messages = self.data_queues.get('frontend_messages')
        # Use the configured worker count + 1 as the post-processor queue limit
        limit = (int(self.get_total_worker_count()) + 1)
        # Include a count of all available and busy remote workers in the post-processor queue limit
        limit += len(self.available_remote_managers)
        limit += len(self.remote_task_manager_threads)
        current_count = len(self.task_queue.list_processed_tasks())
        if current_count > limit:
            msg = "There are currently {} items in the post-processor queue. Halting feeding workers until it drops below {}."
            self._log(msg.format(current_count, limit), level='warning')
            frontend_messages.update(
                {
                    'id': 'pendingTaskHaltedPostProcessorQueueFull',
                    'type': 'status',
                    'code': 'pendingTaskHaltedPostProcessorQueueFull',
                    'message': '',
                    'timeout': 0
                }
            )
            return True

        # Remove the status notification
        frontend_messages.remove_item('pendingTaskHaltedPostProcessorQueueFull')
        return False

    def pause_worker_thread(self, worker_id):
        """
        Pauses a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        if worker_id not in self.worker_threads:
            self._log("Asked to pause Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        if not self.worker_threads[worker_id].paused_flag.is_set():
            self._log("Asked to pause Worker ID '{}'".format(worker_id), level='debug')
            self.worker_threads[worker_id].paused_flag.set()
        return True

    def pause_all_worker_threads(self, worker_group_id=None):
        """Pause all threads"""
        result = True
        for thread in self.worker_threads:
            # Limit by worker group if requested
            if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
                continue
            if not self.pause_worker_thread(thread):
                result = False
        return result

    def resume_worker_thread(self, worker_id):
        """
        Resume a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        self._log("Asked to resume Worker ID '{}'".format(worker_id), level='debug')
        if worker_id not in self.worker_threads:
            self._log("Asked to resume Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        self.worker_threads[worker_id].paused_flag.clear()
        return True

    def resume_all_worker_threads(self, worker_group_id=None):
        """Resume all threads"""
        result = True
        for thread in self.worker_threads:
            # Limit by worker group if requested
            if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
                continue
            if not self.resume_worker_thread(thread):
                result = False
        return result

    def terminate_worker_thread(self, worker_id):
        """
        Terminate a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        self._log("Asked to terminate Worker ID '{}'".format(worker_id), level='debug')
        if worker_id not in self.worker_threads:
            self._log("Asked to terminate Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        self.mark_worker_thread_as_redundant(worker_id)
        return True

    def terminate_all_worker_threads(self):
        """Terminate all threads"""
        result = True
        for thread in self.worker_threads:
            if not self.terminate_worker_thread(thread):
                result = False
        return result

    def mark_worker_thread_as_redundant(self, worker_id):
        self.worker_threads[worker_id].redundant_flag.set()

    def mark_remote_task_manager_thread_as_redundant(self, link_manager_id):
        self.remote_task_manager_threads[link_manager_id].redundant_flag.set()

    def hand_task_to_workers(self, item, local=True, library_name=None, worker_id=None):
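        """
        Hand a task either to the specified local worker thread or, for remote
        processing, place it on the pending queue and spawn a link manager thread
        to collect it. Returns False if the task could not be handed off.
        """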
        if local:
            # Assign the task to the worker id provided
            if worker_id in self.worker_threads and self.worker_threads[worker_id].is_alive():
                self.worker_threads[worker_id].set_task(item)
            # If the worker thread specified was not available to collect this task, it will be fetched again in the next loop
        else:
            # Place into queue for a remote link manager thread to collect
            self.remote_workers_pending_task_queue.put(item)
            # Spawn a link manager thread to pick up the task
            if not self.init_remote_task_manager_thread(library_name=library_name):
                # Remove item from queue
                self.remote_workers_pending_task_queue.get_nowait()
                # Return failure. This will cause the item to be re-queued at the bottom of the list
                return False
        return True

    def link_manager_tread_heartbeat(self):
        """
        Run a list of tasks to test the status of our Link Management threads.
        Unlike worker threads, Link Management threads live and die for a single task.
        If a Link Management thread is alive for more than 10 seconds without picking up a task, it will die.
        This function will reap all dead or completed threads and clean up issues where a thread may have died
        before running a task that was added to the pending task queue (in which case a new thread should be started).

        :return:
        """
        # Only run the heartbeat every 10 seconds
        time_now = time.time()
        if self.link_heartbeat_last_run > (time_now - 10):
            return
        # self._log("Running remote link manager heartbeat", level='debug')
        # Terminate remote manager threads for unlinked installations
        self.terminate_unlinked_remote_task_manager_threads()
        # Clear out dead threads
        self.remove_stopped_remote_task_manager_threads()
        # Clear out old available workers (entries only last 30 seconds before being removed and refreshed)
        self.remove_stale_available_remote_managers()
        # Check for updates to the worker availability status of linked remote installations
        self.update_remote_worker_availability_status()
        # Mark this as the last time run
        self.link_heartbeat_last_run = time_now

    def run(self):
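        """
        Main foreman loop: collect completed tasks, maintain the worker pool,
        enforce a valid configuration, run event schedules and hand pending tasks
        to idle local or remote workers until the abort flag is set.
        """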
        self._log("Starting Foreman Monitor loop")

        # Flag to force checking for idle remote workers when set to False.
        # This prevents the loop from spinning on idle local workers whose
        # tags prevent them from taking up tasks.
        allow_local_idle_worker_check = True

        while not self.abort_flag.is_set():
            self.event.wait(2)

            try:
                # Fetch all completed tasks from workers
                while not self.abort_flag.is_set() and not self.complete_queue.empty():
                    self.event.wait(.5)
                    try:
                        task_item = self.complete_queue.get_nowait()
                        task_item.set_status('processed')
                    except queue.Empty:
                        continue
                    except Exception as e:
                        self._log("Exception when fetching completed task report from worker", message2=str(e),
                                  level="exception")

                # Set up the correct number of workers
                if not self.abort_flag.is_set():
                    self.init_worker_threads()

                # If the worker config is not valid, then pause all workers until it is
                if not self.validate_worker_config():
                    # Pause all workers
                    self.pause_all_worker_threads()
                    continue

                # Manage worker event schedules
                self.manage_event_schedules()

                if not self.abort_flag.is_set() and not self.task_queue.task_list_pending_is_empty():

                    # Check the status of all link manager threads (close dead ones)
                    self.link_manager_tread_heartbeat()

                    # Check if we are able to start up a worker for another encoding job.
                    # These queues hold only one task at a time and are used to hand tasks to the workers.
                    if self.workers_pending_task_queue.full() or self.remote_workers_pending_task_queue.full():
                        # In order to simplify the process and run the foreman management in a single thread, if either of
                        # these are full, it means the thread that is assigned to pick up the item has not done so.
                        # In order to prevent a second thread starting and taking the first thread's task, we should not
                        # process any more pending tasks until that first thread is ready and has taken its task out of the
                        # queue.
                        continue

                    # Check if there are any free workers
                    worker_ids = []
                    if allow_local_idle_worker_check and self.check_for_idle_workers():
                        # Local workers are available
                        process_local = True
                        # For local workers, process either local tasks or tasks provided from a remote installation
                        get_local_pending_tasks_only = False
                        # Specify the worker ID that will handle the next task
                        worker_ids = self.fetch_available_worker_ids()
                        # If no workers were available (possibly due to being recycled), just continue the loop
                        if not worker_ids:
                            continue
                    elif self.check_for_idle_remote_workers():
                        allow_local_idle_worker_check = True
                        # Remote workers are available
                        process_local = False
                        # For remote workers, only process local tasks. Don't hand remote tasks to another remote installation
                        get_local_pending_tasks_only = True
                    else:
                        allow_local_idle_worker_check = True
                        # All workers are currently busy
                        self.event.wait(1)
                        continue

                    # Check if the post-processor task queue is full
                    if self.postprocessor_queue_full():
                        self.event.wait(5)
                        continue

                    # Fetch the next item in the queue
                    available_worker_id = None
                    next_item_to_process = None
                    if process_local:
                        # For local processing, ensure tags match the available library and worker
                        for worker_id in worker_ids:
                            try:
                                library_tags = self.get_tags_configured_for_worker(worker_id)
                            except Exception as e:
                                # This will happen if the worker group is deleted
                                self._log("Error while fetching the tags for the configured worker", str(e), level='debug')
                                # Break this for loop. The main while loop will clean up these workers on the next pass
                                break
                            next_item_to_process = self.task_queue.get_next_pending_tasks(
                                local_only=get_local_pending_tasks_only,
                                library_tags=library_tags)
                            if next_item_to_process:
                                available_worker_id = worker_id
                                break
                        # If no local worker ID was assigned to the given item, then try again shortly
                        if not available_worker_id:
                            allow_local_idle_worker_check = False
                            self.event.wait(1)
                            continue
                    else:
                        # For remote items, run a search matching an available remote installation library
                        remote_library_names = self.get_available_remote_library_names()
                        next_item_to_process = self.task_queue.get_next_pending_tasks(local_only=get_local_pending_tasks_only,
                                                                                      library_names=remote_library_names)

                    if next_item_to_process:
                        try:
                            source_abspath = next_item_to_process.get_source_abspath()
                            task_library_name = next_item_to_process.get_task_library_name()
                        except Exception as e:
                            self._log("Exception in fetching task details", message2=str(e), level="exception")
                            self.event.wait(3)
                            continue

                        self._log("Processing item - {}".format(source_abspath))
                        success = self.hand_task_to_workers(next_item_to_process, local=process_local,
                                                            library_name=task_library_name,
                                                            worker_id=available_worker_id)
                        if not success:
                            self._log("Re-queueing tasks. Unable to find worker capable of processing task '{}'".format(
                                next_item_to_process.get_source_abspath()), level="warning")
                            # Re-queue item at the bottom
                            self.task_queue.requeue_tasks_at_bottom(next_item_to_process.get_task_id())
            except Exception as e:
                raise Exception(e)

        self._log("Leaving Foreman Monitor loop...")

    def get_all_worker_status(self):
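        """Return a list containing the status of every worker thread."""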
        all_status = []
        for thread in self.worker_threads:
            all_status.append(self.worker_threads[thread].get_status())
        return all_status

    def get_worker_status(self, worker_id):
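        """Return the status of the worker thread whose ID matches the given worker ID."""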
        result = {}
        for thread in self.worker_threads:
            if int(worker_id) == int(thread):
                result = self.worker_threads[thread].get_status()
        return result