#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.workers.py

    Written by: Josh.5 <jsunnex@gmail.com>
    Date: 11 Aug 2021, (12:06 PM)

    Copyright:
        Copyright (C) Josh Sunnex - All Rights Reserved

        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:

        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.

        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
        EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
        MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
        IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
        DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
        OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
        OR OTHER DEALINGS IN THE SOFTWARE.

"""
import hashlib
import os
import queue
import shutil
import subprocess
import threading
import time

import psutil

from unmanic.libs import common, unlogger
from unmanic.libs.plugins import PluginsHandler


def default_progress_parser(line_text):
    return {
        'percent': ''
    }
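
# A hedged, illustrative sketch of the kind of custom parser a plugin can hand
# back through data['command_progress_parser'] (see __exec_command_subprocess
# below). The tool output format and regex are assumptions for demonstration:
#
#     import re
#
#     def example_progress_parser(line_text):
#         # Assumes the subprocess prints lines such as "progress: 42%"
#         match = re.search(r'progress:\s*(\d{1,3})%', line_text)
#         # Return the same dict shape as default_progress_parser() above
#         return {'percent': match.group(1) if match else ''}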


class WorkerCommandError(Exception):
    def __init__(self, command):
        Exception.__init__(self, "Worker command returned non-zero status. Command: {}".format(command))
        self.command = command


class Worker(threading.Thread):
    idle = True
    paused = False

    current_task = None
    worker_log = None
    start_time = None
    finish_time = None

    worker_subprocess = None
    worker_subprocess_pid = None
    worker_subprocess_percent = None
    worker_subprocess_elapsed = None

    worker_runners_info = {}

    def __init__(self, thread_id, name, worker_group_id, pending_queue, complete_queue, event):
        super(Worker, self).__init__(name=name)
        self.thread_id = thread_id
        self.name = name
        self.worker_group_id = worker_group_id
        self.event = event

        self.current_task = None
        self.pending_queue = pending_queue
        self.complete_queue = complete_queue

        # Create 'redundancy' flag. When this is set, the worker should die
        self.redundant_flag = threading.Event()
        self.redundant_flag.clear()

        # Create 'paused' flag. When this is set, the worker should be paused
        self.paused_flag = threading.Event()
        self.paused_flag.clear()

        # Create logger for this worker
        unmanic_logging = unlogger.UnmanicLogger.__call__()
        self.logger = unmanic_logging.get_logger(self.name)

    def _log(self, message, message2='', level="info"):
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def run(self):
        self._log("Starting worker")
        while not self.redundant_flag.is_set():
            self.event.wait(1)  # Add delay for preventing loop maxing compute resources

            # If the Foreman has paused this worker, then don't do anything
            if self.paused_flag.is_set():
                self.paused = True
                # If the worker is paused, wait for 5 seconds before continuing the loop
                self.event.wait(5)
                continue
            self.paused = False

            # Set the worker as Idle - This will announce to the Foreman that it's ready for a task
            self.idle = True

            # Process the current task while one is set (the Foreman assigns tasks via set_task())
            while not self.redundant_flag.is_set() and self.current_task:
                self.event.wait(.5)  # Add delay for preventing loop maxing compute resources

                try:
                    # Process the set task
                    self.__process_task_queue_item()
                except queue.Empty:
                    continue
                except Exception as e:
                    self._log("Exception in processing job with {}:".format(self.name), message2=str(e),
                              level="exception")

        self._log("Stopping worker")

    def set_task(self, new_task):
        """Sets the given task to the worker class"""
        # Ensure only one task can be set for a worker
        if self.current_task:
            return
        # Set the task
        self.current_task = new_task
        self.worker_log = []
        self.idle = False

    def get_status(self):
        """
        Fetch the status of this worker.

        TODO: Fetch subprocess pid

        :return:
        """
        status = {
            'id': str(self.thread_id),
            'name': self.name,
            'idle': self.idle,
            'paused': self.paused,
            'start_time': None if not self.start_time else str(self.start_time),
            'current_task': None,
            'current_file': "",
            'worker_log_tail': [],
            'runners_info': {},
            'subprocess': {
                'pid': self.ident,
                'percent': str(self.worker_subprocess_percent),
                'elapsed': str(self.worker_subprocess_elapsed),
            },
        }
        if self.current_task:
            # Fetch the current task ID
            try:
                status['current_task'] = self.current_task.get_task_id()
            except Exception as e:
                self._log("Exception in fetching the current task ID for worker {}:".format(self.name), message2=str(e),
                          level="exception")

            # Fetch the current file
            try:
                status['current_file'] = self.current_task.get_source_basename()
            except Exception as e:
                self._log("Exception in fetching the current file of worker {}:".format(self.name), message2=str(e),
                          level="exception")

            # Append the worker log tail
            try:
                if self.worker_log and len(self.worker_log) > 20:
                    status['worker_log_tail'] = self.worker_log[-19:]
                else:
                    status['worker_log_tail'] = self.worker_log
            except Exception as e:
                self._log("Exception in fetching log tail of worker: ", message2=str(e),
                          level="exception")

            # Append the runners info
            try:
                status['runners_info'] = self.worker_runners_info
            except Exception as e:
                self._log("Exception in fetching runners info of worker {}:".format(self.name), message2=str(e),
                          level="exception")
        return status
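
    # For reference, a hedged illustration of the dict shape get_status() returns
    # for a busy worker (all values below are invented for the example):
    #
    #     {
    #         'id': '1', 'name': 'Worker-1', 'idle': False, 'paused': False,
    #         'start_time': '1628640000.0', 'current_task': 42,
    #         'current_file': 'movie.mkv', 'worker_log_tail': ['...'],
    #         'runners_info': {'example_plugin': {'status': 'in_progress', 'success': False}},
    #         'subprocess': {'pid': 12345, 'percent': '50', 'elapsed': '12.3'},
    #     }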

    def __unset_current_task(self):
        self.current_task = None
        self.worker_runners_info = {}
        self.worker_log = []

    def __process_task_queue_item(self):
        """
        Processes the set task.

        :return:
        """
        # Mark the worker as not idle now that it is processing a task
        self.idle = False

        # Set the progress to an empty string
        self.worker_subprocess_percent = ''
        self.worker_subprocess_elapsed = '0'

        # Log the start of the job
        self._log("Picked up job - {}".format(self.current_task.get_source_abspath()))

        # Mark as being "in progress"
        self.current_task.set_status('in_progress')

        # Start current task stats
        self.__set_start_task_stats()

        # Process the file. Will return True on success, otherwise False
        success = self.__exec_worker_runners_on_set_task()
        # Mark the task as either a success or not
        self.current_task.set_success(success)

        # Mark task completion statistics
        self.__set_finish_task_stats()

        # Log completion of the job
        self._log("Finished job - {}".format(self.current_task.get_source_abspath()))

        # Place the task into the completed queue
        self.complete_queue.put(self.current_task)

        # Reset the current file info for the next task
        self.__unset_current_task()

    def __set_start_task_stats(self):
        """Sets the initial stats for the start of a task"""
        # Set the start time to now
        self.start_time = time.time()

        # Clear the finish time
        self.finish_time = None

        # Format our starting statistics data
        self.current_task.task.processed_by_worker = self.name
        self.current_task.task.start_time = self.start_time
        self.current_task.task.finish_time = self.finish_time

    def __set_finish_task_stats(self):
        """Sets the final stats for the end of a task"""
        # Set the finish time to now
        self.finish_time = time.time()

        # Set the finish time in the statistics data
        self.current_task.task.finish_time = self.finish_time

    def __exec_worker_runners_on_set_task(self):
        """
        Executes the configured plugin runners against the set task.

        :return:
        """
        # Init plugins
        library_id = self.current_task.get_task_library_id()
        plugin_handler = PluginsHandler()
        plugin_modules = plugin_handler.get_enabled_plugin_modules_by_type('worker.process_item', library_id=library_id)

        # Create a dictionary of runners info for the frontend
        self.worker_runners_info = {}
        for plugin_module in plugin_modules:
            self.worker_runners_info[plugin_module.get('plugin_id')] = {
                'plugin_id': plugin_module.get('plugin_id'),
                'status': 'pending',
                'name': plugin_module.get('name'),
                'author': plugin_module.get('author'),
                'version': plugin_module.get('version'),
                'icon': plugin_module.get('icon'),
                'description': plugin_module.get('description'),
            }

        # Set the absolute path to the original file
        original_abspath = self.current_task.get_source_abspath()

        # Process the item in a loop.
        # First process the item for each plugin that configures it, then run the default Unmanic configuration
        task_cache_path = self.current_task.get_cache_path()
        # Set the current input file to the original file path
        file_in = original_abspath
        # Mark the overall success of all runners. This will be set to False if any of the runners fails.
        overall_success = True
        # Set the current file out to the original file path.
        # This will be updated by each runner.
        # If no runners are configured, then nothing needs to be done.
        current_file_out = original_abspath
        # The number of runners that have been run
        runner_count = 0
        # Flag marking that no runner has yet asked Unmanic to execute a command
        no_exec_command_run = True

        # Generate the default data object for the runner functions
        data = {
            "worker_log": self.worker_log,
            "library_id": library_id,
            "exec_command": [],
            "command_progress_parser": default_progress_parser,
            "file_in": file_in,
            "file_out": None,
            "original_file_path": original_abspath,
            "repeat": False,
        }
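
        # A hedged sketch of how a 'worker.process_item' plugin runner typically
        # fills in this data object. The function name and ffmpeg command are
        # illustrative assumptions, not a plugin bundled with Unmanic:
        #
        #     def on_worker_process(data):
        #         data['exec_command'] = ['ffmpeg', '-i', data['file_in'], data['file_out']]
        #         data['command_progress_parser'] = my_progress_parser  # hypothetical parser
        #         data['repeat'] = False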

        for plugin_module in plugin_modules:
            # Increment the runner count (the first runner will be set as #1)
            runner_count += 1

            if not overall_success:
                # If one of the Plugins fails, don't continue.
                # The Plugins could be co-dependent, and the final file will not go anywhere if 'overall_success' is False
                break

            # Mark the status of the worker for the frontend
            self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'in_progress'
            self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False

            # Loop over the runner. This way we can repeat the function with the same data if requested by the repeat flag
            runner_pass_count = 0
            while not self.redundant_flag.is_set():
                runner_pass_count += 1

                # Fetch file out details.
                # This creates a temp file labelled "WORKING" that will be moved to the cache_path on completion
                split_file_out = os.path.splitext(task_cache_path)
                split_file_in = os.path.splitext(file_in)
                file_out = "{}-{}-{}-{}{}".format(split_file_out[0], "WORKING", runner_count, runner_pass_count,
                                                  split_file_in[1])
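                # For example (illustrative paths only): with a task_cache_path of
                # '/tmp/unmanic/unmanic_file_conversion-1/movie.mkv' and a file_in of
                # '/library/movie.avi', the second pass of runner #1 would produce
                # '/tmp/unmanic/unmanic_file_conversion-1/movie-WORKING-1-2.avi'.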

                # Reset the data object for this runner's functions
                data['library_id'] = library_id
                data['exec_command'] = []
                data['command_progress_parser'] = default_progress_parser
                data['file_in'] = file_in
                data['file_out'] = file_out
                data['original_file_path'] = original_abspath
                data['repeat'] = False

                self.event.wait(.2)  # Add delay for preventing loop maxing compute resources
                self.worker_log.append("\n\nRUNNER: \n{} [Pass #{}]\n\n".format(plugin_module.get('name'), runner_pass_count))
                self.worker_log.append("\nExecuting plugin runner... Please wait\n")

                # Run the plugin to update the data
                if not plugin_handler.exec_plugin_runner(data, plugin_module.get('plugin_id'), 'worker.process_item'):
                    # Skip this plugin module's loop
                    self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'complete'
                    self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                    # Set overall success status to failed
                    overall_success = False
                    # Append a log entry to say the plugin failed
                    self.worker_log.append("\n\nPLUGIN FAILED!")
                    self.worker_log.append("\nFailed to execute Plugin '{}'".format(plugin_module.get('name')))
                    self.worker_log.append("\nCheck Unmanic logs for more information")
                    break

                # Log the in and out files returned by the plugin runner for debugging
                self._log("Worker process '{}' (in)".format(plugin_module.get('plugin_id')), data.get("file_in"),
                          level='debug')
                self._log("Worker process '{}' (out)".format(plugin_module.get('plugin_id')), data.get("file_out"),
                          level='debug')

                # Only run the conversion process if "exec_command" is not empty
                if data.get("exec_command"):
                    self.worker_log.append("\nPlugin runner requested for a command to be executed by Unmanic")

                    # Exec the command as a subprocess
                    success = self.__exec_command_subprocess(data)
                    no_exec_command_run = False

                    if self.redundant_flag.is_set():
                        # This worker has been marked as redundant. It is being terminated.
                        self._log("Worker has been terminated before a command was completed", level="warning")
                        # Mark the runner as failed
                        self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                        # Set overall success status to failed
                        overall_success = False
                        # Append a log entry to say the worker was terminated
                        self.worker_log.append("\n\nWORKER TERMINATED!")
                        # Don't continue
                        break

                    # The command was run. Check if it exited successfully.
                    if success:
                        # The file conversion was successful
                        self._log("Successfully ran worker process '{}' on file '{}'".format(plugin_module.get('plugin_id'),
                                                                                             data.get("file_in")))
                        # Ensure the 'file_out' that was specified by the plugin to be created was actually created.
                        if os.path.exists(data.get('file_out')):
                            # The out file exists...
                            # In order to clean up as we go and avoid unnecessary RAM/disk use in the cache directory,
                            # we want to remove the 'file_in' file.
                            # We want to ensure that we do not accidentally remove any original files here.
                            # To avoid this, run two tests.
                            # First, check the current 'file_in' is not the original file.
                            if os.path.abspath(data.get("file_in")) != os.path.abspath(original_abspath):
                                # Second, check that the 'file_in' is in the cache directory.
                                if "unmanic_file_conversion" in os.path.abspath(data.get("file_in")):
                                    # Remove this file
                                    os.remove(os.path.abspath(data.get("file_in")))

                            # Set the new 'file_in' as the previous runner's 'file_out' for the next loop
                            file_in = data.get("file_out")
                    else:
                        # The file conversion failed
                        self._log(
                            "Error while running worker process '{}' on file '{}'".format(
                                plugin_module.get('plugin_id'),
                                original_abspath
                            ),
                            level="error")
                        self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = False
                        overall_success = False

                        # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                        file_in = data.get("file_in")
                else:
                    # Ensure the new 'file_in' is set to the previous runner's 'file_in' for the next loop
                    file_in = data.get("file_in")
                    # Log that this plugin did not request to execute anything
                    self.worker_log.append("\nRunner did not request for Unmanic to execute a command")
                    self._log(
                        "Worker process '{}' did not request to execute a command.".format(plugin_module.get('plugin_id')),
                        level='debug')

                if os.path.exists(data.get('file_out')):
                    # Set the current file out to the most recently completed cache file.
                    # If the file out does not exist, it was likely never used by the plugin.
                    current_file_out = data.get('file_out')
                else:
                    # Ensure the current_file_out is set to the currently set 'file_in'
                    current_file_out = data.get('file_in')

                if data.get("repeat"):
                    # The returned data contained the 'repeat' flag.
                    # Run another pass against this same plugin
                    continue
                break

            # Only mark this runner as successful if nothing above flagged a failure
            if overall_success:
                self.worker_runners_info[plugin_module.get('plugin_id')]['success'] = True
            self.worker_runners_info[plugin_module.get('plugin_id')]['status'] = 'complete'

        # Log if no command was run by any Plugins
        if no_exec_command_run:
            # If no jobs were carried out on this task
            self._log("No Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath), level='warning')
            self.worker_log.append("\n\nNo Plugin requested for Unmanic to run commands for this file '{}'".format(original_abspath))

        # Save the completed command log
        self.current_task.save_command_log(self.worker_log)

        # If all plugins that were executed completed successfully, then this was overall a successful task.
        # At this point we need to move the final out file to the original task cache path so the postprocessor can collect it.
        if overall_success:
            # If the jobs carried out on this task were all successful, we will get here
            self._log("Successfully completed Worker processing on file '{}'".format(original_abspath))

            # Attempt to move the final output file to the final cache file path for the postprocessor
            try:
                # Set the new file out as the extension may have changed
                split_file_name = os.path.splitext(current_file_out)
                file_extension = split_file_name[1].lstrip('.')
                cache_directory = os.path.dirname(os.path.abspath(task_cache_path))
                self.current_task.set_cache_path(cache_directory, file_extension)
                # Read the updated cache path
                task_cache_path = self.current_task.get_cache_path()

                # Move the file to the original cache path
                self._log("Moving final cache file from '{}' to '{}'".format(current_file_out, task_cache_path))
                current_file_out = os.path.abspath(current_file_out)

                # There is a really odd intermittent bug with the shutil module that is causing it to
                # sometimes report that the file does not exist.
                # This section adds a small pause and logs the error if that is the case.
                # I have not yet figured out a solution as this is difficult to reproduce.
                if not os.path.exists(current_file_out):
                    self._log("Error - current_file_out path does not exist! '{}'".format(current_file_out), level="error")
                    self.event.wait(1)

                # Ensure the cache directory exists
                if not os.path.exists(cache_directory):
                    os.makedirs(cache_directory)

                # Check that the current file out is not the original source file
                if os.path.abspath(current_file_out) == os.path.abspath(original_abspath):
                    # The current file out is not a cache file; the file must have never been modified.
                    # This can happen if all Plugins failed to run, or a Plugin specifically reset the out
                    # file to the original source in order to preserve it.
                    # In this circumstance, we want to create a cache copy and let the process continue.
                    self._log("Final cache file is the same path as the original source. Creating cache copy.", level='debug')
                    shutil.copyfile(current_file_out, task_cache_path)
                else:
                    # Use the shutil module to move the file to the final task cache location
                    shutil.move(current_file_out, task_cache_path)
            except Exception as e:
                self._log("Exception in final move operation of file {} to {}:".format(current_file_out, task_cache_path),
                          message2=str(e), level="exception")
                return False

            # Return True
            return True

        # If the overall result of the jobs carried out on this task was not successful, we will get here.
        # Log the failure and return False
        self._log("Failed to process task for file '{}'".format(original_abspath), level='warning')
        return False

    def __log_proc_terminated(self, proc):
        self._log("Process {} terminated with exit code {}".format(proc, proc.returncode))

    def __terminate_proc_tree(self, proc: psutil.Process):
        """
        Terminate the process tree (including grandchildren).
        Processes that fail to stop with SIGTERM will be sent a SIGKILL.

        :param proc:
        :return:
        """
        children = proc.children(recursive=True)
        children.append(proc)
        for p in children:
            try:
                p.terminate()
            except psutil.NoSuchProcess:
                pass
        gone, alive = psutil.wait_procs(children, timeout=3, callback=self.__log_proc_terminated)
        for p in alive:
            try:
                p.kill()
            except psutil.NoSuchProcess:
                pass
        psutil.wait_procs(alive, timeout=3, callback=self.__log_proc_terminated)
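
    # Note: the terminate -> wait_procs -> kill -> wait_procs sequence above follows
    # the process-tree termination recipe from the psutil documentation; it gives
    # well-behaved children a chance to exit cleanly before anything is force-killed.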

    def __exec_command_subprocess(self, data):
        """
        Executes a command as a shell subprocess.
        Uses the given parser to record progress data from the shell STDOUT.

        :param data:
        :return:
        """
        # Fetch the command to execute
        exec_command = data.get("exec_command", [])

        # Fetch the command progress parser function
        command_progress_parser = data.get("command_progress_parser", default_progress_parser)

        # Log the command for debugging
        command_string = exec_command
        if isinstance(exec_command, list):
            command_string = ' '.join(exec_command)
        self._log("Executing: {}".format(command_string), level='debug')

        # Append the start of the command to the worker subprocess stdout
        self.worker_log += [
            '\n\n',
            'COMMAND:\n',
            command_string,
            '\n\n',
            'LOG:\n',
        ]

        # Create the output path if it does not exist
        common.ensure_dir(data.get("file_out"))

        # Convert file
        try:
            proc_pause_time = 0
            proc_start_time = time.time()
            # Execute the command
            if isinstance(exec_command, list):
                sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                            universal_newlines=True, errors='replace')
            elif isinstance(exec_command, str):
                sub_proc = subprocess.Popen(exec_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                            universal_newlines=True, errors='replace', shell=True)
            else:
                raise Exception(
                    "The plugin's returned 'exec_command' object must be either a list or a string. Received type {}.".format(
                        type(exec_command)))

            # Fetch the process using psutil for control (sending SIGSTOP on Windows will not work)
            proc = psutil.Process(pid=sub_proc.pid)

            # Set the process priority on posix systems
            # TODO: Test how this will work on Windows
            if os.name == "posix":
                try:
                    parent_proc = psutil.Process(os.getpid())
                    parent_proc_nice = parent_proc.nice()
                    proc.nice(parent_proc_nice + 1)
                except Exception as e:
                    self._log("Unable to lower priority of subprocess. Subprocess should continue to run at normal priority",
                              str(e), level='warning')

            # Record the PID and PROC
            self.worker_subprocess = sub_proc
            self.worker_subprocess_pid = sub_proc.pid
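
            # Note: stderr was merged into stdout when the subprocess was created
            # above (stderr=subprocess.STDOUT), so the readline() loop below sees
            # both streams. Tools such as ffmpeg write progress to stderr, so this
            # merge is what lets the progress parser observe those lines.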

            # Poll the process for new output until it has finished
            while not self.redundant_flag.is_set():
                line_text = sub_proc.stdout.readline()

                # Fetch the command stdout and append it to the current task object (to be saved during post-processing)
                self.worker_log.append(line_text)

                # Check if the command has completed. If it has, exit the loop
                if line_text == '' and sub_proc.poll() is not None:
                    self._log("Subprocess task completed!", level='debug')
                    break

                # Parse the progress
                try:
                    progress_dict = command_progress_parser(line_text)
                    self.worker_subprocess_percent = progress_dict.get('percent', '0')
                    self.worker_subprocess_elapsed = str(time.time() - proc_start_time - proc_pause_time)
                except Exception as e:
                    # We only need to surface this sort of exception when debugging is enabled,
                    # so log it as a debug message rather than an exception.
                    self._log("Exception while parsing command progress", str(e), level='debug')

                # Suspend the process if the worker is paused,
                # then resume it when the worker is resumed
                if self.paused_flag.is_set():
                    self._log("Pausing PID {}".format(sub_proc.pid), level='debug')
                    proc.suspend()
                    self.paused = True
                    start_pause = time.time()
                    while not self.redundant_flag.is_set():
                        self.event.wait(1)
                        if not self.paused_flag.is_set():
                            self._log("Resuming PID {}".format(sub_proc.pid), level='debug')
                            proc.resume()
                            self.paused = False
                            # Elapsed time is used for calculating the ETC (estimated time of completion).
                            # We account for pauses by counting the time spent paused; this is then
                            # subtracted from the elapsed time in the calculation above.
                            proc_pause_time = int(proc_pause_time + time.time() - start_pause)
                            break
                    continue

            # Get the final output and the exit status
            if not self.redundant_flag.is_set():
                communicate = sub_proc.communicate()[0]

            # If the process is still running, kill it
            if proc.is_running():
                self._log("Found worker subprocess is still running. Killing it.", level='warning')
                self.__terminate_proc_tree(proc)

            if sub_proc.returncode == 0:
                return True
            else:
                self._log("Command run against '{}' exited with a non-zero status. "
                          "Download the command dump from the history for more information.".format(data.get("file_in")),
                          message2=str(exec_command), level="error")
                return False

        except Exception as e:
            self._log("Error while executing the command against file {}.".format(data.get("file_in")), message2=str(e),
                      level="error")

        return False
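

# A hedged example of how a Foreman-like caller would typically drive a Worker.
# The queue wiring, IDs, and the 'task' object are illustrative assumptions:
#
#     import queue
#     import threading
#
#     pending_queue = queue.Queue()
#     complete_queue = queue.Queue()
#     event = threading.Event()
#
#     worker = Worker(1, "Worker-1", "default-group", pending_queue, complete_queue, event)
#     worker.daemon = True
#     worker.start()
#
#     worker.set_task(task)          # 'task' is a hypothetical Task object
#     # ... poll worker.get_status() for progress ...
#
#     worker.redundant_flag.set()    # ask the worker to stop
#     worker.join()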