kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.workers.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 11 Aug 2021, (12:06 PM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import hashlib
33 import os
34 import queue
35 import shutil
36 import subprocess
37 import threading
38 import time
39  
40 import psutil
41  
42 from unmanic.libs import common, unlogger
43 from unmanic.libs.plugins import PluginsHandler
44  
45  
def default_progress_parser(line_text):
    """
    Fallback progress parser used when a plugin does not provide its own.

    :param line_text: A line of output from the running command (ignored).
    :return: A new dict whose only key, 'percent', holds an empty string.
    """
    progress = dict(percent='')
    return progress
50  
51  
class WorkerCommandError(Exception):
    """
    Raised when a worker command returns a non-zero exit status.

    :param command: The command that failed. It is embedded in the exception message and is
                    also available on the ``command`` attribute.
    """

    # NOTE: this method was previously spelled '__init___' (three trailing underscores), so it
    # was never used as the constructor: the message was not formatted and 'command' was never set.
    def __init__(self, command):
        super().__init__("Worker command returned non 0 status. Command: {}".format(command))
        self.command = command
56  
57  
58 class Worker(threading.Thread):
# Class-level defaults for the worker state. Instances overwrite these as they run.

# State flags reported by get_status(): 'idle' is set to True by run() when the worker is
# ready for a task; 'paused' mirrors whether the worker is currently honouring its paused flag.
idle = True
paused = False

# The task currently assigned through set_task(), the log lines collected while processing it,
# and the timestamps (time.time()) recorded at the start and finish of the task.
current_task = None
worker_log = None
start_time = None
finish_time = None

# Details of the command subprocess most recently started by this worker:
# the Popen object, its PID, and the parsed progress percent / elapsed seconds (stored as strings).
worker_subprocess = None
worker_subprocess_pid = None
worker_subprocess_percent = None
worker_subprocess_elapsed = None

# Per-plugin runner status keyed by plugin ID. This default dict is a class attribute and so is
# shared by all instances until a method rebinds 'self.worker_runners_info' to a new dict.
worker_runners_info = {}
73  
def __init__(self, thread_id, name, worker_group_id, pending_queue, complete_queue, event):
    """
    Build a worker thread and prepare its control flags and logger.

    :param thread_id: Identifier for this worker (reported by get_status()).
    :param name: Thread name; also used as the name of this worker's logger.
    :param worker_group_id: ID of the worker group; stored on the instance.
    :param pending_queue: Queue stored on the instance as ``pending_queue``.
    :param complete_queue: Queue that processed tasks are placed on.
    :param event: Object whose ``wait(timeout)`` provides the delays used in the worker loops.
    """
    super(Worker, self).__init__(name=name)

    # Queues handed to this worker by its creator
    self.pending_queue = pending_queue
    self.complete_queue = complete_queue

    # Identity of this worker and the object used for timed waits
    self.event = event
    self.worker_group_id = worker_group_id
    self.name = name
    self.thread_id = thread_id

    # No task is assigned until set_task() is called
    self.current_task = None

    # 'paused' flag. While this is set, the worker should be paused
    paused_flag = threading.Event()
    paused_flag.clear()
    self.paused_flag = paused_flag

    # 'redundancy' flag. Once this is set, the worker should die
    redundant_flag = threading.Event()
    redundant_flag.clear()
    self.redundant_flag = redundant_flag

    # Each worker logs through its own named logger
    logging_instance = unlogger.UnmanicLogger.__call__()
    self.logger = logging_instance.get_logger(self.name)
96  
def _log(self, message, message2='', level="info"):
    """
    Write a message to this worker's logger.

    :param message: Primary message text.
    :param message2: Optional secondary text, combined with ``message`` by common.format_message().
    :param level: Name of the logger method to call (e.g. "info", "debug", "warning", "error").
    """
    formatted = common.format_message(message, message2)
    log_method = getattr(self.logger, level)
    log_method(formatted)
100  
def run(self):
    """
    Main loop of the worker thread.

    Runs until the redundant flag is set. On each cycle the worker either idles while paused,
    or marks itself idle and processes the task assigned through set_task().

    NOTE(review): if processing raises, the task is left set and is picked up again after the
    next short wait - confirm that this retry behaviour is intended.
    """
    self._log("Starting worker")
    while not self.redundant_flag.is_set():
        # Short delay so that this loop does not max out compute resources
        self.event.wait(1)

        # While the Foreman has this worker paused, do nothing except wait a little longer
        is_paused = self.paused_flag.is_set()
        self.paused = is_paused
        if is_paused:
            self.event.wait(5)
            continue

        # Announce to the Foreman that this worker is ready for a task
        self.idle = True

        # Work on the assigned task for as long as one is set
        # (the processing step is taken to sit inside this loop; the source's nesting was lost)
        while True:
            if self.redundant_flag.is_set() or not self.current_task:
                break
            # Short delay so that this loop does not max out compute resources
            self.event.wait(.5)

            try:
                self.__process_task_queue_item()
            except queue.Empty:
                continue
            except Exception as e:
                self._log("Exception in processing job with {}:".format(self.name), message2=str(e),
                          level="exception")

    self._log("Stopping worker")
131  
def set_task(self, new_task):
    """
    Assign a task to this worker.

    The request is ignored if the worker already holds a task, so that only one task can be
    set at a time. On assignment the worker log is reset and the worker is marked as not idle.

    :param new_task: The task object to be processed by this worker.
    """
    if not self.current_task:
        self.current_task = new_task
        self.worker_log = []
        self.idle = False
141  
def get_status(self):
    """
    Fetch the status of this worker.

    TODO: Fetch subprocess pid

    :return: dict with the worker's id, name, idle/paused state, start time, current task ID,
             current file name, the tail of the worker log (at most the last 20 entries),
             the per-plugin runners info and the subprocess progress details.
    """
    status = {
        'id': str(self.thread_id),
        'name': self.name,
        'idle': self.idle,
        'paused': self.paused,
        'start_time': None if not self.start_time else str(self.start_time),
        'current_task': None,
        'current_file': "",
        'worker_log_tail': [],
        'runners_info': {},
        'subprocess': {
            # NOTE: this is the ident of the worker thread, not the PID of the command subprocess
            'pid': self.ident,
            'percent': str(self.worker_subprocess_percent),
            'elapsed': str(self.worker_subprocess_elapsed),
        },
    }
    if self.current_task:
        # Fetch the current task ID
        try:
            status['current_task'] = self.current_task.get_task_id()
        except Exception as e:
            self._log("Exception in fetching the current task ID for worker {}:".format(self.name), message2=str(e),
                      level="exception")

        # Fetch the current file
        try:
            status['current_file'] = self.current_task.get_source_basename()
        except Exception as e:
            self._log("Exception in fetching the current file of worker {}:".format(self.name), message2=str(e),
                      level="exception")

        # Append the worker log tail.
        # Always hand out a copy of (at most) the last 20 entries. Previously a log longer than 20
        # entries was cut to 19, shorter logs were returned as the live list that the worker thread
        # keeps appending to, and an unset log was returned as None.
        try:
            status['worker_log_tail'] = list(self.worker_log[-20:]) if self.worker_log else []
        except Exception as e:
            self._log("Exception in fetching log tail of worker: ", message2=str(e),
                      level="exception")

        # Append the runners info
        try:
            status['runners_info'] = self.worker_runners_info
        except Exception as e:
            self._log("Exception in runners info of worker {}:".format(self.name), message2=str(e),
                      level="exception")
    return status
198  
def __unset_current_task(self):
    """Clear the per-task state (log, runners info and the task itself) ready for the next task."""
    self.worker_log = []
    self.worker_runners_info = {}
    self.current_task = None
203  
def __process_task_queue_item(self):
    """
    Process the task that is currently set on this worker.

    Marks the task as in progress, runs the plugin runners against it, records the outcome and
    the start/finish statistics, places the task on the complete queue and finally clears the
    worker's per-task state.

    :return:
    """
    # The worker is busy from here on, and has no subprocess progress to report yet
    self.idle = False
    self.worker_subprocess_percent = ''
    self.worker_subprocess_elapsed = '0'

    # Log the start of the job
    picked_up_message = "Picked up job - {}".format(self.current_task.get_source_abspath())
    self._log(picked_up_message)

    # Flag the task as being "in progress" and record the starting statistics
    self.current_task.set_status('in_progress')
    self.__set_start_task_stats()

    # Run the plugin runners; the result (True on success, otherwise False) is stored on the task
    self.current_task.set_success(self.__exec_worker_runners_on_set_task())

    # Record the completion statistics
    self.__set_finish_task_stats()

    # Log completion of the job
    finished_message = "Finished job - {}".format(self.current_task.get_source_abspath())
    self._log(finished_message)

    # Hand the task over to the completed queue, then reset for the next task
    self.complete_queue.put(self.current_task)
    self.__unset_current_task()
242  
def __set_start_task_stats(self):
    """Record the starting statistics of a task on both the worker and the task's data object."""
    # The task starts now and has not finished yet
    started_at = time.time()
    self.start_time = started_at
    self.finish_time = None

    # Mirror the starting statistics onto the task's data object
    task_record = self.current_task.task
    task_record.processed_by_worker = self.name
    task_record.start_time = started_at
    task_record.finish_time = None
255  
def __set_finish_task_stats(self):
    """Record the finish time of a task on both the worker and the task's data object."""
    finished_at = time.time()
    self.finish_time = finished_at
    self.current_task.task.finish_time = finished_at
263  
def __exec_worker_runners_on_set_task(self):
    """
    Executes the configured plugin runners against the set task.

    Every enabled 'worker.process_item' plugin is run in order. A runner may request that a command
    be executed and may request repeated passes through the 'repeat' flag. The file produced by one
    runner becomes the input of the next. Processing stops at the first runner that fails.
    If all runners succeed, the final output file is moved to the task's cache path (or, if the
    source file was never modified, a copy of the source is placed there) for the postprocessor.

    :return: True if all runners succeeded and the final file was placed at the task cache path,
             otherwise False.
    """
    # Init plugins
    library_id = self.current_task.get_task_library_id()
    plugin_handler = PluginsHandler()
    plugin_modules = plugin_handler.get_enabled_plugin_modules_by_type('worker.process_item', library_id=library_id)

    # Create dictionary of runners info for the frontend
    self.worker_runners_info = {
        plugin_module.get('plugin_id'): {
            'plugin_id':   plugin_module.get('plugin_id'),
            'status':      'pending',
            "name":        plugin_module.get('name'),
            "author":      plugin_module.get('author'),
            "version":     plugin_module.get('version'),
            "icon":        plugin_module.get('icon'),
            "description": plugin_module.get('description'),
        }
        for plugin_module in plugin_modules
    }

    # Set the absolute path to the original file
    original_abspath = self.current_task.get_source_abspath()

    # Process item in loop.
    # First process the item for each plugin that configures it, then run the default Unmanic configuration
    task_cache_path = self.current_task.get_cache_path()
    # Set the current input file to the original file path
    file_in = original_abspath
    # Mark the overall success of all runners. This will be set to False if any of the runners fails.
    overall_success = True
    # Start with the current file out set to the original file.
    # This will be updated by each runner.
    # If no runners are configured, then nothing needs to be done.
    current_file_out = original_abspath
    # Flag if no task has run a command
    no_exec_command_run = True

    # Generate default data object for the runner functions
    data = {
        "worker_log":              self.worker_log,
        "library_id":              library_id,
        "exec_command":            [],
        "command_progress_parser": default_progress_parser,
        "file_in":                 file_in,
        "file_out":                None,
        "original_file_path":      original_abspath,
        "repeat":                  False,
    }

    # The runner count starts at #1 for the first runner
    for runner_count, plugin_module in enumerate(plugin_modules, start=1):
        if not overall_success:
            # If one of the Plugins fails, don't continue.
            # The Plugins could be co-dependant and the final file will not go anywhere if 'overall_success' is False
            break

        plugin_id = plugin_module.get('plugin_id')
        plugin_name = plugin_module.get('name')
        runner_info = self.worker_runners_info[plugin_id]

        # Mark the status of the worker for the frontend
        runner_info['status'] = 'in_progress'
        runner_info['success'] = False

        # Loop over runner. This way we can repeat the function with the same data if requested by the repeat flag
        runner_pass_count = 0
        while not self.redundant_flag.is_set():
            runner_pass_count += 1

            # Fetch file out details
            # This creates a temp file labeled "WORKING" that will be moved to the cache_path on completion
            split_file_out = os.path.splitext(task_cache_path)
            split_file_in = os.path.splitext(file_in)
            file_out = "{}-{}-{}-{}{}".format(split_file_out[0], "WORKING", runner_count, runner_pass_count,
                                              split_file_in[1])

            # Reset data object for this runner functions
            data['library_id'] = library_id
            data['exec_command'] = []
            data['command_progress_parser'] = default_progress_parser
            data['file_in'] = file_in
            data['file_out'] = file_out
            data['original_file_path'] = original_abspath
            data['repeat'] = False

            self.event.wait(.2)  # Add delay for preventing loop maxing compute resources
            self.worker_log.append("\n\nRUNNER: \n{} [Pass #{}]\n\n".format(plugin_name, runner_pass_count))
            self.worker_log.append("\nExecuting plugin runner... Please wait\n")

            # Run plugin to update data
            if not plugin_handler.exec_plugin_runner(data, plugin_id, 'worker.process_item'):
                # Set overall success status to failed (the runner info is finalised after this loop)
                overall_success = False
                # Append log entry to say the plugin failed
                self.worker_log.append("\n\nPLUGIN FAILED!")
                self.worker_log.append("\nFailed to execute Plugin '{}'".format(plugin_name))
                self.worker_log.append("\nCheck Unmanic logs for more information")
                break

            # Log the in and out files returned by the plugin runner for debugging
            self._log("Worker process '{}' (in)".format(plugin_id), data.get("file_in"), level='debug')
            self._log("Worker process '{}' (out)".format(plugin_id), data.get("file_out"), level='debug')

            # Only run the conversion process if "exec_command" is not empty
            if data.get("exec_command"):
                self.worker_log.append("\nPlugin runner requested for a command to be executed by Unmanic")

                # Exec command as subprocess
                success = self.__exec_command_subprocess(data)
                no_exec_command_run = False

                if self.redundant_flag.is_set():
                    # This worker has been marked as redundant. It is being terminated.
                    self._log("Worker has been terminated before a command was completed", level="warning")
                    # Set overall success status to failed
                    overall_success = False
                    # Append log entry to say the worker was terminated
                    self.worker_log.append("\n\nWORKER TERMINATED!")
                    # Don't continue
                    break

                # Check if command exited successfully.
                if success:
                    # If file conversion was successful
                    self._log("Successfully ran worker process '{}' on file '{}'".format(plugin_id, data.get("file_in")))
                    # Ensure the 'file_out' that was specified by the plugin to be created was actually created.
                    # (A plugin may have cleared 'file_out'; os.path.exists() must not be given None.)
                    if data.get('file_out') and os.path.exists(data.get('file_out')):
                        # The outfile exists...
                        # In order to clean up as we go and avoid unnecessary RAM/disk use in the cache directory,
                        # we want to remove the 'file_in' file.
                        # We want to ensure that we do not accidentally remove any original files here.
                        # To avoid this, run x2 tests.
                        # First, check current 'file_in' is not the original file.
                        if os.path.abspath(data.get("file_in")) != os.path.abspath(original_abspath):
                            # Second, check that the 'file_in' is in cache directory.
                            if "unmanic_file_conversion" in os.path.abspath(data.get("file_in")):
                                # Remove this file
                                os.remove(os.path.abspath(data.get("file_in")))

                        # Set the new 'file_in' as the previous runner's 'file_out' for the next loop.
                        # This is only done when the out file exists, so a missing file is never passed on.
                        file_in = data.get("file_out")
                else:
                    # The command failed
                    self._log(
                        "Error while running worker process '{}' on file '{}'".format(plugin_id, original_abspath),
                        level="error")
                    overall_success = False

                    # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                    file_in = data.get("file_in")
            else:
                # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                file_in = data.get("file_in")
                # Log that this plugin did not request to execute anything
                self.worker_log.append("\nRunner did not request for Unmanic to execute a command")
                self._log("Worker process '{}' did not request to execute a command.".format(plugin_id),
                          level='debug')

            if data.get('file_out') and os.path.exists(data.get('file_out')):
                # Set the current file out to the most recently completed cache file
                # If the file out does not exist, it is likely never used by the plugin.
                current_file_out = data.get('file_out')
            else:
                # Ensure the current_file_out is set to the currently set 'file_in'
                current_file_out = data.get('file_in')

            if data.get("repeat"):
                # The returned data contained the 'repeat' flag.
                # Run another pass against this same plugin
                continue
            break
        else:
            # The loop condition failed rather than a 'break' being reached:
            # the worker was marked as redundant before this pass of the runner could start.
            # The runner did not complete, so the task must not be reported as a success.
            self._log("Worker has been terminated before a plugin runner was completed", level="warning")
            overall_success = False
            self.worker_log.append("\n\nWORKER TERMINATED!")

        # 'overall_success' was True when this runner started, so it now holds this runner's result.
        # (Previously 'success' was unconditionally set to True here, hiding failed runners.)
        runner_info['success'] = overall_success
        runner_info['status'] = 'complete'

    # Log if no command was run by any Plugins
    if no_exec_command_run:
        # If no jobs were carried out on this task
        self._log("No Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath),
                  level='warning')
        self.worker_log.append(
            "\n\nNo Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath))

    # Save the completed command log
    self.current_task.save_command_log(self.worker_log)

    # If all plugins that were executed completed successfully, then this was overall a successful task.
    # At this point we need to move the final out file to the original task cache path so the postprocessor can collect it.
    if overall_success:
        # If jobs carried out on this task were all successful, we will get here
        self._log("Successfully completed Worker processing on file '{}'".format(original_abspath))

        # Attempt to move the final output file to the final cache file path for the postprocessor
        try:
            # Set the new file out as the extension may have changed
            split_file_name = os.path.splitext(current_file_out)
            file_extension = split_file_name[1].lstrip('.')
            cache_directory = os.path.dirname(os.path.abspath(task_cache_path))
            self.current_task.set_cache_path(cache_directory, file_extension)
            # Read the updated cache path
            task_cache_path = self.current_task.get_cache_path()

            # Move file to original cache path
            self._log("Moving final cache file from '{}' to '{}'".format(current_file_out, task_cache_path))
            current_file_out = os.path.abspath(current_file_out)

            # There is a really odd intermittent bug with the shutil module that is causing it to
            # sometimes report that the file does not exist.
            # This section adds a small pause and logs the error if that is the case.
            # I have not yet figured out a solution as this is difficult to reproduce.
            if not os.path.exists(current_file_out):
                self._log("Error - current_file_out path does not exist! '{}'".format(current_file_out),
                          level="error")
                self.event.wait(1)

            # Ensure the cache directory exists
            if not os.path.exists(cache_directory):
                os.makedirs(cache_directory)

            # Check that the current file out is not the original source file
            if os.path.abspath(current_file_out) == os.path.abspath(original_abspath):
                # The current file out is not a cache file, the file must have never been modified.
                # This can happen if no Plugin modified the file, or a Plugin specifically reset the out
                # file to the original source in order to preserve it.
                # In this circumstance, we want to create a cache copy and let the process continue.
                self._log("Final cache file is the same path as the original source. Creating cache copy.",
                          level='debug')
                shutil.copyfile(current_file_out, task_cache_path)
            else:
                # Use shutil module to move the file to the final task cache location
                shutil.move(current_file_out, task_cache_path)
        except Exception as e:
            self._log("Exception in final move operation of file {} to {}:".format(current_file_out, task_cache_path),
                      message2=str(e), level="exception")
            return False

        # Return True
        return True

    # If the overall result of the jobs carried out on this task were not successful, we will get here.
    # Log the failure and return False
    self._log("Failed to process task for file '{}'".format(original_abspath), level='warning')
    return False
520  
def __log_proc_terminated(self, proc):
    """
    Log that a process has terminated, together with its exit code.

    :param proc: Process object exposing a ``returncode`` attribute.
    """
    message = "Process {} terminated with exit code {}".format(proc, proc.returncode)
    self._log(message)
523  
def __terminate_proc_tree(self, proc: psutil.Process):
    """
    Terminate the process tree (including grandchildren).
    Processes that fail to stop with SIGTERM will be sent a SIGKILL.

    :param proc: The psutil process at the root of the tree to terminate.
    :return:
    """
    try:
        procs = proc.children(recursive=True)
    except psutil.NoSuchProcess:
        # The process exited between the caller's check and this call. Nothing is left to terminate.
        return
    procs.append(proc)

    # Ask every process in the tree to stop
    for p in procs:
        try:
            p.terminate()
        except psutil.NoSuchProcess:
            pass
    gone, alive = psutil.wait_procs(procs, timeout=3, callback=self.__log_proc_terminated)

    # Force kill anything that did not stop in time
    for p in alive:
        try:
            p.kill()
        except psutil.NoSuchProcess:
            pass
    gone, alive = psutil.wait_procs(alive, timeout=3, callback=self.__log_proc_terminated)

    # Report anything that survived the kill rather than silently ignoring it
    for p in alive:
        self._log("Process {} is still running after being killed".format(p), level="warning")
547  
def __exec_command_subprocess(self, data):
    """
    Executes a command as a subprocess.
    Uses the given parser to record progress data from the subprocess output (STDERR is merged into STDOUT).

    A list command is executed directly; a string command is executed through the shell.
    While the worker is paused the subprocess is suspended, and it is terminated if the worker is
    marked as redundant.

    :param data: The runner data dict. Reads 'exec_command', 'command_progress_parser', 'file_in'
                 and 'file_out'.
    :return: True if the command completed with an exit status of 0, otherwise False.
    """
    # Fetch command to execute.
    exec_command = data.get("exec_command", [])

    # Fetch the command progress parser function
    command_progress_parser = data.get("command_progress_parser", default_progress_parser)

    # Log the command for debugging.
    # Always build a string here so that a list containing non-string items (or a command of an
    # unsupported type) cannot raise before the error handling below, or put a non-string in the log.
    if isinstance(exec_command, list):
        command_string = ' '.join(str(arg) for arg in exec_command)
    else:
        command_string = str(exec_command)
    self._log("Executing: {}".format(command_string), level='debug')

    # Append start of command to worker subprocess stdout
    self.worker_log.extend([
        '\n\n',
        'COMMAND:\n',
        command_string,
        '\n\n',
        'LOG:\n',
    ])

    # Create output path if not exists
    common.ensure_dir(data.get("file_out"))

    # Convert file
    sub_proc = None
    try:
        proc_pause_time = 0
        proc_start_time = time.time()
        # Execute command
        if isinstance(exec_command, list):
            sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                        universal_newlines=True, errors='replace')
        elif isinstance(exec_command, str):
            sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                        universal_newlines=True, errors='replace', shell=True)
        else:
            raise Exception(
                "Plugin's returned 'exec_command' object must be either a list or a string. Received type {}.".format(
                    type(exec_command)))

        # Fetch process using psutil for control (sending SIGSTOP on windows will not work)
        proc = psutil.Process(pid=sub_proc.pid)

        # Set process priority on posix systems
        # TODO: Test how this will work on Windows
        if os.name == "posix":
            try:
                parent_proc = psutil.Process(os.getpid())
                parent_proc_nice = parent_proc.nice()
                proc.nice(parent_proc_nice + 1)
            except Exception as e:
                self._log("Unable to lower priority of subprocess. Subprocess should continue to run at normal priority",
                          str(e), level='warning')

        # Record PID and PROC
        self.worker_subprocess = sub_proc
        self.worker_subprocess_pid = sub_proc.pid

        # Poll process for new output until finished
        while not self.redundant_flag.is_set():
            line_text = sub_proc.stdout.readline()

            # Fetch command stdout and append it to the worker log (to be saved during post process)
            self.worker_log.append(line_text)

            # Check if the command has completed. If it has, exit the loop
            if line_text == '' and sub_proc.poll() is not None:
                self._log("Subprocess task completed!", level='debug')
                break

            # Parse the progress
            try:
                progress_dict = command_progress_parser(line_text)
                self.worker_subprocess_percent = progress_dict.get('percent', '0')
                self.worker_subprocess_elapsed = str(time.time() - proc_start_time - proc_pause_time)
            except Exception as e:
                # Only need to show any sort of exception if we have debugging enabled.
                # So we should log it as a debug rather than an exception.
                self._log("Exception while parsing command progress", str(e), level='debug')

            # Stop the process if the worker is paused
            # Then resume it when the worker is resumed
            if self.paused_flag.is_set():
                self._log("Pausing PID {}".format(sub_proc.pid), level='debug')
                proc.suspend()
                self.paused = True
                start_pause = time.time()
                while not self.redundant_flag.is_set():
                    self.event.wait(1)
                    if not self.paused_flag.is_set():
                        self._log("Resuming PID {}".format(sub_proc.pid), level='debug')
                        proc.resume()
                        self.paused = False
                        # Elapsed time is used for calculating etc.
                        # We account for this by counting the time we are paused also.
                        # This is then subtracted from the elapsed time in the calculation above.
                        proc_pause_time = int(proc_pause_time + time.time() - start_pause)
                        break

        # Collect the exit status (any remaining output is discarded)
        if not self.redundant_flag.is_set():
            sub_proc.communicate()

        # If the process is still running, kill it
        if proc.is_running():
            self._log("Found worker subprocess is still running. Killing it.", level='warning')
            self.__terminate_proc_tree(proc)

        # NOTE: 'returncode' is deliberately read without polling again. A process that was terminated
        # above has no recorded return code (None) and is therefore reported as a failure.
        if sub_proc.returncode == 0:
            return True

        self._log("Command run against '{}' exited with non-zero status. "
                  "Download command dump from history for more information.".format(data.get("file_in")),
                  message2=str(exec_command), level="error")
        return False

    except Exception as e:
        self._log("Error while executing the command against file '{}'.".format(data.get("file_in")),
                  message2=str(e), level="error")
    finally:
        # Always release the output pipe. It was previously left open whenever the worker was
        # terminated mid-command or an exception was raised.
        if sub_proc is not None and sub_proc.stdout is not None:
            try:
                sub_proc.stdout.close()
            except Exception as e:
                self._log("Exception while closing the subprocess output pipe", str(e), level='debug')

    return False