#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
unmanic.foreman.py
Written by: Josh.5 <jsunnex@gmail.com>
Date: 02 Jan 2019, (7:21 AM)
Copyright:
Copyright (C) Josh Sunnex - All Rights Reserved
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
OR OTHER DEALINGS IN THE SOFTWARE.
"""
import hashlib
import json
import threading
import queue
import time
from datetime import datetime, timedelta
from unmanic.libs import common, installation_link
from unmanic.libs.library import Library
from unmanic.libs.plugins import PluginsHandler
from unmanic.libs.worker_group import WorkerGroup
from unmanic.libs.workers import Worker
class Foreman(threading.Thread):
def __init__(self, data_queues, settings, task_queue, event):
super(Foreman, self).__init__(name='Foreman')
self.settings = settings
self.event = event
self.task_queue = task_queue
self.data_queues = data_queues
self.logger = data_queues["logging"].get_logger(self.name)
self.workers_pending_task_queue = queue.Queue(maxsize=1)
self.remote_workers_pending_task_queue = queue.Queue(maxsize=1)
self.complete_queue = queue.Queue()
self.worker_threads = {}
self.remote_task_manager_threads = {}
self.abort_flag = threading.Event()
self.abort_flag.clear()
# Set the current plugin config
self.current_config = {
'settings': {},
'settings_hash': ''
}
self.configuration_changed()
# Set the current time for scheduler
self.last_schedule_run = datetime.today().strftime('%H:%M')
self.links = installation_link.Links()
self.link_heartbeat_last_run = 0
self.available_remote_managers = {}
def _log(self, message, message2=None, level="info"):
message = common.format_message(message, message2)
getattr(self.logger, level)(message)
def stop(self):
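        """
        Flag the Foreman and all threads it manages for shutdown.
        Worker and remote task manager threads are marked redundant rather than
        killed outright, so they can terminate cleanly on their next loop.
        :return:
        """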
self.abort_flag.set()
# Stop all workers
# To avoid having the dictionary change size during iteration,
# we need to first get the thread_keys, then iterate through that
thread_keys = [t for t in self.worker_threads]
for thread in thread_keys:
self.mark_worker_thread_as_redundant(thread)
# Stop all remote link manager threads
thread_keys = [t for t in self.remote_task_manager_threads]
for thread in thread_keys:
self.mark_remote_task_manager_thread_as_redundant(thread)
def get_total_worker_count(self):
"""Returns the worker count as an integer"""
worker_count = 0
for worker_group in WorkerGroup.get_all_worker_groups():
worker_count += worker_group.get('number_of_workers', 0)
return int(worker_count)
def save_current_config(self, settings=None, settings_hash=None):
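        """Record the given library settings and/or settings hash as the Foreman's current config."""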
if settings:
self.current_config['settings'] = settings
if settings_hash:
self.current_config['settings_hash'] = settings_hash
self._log('Updated config. If this is modified, all workers will be paused', level='debug')
def get_current_library_configuration(self):
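        """
        Build a dictionary of the current plugin configuration of every library, keyed
        by library ID. Each entry contains the library's enabled plugins (with their
        settings) and its plugin flow.
        :return:
        """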
# Fetch all libraries
all_plugin_settings = {}
for library in Library.get_all_libraries():
try:
library_config = Library(library.get('id'))
except Exception as e:
self._log("Unable to fetch library config for ID {}".format(library.get('id')), level='exception')
continue
# Get list of enabled plugins with their settings
enabled_plugins = []
for enabled_plugin in library_config.get_enabled_plugins(include_settings=True):
enabled_plugins.append({
'plugin_id': enabled_plugin.get('plugin_id'),
'settings': enabled_plugin.get('settings'),
})
# Get the plugin flow
plugin_flow = library_config.get_plugin_flow()
            # Append this library's plugin config and flow to the dictionary
all_plugin_settings[library.get('id')] = {
'enabled_plugins': enabled_plugins,
'plugin_flow': plugin_flow,
}
return all_plugin_settings
def configuration_changed(self):
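        """
        Check if the library plugin configuration has changed since it was last recorded.
        The full library configuration is serialised to JSON with sorted keys (so the output
        is deterministic regardless of dictionary ordering) and hashed with MD5. The digest
        is then compared against the last recorded digest, e.g.:
            json.dumps({'b': 1, 'a': 2}, sort_keys=True)  # -> '{"a": 2, "b": 1}'
        :return: True if the settings have changed and were re-recorded, else False
        """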
current_settings = self.get_current_library_configuration()
# Compare current settings with foreman recorded settings.
json_encoded_settings = json.dumps(current_settings, sort_keys=True).encode()
current_settings_hash = hashlib.md5(json_encoded_settings).hexdigest()
if current_settings_hash == self.current_config.get('settings_hash', ''):
return False
# Record current settings
self.save_current_config(settings=current_settings, settings_hash=current_settings_hash)
# Settings have changed
return True
def validate_worker_config(self):
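        """
        Validate that workers are currently able to safely process tasks.
        Returns False (and queues a frontend message where appropriate) if any enabled
        plugin is incompatible, the link or library limits are exceeded, or the plugin
        configuration has changed since it was last recorded.
        :return:
        """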
valid = True
frontend_messages = self.data_queues.get('frontend_messages')
# Ensure that the enabled plugins are compatible with the PluginHandler version
plugin_handler = PluginsHandler()
if plugin_handler.get_incompatible_enabled_plugins(frontend_messages):
valid = False
if not self.links.within_enabled_link_limits(frontend_messages):
valid = False
# Check if plugin configuration has been modified. If it has, stop the workers.
# What we want to avoid here is someone partially modifying the plugin configuration
# and having the workers pickup a job mid configuration.
if self.configuration_changed():
# Generate a frontend message and falsify validation
frontend_messages.put(
{
'id': 'pluginSettingsChangeWorkersStopped',
'type': 'warning',
'code': 'pluginSettingsChangeWorkersStopped',
'message': '',
'timeout': 0
}
)
valid = False
# Ensure library config is within limits
if not Library.within_library_count_limits(frontend_messages):
valid = False
return valid
def run_task(self, time_now, task, worker_count, worker_group):
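        """Execute a scheduled worker group event: 'pause', 'resume' or 'count' (set the worker count)."""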
worker_group_id = worker_group.get_id()
self.last_schedule_run = time_now
if task == 'pause':
# Pause all workers now
self._log("Running scheduled event - Pause all worker threads", level='debug')
self.pause_all_worker_threads(worker_group_id=worker_group_id)
elif task == 'resume':
# Resume all workers now
self._log("Running scheduled event - Resume all worker threads", level='debug')
self.resume_all_worker_threads(worker_group_id=worker_group_id)
elif task == 'count':
# Set the worker count value
# Save the settings so this scheduled event will persist an application restart
self._log("Running scheduled event - Setting worker count to '{}'".format(worker_count), level='debug')
worker_group.set_number_of_workers(worker_count)
worker_group.save()
def manage_event_schedules(self):
"""
Manage all scheduled worker events
        This function limits itself to running once per minute (it compares the current HH:MM against the last run)
:return:
"""
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
        day_of_week = datetime.today().weekday()
time_now = datetime.today().strftime('%H:%M')
# Only run once a minute
if time_now == self.last_schedule_run:
return
for wg in WorkerGroup.get_all_worker_groups():
try:
worker_group = WorkerGroup(group_id=wg.get('id'))
event_schedules = worker_group.get_worker_event_schedules()
except Exception as e:
self._log("While iterating through the worker groups, the worker group disappeared", str(e), level='debug')
continue
for event_schedule in event_schedules:
schedule_time = event_schedule.get('schedule_time')
# Ensure we have a schedule time
if not schedule_time:
continue
# Ensure the schedule time is now
if time_now != schedule_time:
continue
repetition = event_schedule.get('repetition')
# Ensure we have a repetition
if not repetition:
continue
# Check if it should run
if repetition == 'daily':
self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
worker_group)
elif repetition == 'weekday' and days[day_of_week] not in ['saturday', 'sunday']:
self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
worker_group)
elif repetition == 'weekend' and days[day_of_week] in ['saturday', 'sunday']:
self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
worker_group)
elif repetition == days[day_of_week]:
self.run_task(time_now, event_schedule.get('schedule_task'), event_schedule.get('schedule_worker_count'),
worker_group)
def init_worker_threads(self):
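        """
        Reconcile the pool of worker threads against the configured worker groups.
        Reaps dead threads, spawns any missing workers, and marks idle workers as
        redundant when a group's worker count shrinks or the group is removed.
        :return:
        """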
# Remove any redundant idle workers from our list
# To avoid having the dictionary change size during iteration,
# we need to first get the thread_keys, then iterate through that
thread_keys = [t for t in self.worker_threads]
for thread in thread_keys:
if thread in self.worker_threads:
if not self.worker_threads[thread].is_alive():
del self.worker_threads[thread]
# Check that we have enough workers running. Spawn new ones as required.
worker_group_ids = []
worker_group_names = []
for worker_group in WorkerGroup.get_all_worker_groups():
worker_group_ids.append(worker_group.get('id'))
# Create threads as required
for i in range(worker_group.get('number_of_workers')):
worker_id = "{}-{}".format(worker_group.get('name'), i)
worker_name = "{}-Worker-{}".format(worker_group.get('name'), (i + 1))
# Add this name to a list. If the name changes, we can remove old incorrectly named workers
worker_group_names.append(worker_name)
if worker_id not in self.worker_threads:
# This worker does not yet exist, create it
self.start_worker_thread(worker_id, worker_name, worker_group.get('id'))
# Remove any workers that do not belong. The max number of supported workers is 12
for i in range(worker_group.get('number_of_workers'), 12):
worker_id = "{}-{}".format(worker_group.get('name'), i)
if worker_id in self.worker_threads:
# Only remove threads that are idle (never terminate a task just to reduce worker count)
if self.worker_threads[worker_id].idle:
self.mark_worker_thread_as_redundant(worker_id)
# Remove workers for groups that no longer exist
for thread in self.worker_threads:
worker_group_id = self.worker_threads[thread].worker_group_id
worker_name = self.worker_threads[thread].name
if worker_group_id not in worker_group_ids or worker_name not in worker_group_names:
# Only remove threads that are idle (never terminate a task just to reduce worker count)
if self.worker_threads[thread].idle:
self.mark_worker_thread_as_redundant(thread)
def fetch_available_remote_installation(self, library_name=None):
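        """
        Fetch the ID and info of the first available remote manager that does not already
        have a task manager thread and, when a library_name is given, belongs to an
        installation serving a library of that name.
        :return: A tuple of (installation_id, installation_info); (None, {}) if none are available
        """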
# Fetch the first matching remote worker from the list
assigned_installation_id = None
assigned_installation_info = {}
installation_ids = [t for t in self.available_remote_managers]
for installation_id in installation_ids:
if installation_id not in self.remote_task_manager_threads:
# Check that a remote worker is on an installation with a matching library name
installation_library_names = self.available_remote_managers[installation_id].get('library_names', [])
if library_name is not None and library_name not in installation_library_names:
continue
assigned_installation_info = self.available_remote_managers[installation_id]
assigned_installation_id = installation_id
break
return assigned_installation_id, assigned_installation_info
def init_remote_task_manager_thread(self, library_name=None):
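        """
        Claim an available remote installation and start a daemon RemoteTaskManager
        thread for it.
        :return: True if a thread was started, else False
        """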
        # Fetch the installation ID and info
        installation_id, installation_info = self.fetch_available_remote_installation(library_name=library_name)
        # Ensure a worker was assigned before removing it from the available list
        if not installation_info:
            return False
        del self.available_remote_managers[installation_id]
# Startup a thread
thread = installation_link.RemoteTaskManager(installation_id,
"RemoteTaskManager-{}".format(installation_id),
installation_info,
self.remote_workers_pending_task_queue,
self.complete_queue,
self.event)
thread.daemon = True
thread.start()
self.remote_task_manager_threads[installation_id] = thread
return True
def remove_stale_available_remote_managers(self):
"""
        Loop over the current list of available remote managers and remove any that were marked available more
        than 30 seconds ago.
        This ensures that the data in these manager info entries stays up-to-date if the remote installation config changes.
:return:
"""
installation_ids = [t for t in self.available_remote_managers]
for installation_id in installation_ids:
if installation_id not in self.remote_task_manager_threads:
                # Remove the entry if it was marked available more than 30 seconds ago
installation_info = self.available_remote_managers[installation_id]
if installation_info.get('created') < datetime.now() - timedelta(seconds=30):
del self.available_remote_managers[installation_id]
def remove_stopped_remote_task_manager_threads(self):
"""
Remove any redundant link managers from our list
Remove any worker IDs from the remote_task_manager_threads list so they are freed up for another link manager thread
:return:
"""
# Remove any redundant link managers from our list
thread_keys = [t for t in self.remote_task_manager_threads]
for thread in thread_keys:
if thread in self.remote_task_manager_threads:
if not self.remote_task_manager_threads[thread].is_alive():
self._log("Removing thread '{}'".format(thread), level='debug')
del self.remote_task_manager_threads[thread]
continue
def terminate_unlinked_remote_task_manager_threads(self):
"""
Mark a manager as redundant if the remote installation configuration has been removed
:return:
"""
# Get a list of configured UUIDS
configured_uuids = {}
for configured_remote_installation in self.settings.get_remote_installations():
if configured_remote_installation.get('uuid'):
configured_uuids[configured_remote_installation.get('uuid')] = configured_remote_installation.get('address')
# Find and remove any redundant link managers from our list
term_log_msg = "Remote installation link with {} '{}' has been removed from settings. Marking tread for termination"
for thread in self.remote_task_manager_threads:
thread_info = self.remote_task_manager_threads[thread].get_info()
thread_assigned_uuid = thread_info.get('installation_info', {}).get('uuid')
thread_assigned_address = thread_info.get('installation_info', {}).get('address')
# Ensure the UUID is still in our config
if thread_assigned_uuid not in configured_uuids:
self.mark_remote_task_manager_thread_as_redundant(thread)
self._log(term_log_msg.format('UUID', thread_assigned_uuid))
continue
# Ensure the configured address has not changed
configured_address = configured_uuids.get(thread_assigned_uuid)
            if thread_assigned_address != configured_address:
self.mark_remote_task_manager_thread_as_redundant(thread)
self._log(term_log_msg.format('address', thread_assigned_address))
continue
def update_remote_worker_availability_status(self):
"""
Updates the list of available remote managers that can be started
:return:
"""
available_installations = self.links.check_remote_installation_for_available_workers()
for installation_uuid in available_installations:
remote_address = available_installations[installation_uuid].get('address', '')
remote_auth = available_installations[installation_uuid].get('auth', 'None')
remote_username = available_installations[installation_uuid].get('username', '')
remote_password = available_installations[installation_uuid].get('password', '')
remote_library_names = available_installations[installation_uuid].get('library_names', [])
available_slots = available_installations[installation_uuid].get('available_slots', 0)
for slot_number in range(available_slots):
remote_manager_id = "{}|M{}".format(installation_uuid, slot_number)
if remote_manager_id in self.available_remote_managers or remote_manager_id in self.remote_task_manager_threads:
# This worker is already managed by a link manager thread or is already in the list of available workers
continue
# Add this remote worker ID to the list of available remote managers
self.available_remote_managers[remote_manager_id] = {
'uuid': installation_uuid,
'address': remote_address,
'auth': remote_auth,
'username': remote_username,
'password': remote_password,
'library_names': remote_library_names,
'created': datetime.now(),
}
def start_worker_thread(self, worker_id, worker_name, worker_group):
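        """Spawn a single daemon Worker thread and register it in worker_threads."""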
thread = Worker(worker_id, worker_name, worker_group, self.workers_pending_task_queue,
self.complete_queue, self.event)
thread.daemon = True
thread.start()
self.worker_threads[worker_id] = thread
def fetch_available_worker_ids(self):
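        """Return the thread IDs of all workers that are alive, idle and not paused."""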
        thread_ids = []
        for thread in self.worker_threads:
            if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
                if not self.worker_threads[thread].paused:
                    thread_ids.append(self.worker_threads[thread].thread_id)
        return thread_ids
def check_for_idle_workers(self):
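        """Return True if at least one worker thread is alive, idle and not paused."""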
for thread in self.worker_threads:
if self.worker_threads[thread].idle and self.worker_threads[thread].is_alive():
if not self.worker_threads[thread].paused:
return True
return False
def check_for_idle_remote_workers(self):
if self.available_remote_managers:
return True
return False
def get_available_remote_library_names(self):
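        """Return a de-duplicated list of library names served by the available remote installations."""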
library_names = []
for installation_id in self.available_remote_managers:
for library_name in self.available_remote_managers[installation_id].get('library_names', []):
if library_name not in library_names:
library_names.append(library_name)
return library_names
def get_tags_configured_for_worker(self, worker_id):
"""Fetch the tags for a given worker ID"""
assigned_worker_group_id = self.worker_threads[worker_id].worker_group_id
worker_group = WorkerGroup(group_id=assigned_worker_group_id)
return worker_group.get_tags()
def postprocessor_queue_full(self):
"""
        Check if the post-processor queue contains more items than the limit derived from
        the enabled worker count (local and remote). If it does, return True. Else False.
:return:
"""
frontend_messages = self.data_queues.get('frontend_messages')
# Use the configured worker count + 1 as the post-processor queue limit
        limit = self.get_total_worker_count() + 1
# Include a count of all available and busy remote workers for the postprocessor queue limit
limit += len(self.available_remote_managers)
limit += len(self.remote_task_manager_threads)
current_count = len(self.task_queue.list_processed_tasks())
if current_count > limit:
msg = "There are currently {} items in the post-processor queue. Halting feeding workers until it drops below {}."
self._log(msg.format(current_count, limit), level='warning')
frontend_messages.update(
{
'id': 'pendingTaskHaltedPostProcessorQueueFull',
'type': 'status',
'code': 'pendingTaskHaltedPostProcessorQueueFull',
'message': '',
'timeout': 0
}
)
return True
# Remove the status notification
frontend_messages.remove_item('pendingTaskHaltedPostProcessorQueueFull')
return False
def pause_worker_thread(self, worker_id):
"""
Pauses a single worker thread
:param worker_id:
:type worker_id:
:return:
:rtype:
"""
if worker_id not in self.worker_threads:
self._log("Asked to pause Worker ID '{}', but this was not found.".format(worker_id), level='warning')
return False
if not self.worker_threads[worker_id].paused_flag.is_set():
self._log("Asked to pause Worker ID '{}'".format(worker_id), level='debug')
self.worker_threads[worker_id].paused_flag.set()
return True
def pause_all_worker_threads(self, worker_group_id=None):
"""Pause all threads"""
result = True
for thread in self.worker_threads:
# Limit by worker group if requested
if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
continue
if not self.pause_worker_thread(thread):
result = False
return result
def resume_worker_thread(self, worker_id):
"""
Resume a single worker thread
:param worker_id:
:type worker_id:
:return:
:rtype:
"""
self._log("Asked to resume Worker ID '{}'".format(worker_id), level='debug')
if worker_id not in self.worker_threads:
self._log("Asked to resume Worker ID '{}', but this was not found.".format(worker_id), level='warning')
return False
self.worker_threads[worker_id].paused_flag.clear()
return True
def resume_all_worker_threads(self, worker_group_id=None):
"""Resume all threads"""
result = True
for thread in self.worker_threads:
# Limit by worker group if requested
if worker_group_id and self.worker_threads[thread].worker_group_id != worker_group_id:
continue
if not self.resume_worker_thread(thread):
result = False
return result
def terminate_worker_thread(self, worker_id):
"""
Terminate a single worker thread
:param worker_id:
:type worker_id:
:return:
:rtype:
"""
self._log("Asked to terminate Worker ID '{}'".format(worker_id), level='debug')
if worker_id not in self.worker_threads:
self._log("Asked to terminate Worker ID '{}', but this was not found.".format(worker_id), level='warning')
return False
self.mark_worker_thread_as_redundant(worker_id)
return True
def terminate_all_worker_threads(self):
"""Terminate all threads"""
result = True
for thread in self.worker_threads:
if not self.terminate_worker_thread(thread):
result = False
return result
def mark_worker_thread_as_redundant(self, worker_id):
self.worker_threads[worker_id].redundant_flag.set()
def mark_remote_task_manager_thread_as_redundant(self, link_manager_id):
self.remote_task_manager_threads[link_manager_id].redundant_flag.set()
def hand_task_to_workers(self, item, local=True, library_name=None, worker_id=None):
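        """
        Hand a task either to the named local worker thread or to the pending task queue
        for a remote task manager thread to collect.
        :return: False when no remote manager thread could be started for the task
            (the caller should re-queue the item), else True
        """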
if local:
# Assign the task to the worker id provided
if worker_id in self.worker_threads and self.worker_threads[worker_id].is_alive():
self.worker_threads[worker_id].set_task(item)
# If the worker thread specified was not available to collect this task, it will be fetched again in the next loop
else:
# Place into queue for a remote link manager thread to collect
self.remote_workers_pending_task_queue.put(item)
# Spawn link manager thread to pickup task
if not self.init_remote_task_manager_thread(library_name=library_name):
# Remove item from queue
self.remote_workers_pending_task_queue.get_nowait()
# Return failure. This will cause the item to be re-queued at the bottom of the list
return False
return True
    def link_manager_thread_heartbeat(self):
"""
Run a list of tasks to test the status of our Link Management threads.
Unlike worker threads, Link Management threads live and die for a single task.
If a Link Management thread is alive for more than 10 seconds without picking up a task, it will die.
This function will reap all dead or completed threads and clean up issues where a thread may have died
before running a task that was added to the pending task queue (in which case a new thread should be started)
:return:
"""
# Only run heartbeat every 10 seconds
time_now = time.time()
if self.link_heartbeat_last_run > (time_now - 10):
return
# self._log("Running remote link manager heartbeat", level='debug')
# Terminate remote manager threads for unlinked installations
self.terminate_unlinked_remote_task_manager_threads()
# Clear out dead threads
self.remove_stopped_remote_task_manager_threads()
        # Clear out stale available workers (entries expire after 30 seconds unless refreshed)
self.remove_stale_available_remote_managers()
# Check for updates to the worker availability status of linked remote installations
self.update_remote_worker_availability_status()
# Mark this as the last time run
self.link_heartbeat_last_run = time_now
def run(self):
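        """
        Main Foreman monitor loop.
        Each pass collects completed tasks from workers, reconciles the worker pool,
        validates the worker configuration, runs any scheduled events, and hands the
        next pending task to an idle local worker or an available remote installation.
        :return:
        """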
self._log("Starting Foreman Monitor loop")
        # Flag to force checking for idle remote workers when set to False.
        # This prevents looping forever on idle local workers whose tags prevent
        # them from picking up any pending tasks
allow_local_idle_worker_check = True
while not self.abort_flag.is_set():
self.event.wait(2)
try:
# Fetch all completed tasks from workers
while not self.abort_flag.is_set() and not self.complete_queue.empty():
self.event.wait(.5)
try:
task_item = self.complete_queue.get_nowait()
task_item.set_status('processed')
except queue.Empty:
continue
except Exception as e:
self._log("Exception when fetching completed task report from worker", message2=str(e),
level="exception")
# Setup the correct number of workers
if not self.abort_flag.is_set():
self.init_worker_threads()
# If the worker config is not valid, then pause all workers until it is
if not self.validate_worker_config():
# Pause all workers
self.pause_all_worker_threads()
continue
# Manage worker event schedules
self.manage_event_schedules()
if not self.abort_flag.is_set() and not self.task_queue.task_list_pending_is_empty():
# Check the status of all link manager threads (close dead ones)
                    self.link_manager_thread_heartbeat()
# Check if we are able to start up a worker for another encoding job
                    # These queues hold only one task at a time and are used to hand tasks to the workers
if self.workers_pending_task_queue.full() or self.remote_workers_pending_task_queue.full():
# In order to simplify the process and run the foreman management in a single thread, if either of
# these are full, it means the thread that is assigned to pick up the item has not done so.
# In order to prevent a second thread starting and taking the first thread's task, we should not
# process any more pending tasks until that first thread is ready and has taken its task out of the
# queue.
continue
# Check if there are any free workers
worker_ids = []
if allow_local_idle_worker_check and self.check_for_idle_workers():
# Local workers are available
process_local = True
# For local workers, process either local tasks or tasks provided from a remote installation
get_local_pending_tasks_only = False
# Specify the worker ID that will handle the next task
worker_ids = self.fetch_available_worker_ids()
                        # If no workers were available (possibly due to being recycled), just continue the loop
if not worker_ids:
continue
elif self.check_for_idle_remote_workers():
allow_local_idle_worker_check = True
# Remote workers are available
process_local = False
# For remote workers, only process local tasks. Don't hand remote tasks to another remote installation
get_local_pending_tasks_only = True
else:
allow_local_idle_worker_check = True
# All workers are currently busy
self.event.wait(1)
continue
# Check if postprocessor task queue is full
if self.postprocessor_queue_full():
self.event.wait(5)
continue
# Fetch the next item in the queue
available_worker_id = None
next_item_to_process = None
if process_local:
# For local processing, ensure tags match the available library and worker
for worker_id in worker_ids:
try:
library_tags = self.get_tags_configured_for_worker(worker_id)
except Exception as e:
# This will happen if the worker group is deleted
self._log("Error while fetching the tags for the configured worker", str(e), level='debug')
                                # Break out of this for loop. The main while loop will clean up these workers on the next pass
break
next_item_to_process = self.task_queue.get_next_pending_tasks(
local_only=get_local_pending_tasks_only,
library_tags=library_tags)
if next_item_to_process:
available_worker_id = worker_id
break
                        # If no local worker ID was assigned to the given item, then try again shortly
if not available_worker_id:
allow_local_idle_worker_check = False
self.event.wait(1)
continue
else:
# For remote items, run a search matching an available remote installation library
remote_library_names = self.get_available_remote_library_names()
next_item_to_process = self.task_queue.get_next_pending_tasks(local_only=get_local_pending_tasks_only,
library_names=remote_library_names)
if next_item_to_process:
try:
source_abspath = next_item_to_process.get_source_abspath()
task_library_name = next_item_to_process.get_task_library_name()
except Exception as e:
self._log("Exception in fetching task details", message2=str(e), level="exception")
self.event.wait(3)
continue
self._log("Processing item - {}".format(source_abspath))
success = self.hand_task_to_workers(next_item_to_process, local=process_local,
library_name=task_library_name,
worker_id=available_worker_id)
if not success:
self._log("Re-queueing tasks. Unable to find worker capable of processing task '{}'".format(
next_item_to_process.get_source_abspath()), level="warning")
# Re-queue item at the bottom
self.task_queue.requeue_tasks_at_bottom(next_item_to_process.get_task_id())
            except Exception as e:
                self._log("Exception in Foreman Monitor loop", message2=str(e), level="exception")
                raise
self._log("Leaving Foreman Monitor loop...")
def get_all_worker_status(self):
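        """Return a list containing the status of every worker thread."""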
all_status = []
for thread in self.worker_threads:
all_status.append(self.worker_threads[thread].get_status())
return all_status
def get_worker_status(self, worker_id):
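        """Return the status of the worker thread matching the given worker ID, or an empty dict."""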
result = {}
for thread in self.worker_threads:
            if str(worker_id) == str(thread):
result = self.worker_threads[thread].get_status()
return result