#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
unmanic.foreman.py

Written by: Josh.5 <jsunnex@gmail.com>
Date: 02 Jan 2019, (7:21 AM)

Copyright:
Copyright (C) Josh Sunnex - All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
OR OTHER DEALINGS IN THE SOFTWARE.

"""
import hashlib
import json
import threading
import queue
import time
from datetime import datetime, timedelta

from unmanic.libs import common, installation_link
from unmanic.libs.library import Library
from unmanic.libs.plugins import PluginsHandler
from unmanic.libs.worker_group import WorkerGroup
from unmanic.libs.workers import Worker


class Foreman(threading.Thread):
    def __init__(self, data_queues, settings, task_queue, event):
        super(Foreman, self).__init__(name='Foreman')
        self.settings = settings
        self.event = event
        self.task_queue = task_queue
        self.data_queues = data_queues
        self.logger = data_queues["logging"].get_logger(self.name)
        self.workers_pending_task_queue = queue.Queue(maxsize=1)
        self.remote_workers_pending_task_queue = queue.Queue(maxsize=1)
        self.complete_queue = queue.Queue()
        self.worker_threads = {}
        self.remote_task_manager_threads = {}
        self.abort_flag = threading.Event()
        self.abort_flag.clear()

        # Set the current plugin config
        self.current_config = {
            'settings': {},
            'settings_hash': ''
        }
        self.configuration_changed()

        # Set the current time for scheduler
        self.last_schedule_run = datetime.today().strftime('%H:%M')

        self.links = installation_link.Links()
        self.link_heartbeat_last_run = 0
        self.available_remote_managers = {}

    def _log(self, message, message2=None, level="info"):
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def stop(self):
        self.abort_flag.set()
        # Stop all workers.
        # To avoid having the dictionary change size during iteration,
        # first capture the thread keys, then iterate over that list.
        thread_keys = [t for t in self.worker_threads]
        for thread in thread_keys:
            self.mark_worker_thread_as_redundant(thread)
        # Stop all remote link manager threads
        thread_keys = [t for t in self.remote_task_manager_threads]
        for thread in thread_keys:
            self.mark_remote_task_manager_thread_as_redundant(thread)

    def get_total_worker_count(self):
        """Returns the worker count as an integer"""
        worker_count = 0
        for worker_group in WorkerGroup.get_all_worker_groups():
            worker_count += worker_group.get('number_of_workers', 0)
        return int(worker_count)

    def save_current_config(self, settings=None, settings_hash=None):
        if settings:
            self.current_config['settings'] = settings
        if settings_hash:
            self.current_config['settings_hash'] = settings_hash
        self._log('Updated config. If this is modified, all workers will be paused', level='debug')

    def get_current_library_configuration(self):
        # Fetch all libraries
        all_plugin_settings = {}
        for library in Library.get_all_libraries():
            try:
                library_config = Library(library.get('id'))
            except Exception as e:
                self._log("Unable to fetch library config for ID {}".format(library.get('id')), str(e),
                          level='exception')
                continue
            # Get list of enabled plugins with their settings
            enabled_plugins = []
            for enabled_plugin in library_config.get_enabled_plugins(include_settings=True):
                enabled_plugins.append({
                    'plugin_id': enabled_plugin.get('plugin_id'),
                    'settings': enabled_plugin.get('settings'),
                })

            # Get the plugin flow
            plugin_flow = library_config.get_plugin_flow()

            # Append this library's plugin config and flow to the dictionary
            all_plugin_settings[library.get('id')] = {
                'enabled_plugins': enabled_plugins,
                'plugin_flow': plugin_flow,
            }
        return all_plugin_settings

    def configuration_changed(self):
        current_settings = self.get_current_library_configuration()
        # Compare current settings with foreman recorded settings.
        json_encoded_settings = json.dumps(current_settings, sort_keys=True).encode()
        current_settings_hash = hashlib.md5(json_encoded_settings).hexdigest()
        if current_settings_hash == self.current_config.get('settings_hash', ''):
            return False
        # Record current settings
        self.save_current_config(settings=current_settings, settings_hash=current_settings_hash)
        # Settings have changed
        return True

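    # Illustrative sketch of the change detection above (not executed here; the
    # config dict is hypothetical). Hashing a key-sorted JSON encoding gives a
    # stable fingerprint, so only a real settings change alters it:
    #
    #   import hashlib, json
    #   cfg = {'1': {'enabled_plugins': [], 'plugin_flow': []}}
    #   encoded = json.dumps(cfg, sort_keys=True).encode()
    #   fingerprint = hashlib.md5(encoded).hexdigest()
    #
    # sort_keys=True keeps the hash independent of dict insertion order.
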
    def validate_worker_config(self):
        valid = True
        frontend_messages = self.data_queues.get('frontend_messages')

        # Ensure that the enabled plugins are compatible with the PluginsHandler version
        plugin_handler = PluginsHandler()
        if plugin_handler.get_incompatible_enabled_plugins(frontend_messages):
            valid = False
        if not self.links.within_enabled_link_limits(frontend_messages):
            valid = False

        # Check if the plugin configuration has been modified. If it has, stop the workers.
        # What we want to avoid here is someone partially modifying the plugin configuration
        # and having the workers pick up a job mid-configuration.
        if self.configuration_changed():
            # Generate a frontend message and fail validation
            frontend_messages.put(
                {
                    'id': 'pluginSettingsChangeWorkersStopped',
                    'type': 'warning',
                    'code': 'pluginSettingsChangeWorkersStopped',
                    'message': '',
                    'timeout': 0
                }
            )
            valid = False

        # Ensure library config is within limits
        if not Library.within_library_count_limits(frontend_messages):
            valid = False

        return valid

    def run_task(self, time_now, task, worker_count, worker_group):
        worker_group_id = worker_group.get_id()
        self.last_schedule_run = time_now
        if task == 'pause':
            # Pause all workers now
            self._log("Running scheduled event - Pause all worker threads", level='debug')
            self.pause_all_worker_threads(worker_group_id=worker_group_id)
        elif task == 'resume':
            # Resume all workers now
            self._log("Running scheduled event - Resume all worker threads", level='debug')
            self.resume_all_worker_threads(worker_group_id=worker_group_id)
        elif task == 'count':
            # Set the worker count value.
            # Save the settings so this scheduled event will persist across an application restart.
            self._log("Running scheduled event - Setting worker count to '{}'".format(worker_count), level='debug')
            worker_group.set_number_of_workers(worker_count)
            worker_group.save()

    def manage_event_schedules(self):
        """
        Manage all scheduled worker events.
        This function limits itself to run only once every 60 seconds.

        :return:
        """
        days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
        day_of_week = datetime.today().weekday()
        time_now = datetime.today().strftime('%H:%M')

        # Only run once a minute
        if time_now == self.last_schedule_run:
            return

        for wg in WorkerGroup.get_all_worker_groups():
            try:
                worker_group = WorkerGroup(group_id=wg.get('id'))
                event_schedules = worker_group.get_worker_event_schedules()
            except Exception as e:
                self._log("While iterating through the worker groups, the worker group disappeared", str(e),
                          level='debug')
                continue

            for event_schedule in event_schedules:
                schedule_time = event_schedule.get('schedule_time')
                # Ensure we have a schedule time
                if not schedule_time:
                    continue
                # Ensure the schedule time is now
                if time_now != schedule_time:
                    continue

                repetition = event_schedule.get('repetition')
                # Ensure we have a repetition
                if not repetition:
                    continue

                # Check if it should run
                if repetition == 'daily':
                    self.run_task(time_now, event_schedule.get('schedule_task'),
                                  event_schedule.get('schedule_worker_count'), worker_group)
                elif repetition == 'weekday' and days[day_of_week] not in ['saturday', 'sunday']:
                    self.run_task(time_now, event_schedule.get('schedule_task'),
                                  event_schedule.get('schedule_worker_count'), worker_group)
                elif repetition == 'weekend' and days[day_of_week] in ['saturday', 'sunday']:
                    self.run_task(time_now, event_schedule.get('schedule_task'),
                                  event_schedule.get('schedule_worker_count'), worker_group)
                elif repetition == days[day_of_week]:
                    self.run_task(time_now, event_schedule.get('schedule_task'),
                                  event_schedule.get('schedule_worker_count'), worker_group)

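    # For reference, manage_event_schedules() only reads a handful of keys from
    # each schedule entry. A hypothetical entry that would resume all workers in
    # the group at 08:00 on weekdays would look like:
    #
    #   {
    #       'schedule_time': '08:00',        # matched against strftime('%H:%M')
    #       'repetition': 'weekday',         # 'daily', 'weekday', 'weekend' or a day name
    #       'schedule_task': 'resume',       # 'pause', 'resume' or 'count'
    #       'schedule_worker_count': 0,      # only used when schedule_task == 'count'
    #   }
    #
    # Because times are compared as 'HH:MM' strings and last_schedule_run is
    # updated whenever the scheduler fires, the scheduler body executes at most
    # once per minute.
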
    def init_worker_threads(self):
        # Remove any redundant idle workers from our list.
        # To avoid having the dictionary change size during iteration,
        # first capture the thread keys, then iterate over that list.
        thread_keys = [t for t in self.worker_threads]
        for thread in thread_keys:
            if thread in self.worker_threads:
                if not self.worker_threads[thread].is_alive():
                    del self.worker_threads[thread]

        # Check that we have enough workers running. Spawn new ones as required.
        worker_group_ids = []
        worker_group_names = []
        for worker_group in WorkerGroup.get_all_worker_groups():
            worker_group_ids.append(worker_group.get('id'))

            # Create threads as required
            for i in range(worker_group.get('number_of_workers')):
                worker_id = "{}-{}".format(worker_group.get('name'), i)
                worker_name = "{}-Worker-{}".format(worker_group.get('name'), (i + 1))
                # Add this name to a list. If the name changes, we can remove old incorrectly named workers
                worker_group_names.append(worker_name)
                if worker_id not in self.worker_threads:
                    # This worker does not yet exist, create it
                    self.start_worker_thread(worker_id, worker_name, worker_group.get('id'))

            # Remove any workers that do not belong. The max number of supported workers is 12
            for i in range(worker_group.get('number_of_workers'), 12):
                worker_id = "{}-{}".format(worker_group.get('name'), i)
                if worker_id in self.worker_threads:
                    # Only remove threads that are idle (never terminate a task just to reduce worker count)
                    if self.worker_threads[worker_id].idle:
                        self.mark_worker_thread_as_redundant(worker_id)

        # Remove workers for groups that no longer exist
        for thread in self.worker_threads:
            worker_group_id = self.worker_threads[thread].worker_group_id
            worker_name = self.worker_threads[thread].name
            if worker_group_id not in worker_group_ids or worker_name not in worker_group_names:
                # Only remove threads that are idle (never terminate a task just to reduce worker count)
                if self.worker_threads[thread].idle:
                    self.mark_worker_thread_as_redundant(thread)

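    # Worker thread bookkeeping sketch (the group name is hypothetical): a group
    # named 'Transcoders' with number_of_workers=2 yields dictionary keys and
    # thread names of
    #
    #   worker_id:   'Transcoders-0',        'Transcoders-1'
    #   worker_name: 'Transcoders-Worker-1', 'Transcoders-Worker-2'
    #
    # so renaming a group or shrinking its worker count leaves stale IDs behind,
    # which the pruning passes above mark as redundant once they fall idle.
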
    def fetch_available_remote_installation(self, library_name=None):
        # Fetch the first matching remote worker from the list
        assigned_installation_id = None
        assigned_installation_info = {}
        installation_ids = [t for t in self.available_remote_managers]
        for installation_id in installation_ids:
            if installation_id not in self.remote_task_manager_threads:
                # Check that a remote worker is on an installation with a matching library name
                installation_library_names = self.available_remote_managers[installation_id].get('library_names', [])
                if library_name is not None and library_name not in installation_library_names:
                    continue
                assigned_installation_info = self.available_remote_managers[installation_id]
                assigned_installation_id = installation_id
                break
        return assigned_installation_id, assigned_installation_info

    def init_remote_task_manager_thread(self, library_name=None):
        # Fetch the installation ID and info
        installation_id, installation_info = self.fetch_available_remote_installation(library_name=library_name)

        # Ensure a worker was assigned before removing it from the available list
        if not installation_info:
            return False
        del self.available_remote_managers[installation_id]

        # Startup a thread
        thread = installation_link.RemoteTaskManager(installation_id,
                                                     "RemoteTaskManager-{}".format(installation_id),
                                                     installation_info,
                                                     self.remote_workers_pending_task_queue,
                                                     self.complete_queue,
                                                     self.event)
        thread.daemon = True
        thread.start()
        self.remote_task_manager_threads[installation_id] = thread
        return True

    def remove_stale_available_remote_managers(self):
        """
        Loop over the current list of available remote managers and remove any that were marked
        available more than 30 seconds ago.
        This ensures that the data in these manager info entries stays up-to-date if the remote
        installation config changes.

        :return:
        """
        installation_ids = [t for t in self.available_remote_managers]
        for installation_id in installation_ids:
            if installation_id not in self.remote_task_manager_threads:
                # Remove this entry if it was marked available more than 30 seconds ago
                installation_info = self.available_remote_managers[installation_id]
                if installation_info.get('created') < datetime.now() - timedelta(seconds=30):
                    del self.available_remote_managers[installation_id]

    def remove_stopped_remote_task_manager_threads(self):
        """
        Remove any redundant link managers from our list.
        This frees up their worker IDs in the remote_task_manager_threads list for another
        link manager thread.

        :return:
        """
        thread_keys = [t for t in self.remote_task_manager_threads]
        for thread in thread_keys:
            if thread in self.remote_task_manager_threads:
                if not self.remote_task_manager_threads[thread].is_alive():
                    self._log("Removing thread '{}'".format(thread), level='debug')
                    del self.remote_task_manager_threads[thread]

    def terminate_unlinked_remote_task_manager_threads(self):
        """
        Mark a manager as redundant if the remote installation configuration has been removed

        :return:
        """
        # Get a list of configured UUIDs
        configured_uuids = {}
        for configured_remote_installation in self.settings.get_remote_installations():
            if configured_remote_installation.get('uuid'):
                configured_uuids[configured_remote_installation.get('uuid')] = configured_remote_installation.get('address')
        # Find and mark any redundant link managers in our list
        term_log_msg = "Remote installation link with {} '{}' has been removed from settings. Marking thread for termination"
        for thread in self.remote_task_manager_threads:
            thread_info = self.remote_task_manager_threads[thread].get_info()
            thread_assigned_uuid = thread_info.get('installation_info', {}).get('uuid')
            thread_assigned_address = thread_info.get('installation_info', {}).get('address')
            # Ensure the UUID is still in our config
            if thread_assigned_uuid not in configured_uuids:
                self.mark_remote_task_manager_thread_as_redundant(thread)
                self._log(term_log_msg.format('UUID', thread_assigned_uuid))
                continue
            # Ensure the configured address has not changed
            configured_address = configured_uuids.get(thread_assigned_uuid)
            if thread_assigned_address not in configured_address:
                self.mark_remote_task_manager_thread_as_redundant(thread)
                self._log(term_log_msg.format('address', thread_assigned_address))
                continue

    def update_remote_worker_availability_status(self):
        """
        Updates the list of available remote managers that can be started

        :return:
        """
        available_installations = self.links.check_remote_installation_for_available_workers()
        for installation_uuid in available_installations:
            remote_address = available_installations[installation_uuid].get('address', '')
            remote_auth = available_installations[installation_uuid].get('auth', 'None')
            remote_username = available_installations[installation_uuid].get('username', '')
            remote_password = available_installations[installation_uuid].get('password', '')
            remote_library_names = available_installations[installation_uuid].get('library_names', [])
            available_slots = available_installations[installation_uuid].get('available_slots', 0)
            for slot_number in range(available_slots):
                remote_manager_id = "{}|M{}".format(installation_uuid, slot_number)
                if remote_manager_id in self.available_remote_managers or remote_manager_id in self.remote_task_manager_threads:
                    # This worker is already managed by a link manager thread or is already in the list of available workers
                    continue
                # Add this remote worker ID to the list of available remote managers
                self.available_remote_managers[remote_manager_id] = {
                    'uuid': installation_uuid,
                    'address': remote_address,
                    'auth': remote_auth,
                    'username': remote_username,
                    'password': remote_password,
                    'library_names': remote_library_names,
                    'created': datetime.now(),
                }

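    # Availability bookkeeping sketch (the UUID value is hypothetical): an
    # installation reporting available_slots=2 is recorded under one ID per
    # slot, e.g.
    #
    #   'b1946ac9...|M0' and 'b1946ac9...|M1'
    #
    # Each ID maps to the connection details plus a 'created' timestamp, which
    # remove_stale_available_remote_managers() uses to expire entries after
    # 30 seconds.
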
    def start_worker_thread(self, worker_id, worker_name, worker_group):
        thread = Worker(worker_id, worker_name, worker_group, self.workers_pending_task_queue,
                        self.complete_queue, self.event)
        thread.daemon = True
        thread.start()
        self.worker_threads[worker_id] = thread

    def fetch_available_worker_ids(self):
        thread_ids = []
        for thread in self.worker_threads:
            if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
                if not self.worker_threads[thread].paused:
                    thread_ids.append(self.worker_threads[thread].thread_id)
        return thread_ids

    def check_for_idle_workers(self):
        for thread in self.worker_threads:
            if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
                if not self.worker_threads[thread].paused:
                    return True
        return False

    def check_for_idle_remote_workers(self):
        if self.available_remote_managers:
            return True
        return False

    def get_available_remote_library_names(self):
        library_names = []
        for installation_id in self.available_remote_managers:
            for library_name in self.available_remote_managers[installation_id].get('library_names', []):
                if library_name not in library_names:
                    library_names.append(library_name)
        return library_names

    def get_tags_configured_for_worker(self, worker_id):
        """Fetch the tags for a given worker ID"""
        assigned_worker_group_id = self.worker_threads[worker_id].worker_group_id
        worker_group = WorkerGroup(group_id=assigned_worker_group_id)
        return worker_group.get_tags()

    def postprocessor_queue_full(self):
        """
        Check if the post-processor queue is greater than the number of workers enabled.
        Returns True if it is, else False.

        :return:
        """
        frontend_messages = self.data_queues.get('frontend_messages')
        # Use the configured worker count + 1 as the post-processor queue limit
        limit = (int(self.get_total_worker_count()) + 1)
        # Include a count of all available and busy remote workers in the post-processor queue limit
        limit += len(self.available_remote_managers)
        limit += len(self.remote_task_manager_threads)
        current_count = len(self.task_queue.list_processed_tasks())
        if current_count > limit:
            msg = "There are currently {} items in the post-processor queue. Halting feeding workers until it drops below {}."
            self._log(msg.format(current_count, limit), level='warning')
            frontend_messages.update(
                {
                    'id': 'pendingTaskHaltedPostProcessorQueueFull',
                    'type': 'status',
                    'code': 'pendingTaskHaltedPostProcessorQueueFull',
                    'message': '',
                    'timeout': 0
                }
            )
            return True

        # Remove the status notification
        frontend_messages.remove_item('pendingTaskHaltedPostProcessorQueueFull')
        return False

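    # Worked example of the limit above (all counts hypothetical): with 4 local
    # workers configured, 2 idle remote managers advertised and 1 busy remote
    # task manager thread,
    #
    #   limit = (4 + 1) + 2 + 1 = 8
    #
    # so feeding workers halts only once more than 8 processed tasks are
    # waiting on the post-processor.
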
    def pause_worker_thread(self, worker_id):
        """
        Pauses a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        if worker_id not in self.worker_threads:
            self._log("Asked to pause Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        if not self.worker_threads[worker_id].paused_flag.is_set():
            self._log("Asked to pause Worker ID '{}'".format(worker_id), level='debug')
            self.worker_threads[worker_id].paused_flag.set()
        return True

    def pause_all_worker_threads(self, worker_group_id=None):
        """Pause all threads"""
        result = True
        for thread in self.worker_threads:
            # Limit by worker group if requested
            if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
                continue
            if not self.pause_worker_thread(thread):
                result = False
        return result

    def resume_worker_thread(self, worker_id):
        """
        Resume a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        self._log("Asked to resume Worker ID '{}'".format(worker_id), level='debug')
        if worker_id not in self.worker_threads:
            self._log("Asked to resume Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        self.worker_threads[worker_id].paused_flag.clear()
        return True

    def resume_all_worker_threads(self, worker_group_id=None):
        """Resume all threads"""
        result = True
        for thread in self.worker_threads:
            # Limit by worker group if requested
            if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
                continue
            if not self.resume_worker_thread(thread):
                result = False
        return result

    def terminate_worker_thread(self, worker_id):
        """
        Terminate a single worker thread

        :param worker_id:
        :type worker_id:
        :return:
        :rtype:
        """
        self._log("Asked to terminate Worker ID '{}'".format(worker_id), level='debug')
        if worker_id not in self.worker_threads:
            self._log("Asked to terminate Worker ID '{}', but this was not found.".format(worker_id), level='warning')
            return False

        self.mark_worker_thread_as_redundant(worker_id)
        return True

    def terminate_all_worker_threads(self):
        """Terminate all threads"""
        result = True
        for thread in self.worker_threads:
            if not self.terminate_worker_thread(thread):
                result = False
        return result

    def mark_worker_thread_as_redundant(self, worker_id):
        self.worker_threads[worker_id].redundant_flag.set()

    def mark_remote_task_manager_thread_as_redundant(self, link_manager_id):
        self.remote_task_manager_threads[link_manager_id].redundant_flag.set()

    def hand_task_to_workers(self, item, local=True, library_name=None, worker_id=None):
        if local:
            # Assign the task to the worker ID provided
            if worker_id in self.worker_threads and self.worker_threads[worker_id].is_alive():
                self.worker_threads[worker_id].set_task(item)
            # If the worker thread specified was not available to collect this task, it will be fetched again in the next loop
        else:
            # Place into queue for a remote link manager thread to collect
            self.remote_workers_pending_task_queue.put(item)
            # Spawn link manager thread to pick up the task
            if not self.init_remote_task_manager_thread(library_name=library_name):
                # Remove item from queue
                self.remote_workers_pending_task_queue.get_nowait()
                # Return failure. This will cause the item to be re-queued at the bottom of the list
                return False
        return True

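    # Handoff sketch for the remote path above: remote_workers_pending_task_queue
    # has maxsize=1, so it acts as a single-item mailbox between the foreman and
    # whichever RemoteTaskManager thread spawns to service it:
    #
    #   self.remote_workers_pending_task_queue.put(item)         # offer the task
    #   if not self.init_remote_task_manager_thread(...):        # no manager available
    #       self.remote_workers_pending_task_queue.get_nowait()  # roll the offer back
    #
    # The run() loop checks .full() before queueing another task, so a second
    # task is never offered until the first has been collected.
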
    def link_manager_thread_heartbeat(self):
        """
        Run a list of tasks to test the status of our link manager threads.
        Unlike worker threads, link manager threads live and die for a single task.
        If a link manager thread is alive for more than 10 seconds without picking up a task, it will die.
        This function will reap all dead or completed threads and clean up issues where a thread may have died
        before running a task that was added to the pending task queue (in which case a new thread should be started).

        :return:
        """
        # Only run heartbeat every 10 seconds
        time_now = time.time()
        if self.link_heartbeat_last_run > (time_now - 10):
            return
        # self._log("Running remote link manager heartbeat", level='debug')
        # Terminate remote manager threads for unlinked installations
        self.terminate_unlinked_remote_task_manager_threads()
        # Clear out dead threads
        self.remove_stopped_remote_task_manager_threads()
        # Clear out old available workers (these should last only a minute before being refreshed)
        self.remove_stale_available_remote_managers()
        # Check for updates to the worker availability status of linked remote installations
        self.update_remote_worker_availability_status()
        # Mark this as the last time run
        self.link_heartbeat_last_run = time_now

    def run(self):
        self._log("Starting Foreman Monitor loop")

        # Flag to force checking for idle remote workers when set to False.
        # This prevents always looping on idle local workers when the local workers'
        # tags prevent them from taking up tasks
        allow_local_idle_worker_check = True

        while not self.abort_flag.is_set():
            self.event.wait(2)

            try:
                # Fetch all completed tasks from workers
                while not self.abort_flag.is_set() and not self.complete_queue.empty():
                    self.event.wait(.5)
                    try:
                        task_item = self.complete_queue.get_nowait()
                        task_item.set_status('processed')
                    except queue.Empty:
                        continue
                    except Exception as e:
                        self._log("Exception when fetching completed task report from worker", message2=str(e),
                                  level="exception")

                # Setup the correct number of workers
                if not self.abort_flag.is_set():
                    self.init_worker_threads()

                # If the worker config is not valid, then pause all workers until it is
                if not self.validate_worker_config():
                    # Pause all workers
                    self.pause_all_worker_threads()
                    continue

                # Manage worker event schedules
                self.manage_event_schedules()

                if not self.abort_flag.is_set() and not self.task_queue.task_list_pending_is_empty():

                    # Check the status of all link manager threads (close dead ones)
                    self.link_manager_thread_heartbeat()

                    # Check if we are able to start up a worker for another encoding job.
                    # These queues hold only one task at a time and are used to hand tasks to the workers
                    if self.workers_pending_task_queue.full() or self.remote_workers_pending_task_queue.full():
                        # In order to simplify the process and run the foreman management in a single thread, if either of
                        # these are full, it means the thread that is assigned to pick up the item has not done so.
                        # In order to prevent a second thread starting and taking the first thread's task, we should not
                        # process any more pending tasks until that first thread is ready and has taken its task out of the
                        # queue.
                        continue

                    # Check if there are any free workers
                    worker_ids = []
                    if allow_local_idle_worker_check and self.check_for_idle_workers():
                        # Local workers are available
                        process_local = True
                        # For local workers, process either local tasks or tasks provided from a remote installation
                        get_local_pending_tasks_only = False
                        # Specify the worker IDs that can handle the next task
                        worker_ids = self.fetch_available_worker_ids()
                        # If no workers were available (possibly due to being recycled), just continue the loop
                        if not worker_ids:
                            continue
                    elif self.check_for_idle_remote_workers():
                        allow_local_idle_worker_check = True
                        # Remote workers are available
                        process_local = False
                        # For remote workers, only process local tasks. Don't hand remote tasks to another remote installation
                        get_local_pending_tasks_only = True
                    else:
                        allow_local_idle_worker_check = True
                        # All workers are currently busy
                        self.event.wait(1)
                        continue

                    # Check if the post-processor task queue is full
                    if self.postprocessor_queue_full():
                        self.event.wait(5)
                        continue

                    # Fetch the next item in the queue
                    available_worker_id = None
                    next_item_to_process = None
                    if process_local:
                        # For local processing, ensure tags match the available library and worker
                        for worker_id in worker_ids:
                            try:
                                library_tags = self.get_tags_configured_for_worker(worker_id)
                            except Exception as e:
                                # This will happen if the worker group is deleted
                                self._log("Error while fetching the tags for the configured worker", str(e),
                                          level='debug')
                                # Break this for loop. The main while loop will clean up these workers on the next pass
                                break
                            next_item_to_process = self.task_queue.get_next_pending_tasks(
                                local_only=get_local_pending_tasks_only,
                                library_tags=library_tags)
                            if next_item_to_process:
                                available_worker_id = worker_id
                                break
                        # If no local worker ID was assigned to the given item, then try again in 2 seconds
                        if not available_worker_id:
                            allow_local_idle_worker_check = False
                            self.event.wait(1)
                            continue
                    else:
                        # For remote items, run a search matching an available remote installation library
                        remote_library_names = self.get_available_remote_library_names()
                        next_item_to_process = self.task_queue.get_next_pending_tasks(
                            local_only=get_local_pending_tasks_only,
                            library_names=remote_library_names)

                    if next_item_to_process:
                        try:
                            source_abspath = next_item_to_process.get_source_abspath()
                            task_library_name = next_item_to_process.get_task_library_name()
                        except Exception as e:
                            self._log("Exception in fetching task details", message2=str(e), level="exception")
                            self.event.wait(3)
                            continue

                        self._log("Processing item - {}".format(source_abspath))
                        success = self.hand_task_to_workers(next_item_to_process, local=process_local,
                                                            library_name=task_library_name,
                                                            worker_id=available_worker_id)
                        if not success:
                            self._log("Re-queueing task. Unable to find a worker capable of processing task '{}'".format(
                                next_item_to_process.get_source_abspath()), level="warning")
                            # Re-queue item at the bottom
                            self.task_queue.requeue_tasks_at_bottom(next_item_to_process.get_task_id())
            except Exception as e:
                raise Exception(e)

        self._log("Leaving Foreman Monitor loop...")

    def get_all_worker_status(self):
        all_status = []
        for thread in self.worker_threads:
            all_status.append(self.worker_threads[thread].get_status())
        return all_status

    def get_worker_status(self, worker_id):
        result = {}
        for thread in self.worker_threads:
            # Worker IDs are strings (e.g. '<group name>-0'), so compare as strings
            if str(worker_id) == str(thread):
                result = self.worker_threads[thread].get_status()
        return result