kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.installation_link.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 28 Oct 2021, (7:24 PM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import json
33 import os.path
34 import queue
35 import threading
36 import time
37  
38 import requests
39 from requests.auth import HTTPBasicAuth
40 from requests_toolbelt import MultipartEncoder
41  
42 from unmanic import config
43 from unmanic.libs import common, session, task, unlogger
44 from unmanic.libs.library import Library
45 from unmanic.libs.session import Session
46 from unmanic.libs.singleton import SingletonType
47  
48  
49 class RequestHandler:
50  
51 def __init__(self, *args, **kwargs):
52 self.auth = kwargs.get('auth', '')
53 # Set username (could be passed in as None)
54 self.username = ''
55 if kwargs.get('username'):
56 self.username = kwargs.get('username')
57 # Set password (could be passed in as None)
58 self.password = ''
59 if kwargs.get('password'):
60 self.password = kwargs.get('password')
61  
62 def __get_request_auth(self):
63 request_auth = None
64 if self.auth and self.auth.lower() == 'basic':
65 request_auth = HTTPBasicAuth(self.username, self.password)
66 return request_auth
67  
68 def get(self, url, **kwargs):
69 return requests.get(url, auth=self.__get_request_auth(), **kwargs)
70  
71 def post(self, url, **kwargs):
72 return requests.post(url, auth=self.__get_request_auth(), **kwargs)
73  
74 def delete(self, url, **kwargs):
75 return requests.delete(url, auth=self.__get_request_auth(), **kwargs)
76  
77  
78 class Links(object, metaclass=SingletonType):
79 _network_transfer_lock = {}
80  
81 def __init__(self, *args, **kwargs):
82 self.settings = config.Config()
83 self.session = session.Session()
84 unmanic_logging = unlogger.UnmanicLogger.__call__()
85 self.logger = unmanic_logging.get_logger(__class__.__name__)
86  
87 def _log(self, message, message2='', level="info"):
88 message = common.format_message(message, message2)
89 getattr(self.logger, level)(message)
90  
91 def __format_address(self, address: str):
92 # Strip all whitespace
93 address = address.strip()
94 # Add http if it does not exist
95 if not address.lower().startswith('http'):
96 address = "http://{}".format(address)
97 # Strip any trailing slashes
98 address = address.rstrip('/')
99 return address
100  
101 def __merge_config_dicts(self, config_dict, compare_dict):
102 for key in config_dict.keys():
103 if config_dict.get(key) != compare_dict.get(key) and compare_dict.get(key) is not None:
104 # Apply the new value
105 config_dict[key] = compare_dict.get(key)
106 # Also flag the dict as updated
107 config_dict['last_updated'] = time.time()
108  
109 def __generate_default_config(self, config_dict: dict):
110 return {
111 "address": config_dict.get('address', '???'),
112 "auth": config_dict.get('auth', 'None'),
113 "username": config_dict.get('username', ''),
114 "password": config_dict.get('password', ''),
115 "enable_receiving_tasks": config_dict.get('enable_receiving_tasks', False),
116 "enable_sending_tasks": config_dict.get('enable_sending_tasks', False),
117 "enable_task_preloading": config_dict.get('enable_task_preloading', True),
118 "preloading_count": config_dict.get('preloading_count', 2),
119 "enable_checksum_validation": config_dict.get('enable_checksum_validation', False),
120 "enable_config_missing_libraries": config_dict.get('enable_config_missing_libraries', False),
121 "enable_distributed_worker_count": config_dict.get('enable_distributed_worker_count', False),
122 "name": config_dict.get('name', '???'),
123 "version": config_dict.get('version', '???'),
124 "uuid": config_dict.get('uuid', '???'),
125 "available": config_dict.get('available', False),
126 "task_count": config_dict.get('task_count', 0),
127 "last_updated": config_dict.get('last_updated', time.time()),
128 }
129  
130 def acquire_network_transfer_lock(self, url, transfer_limit=1, lock_type='send'):
131 """
132 Limit transfers to each installation to 1 at a time
133  
134 :param url:
135 :param transfer_limit:
136 :param lock_type:
137 :return:
138 """
139 time_now = time.time()
140 lock = threading.RLock()
141 # Limit maximum transfer limit to 5
142 if transfer_limit > 5:
143 transfer_limit = 5
144 # Acquire a lock if one is available
145 with lock:
146 for tx_lock in range(transfer_limit):
147 lock_key = "[{}-{}]-{}".format(lock_type, tx_lock, url)
148 if self._network_transfer_lock.get(lock_key, {}).get('expires', 0) < time_now:
149 # Create new upload lock that will expire in 1 minute
150 self._network_transfer_lock[lock_key] = {
151 'expires': (time_now + 60),
152 }
153 # Return success
154 return lock_key
155 # Failed to acquire network transfer lock
156 return False
157  
158 def release_network_transfer_lock(self, lock_key):
159 """
160 Expire the transfer lock for the given lock_key
161  
162 :param lock_key:
163 :return:
164 """
165 lock = threading.RLock()
166 with lock:
167 # Expire the lock for this address
168 self._network_transfer_lock[lock_key] = {}
169 return True
170  
171 def remote_api_get(self, remote_config: dict, endpoint: str, timeout=2):
172 """
173 GET to remote installation API
174  
175 :param remote_config:
176 :param endpoint:
177 :param timeout:
178 :return:
179 """
180 request_handler = RequestHandler(
181 auth=remote_config.get('auth'),
182 username=remote_config.get('username'),
183 password=remote_config.get('password'),
184 )
185 address = self.__format_address(remote_config.get('address'))
186 url = "{}{}".format(address, endpoint)
187 res = request_handler.get(url, timeout=timeout)
188 if res.status_code == 200:
189 return res.json()
190 elif res.status_code in [400, 404, 405, 500]:
191 json_data = res.json()
192 self._log("Error while executing GET on remote installation API - {}. Message: '{}'".format(
193 endpoint,
194 json_data.get('error')),
195 message2=json_data.get('traceback', []), level='error')
196 return {}
197  
198 def remote_api_post(self, remote_config: dict, endpoint: str, data: dict, timeout=2):
199 """
200 POST to remote installation API
201  
202 :param remote_config:
203 :param endpoint:
204 :param data:
205 :param timeout:
206 :return:
207 """
208 request_handler = RequestHandler(
209 auth=remote_config.get('auth'),
210 username=remote_config.get('username'),
211 password=remote_config.get('password'),
212 )
213 address = self.__format_address(remote_config.get('address'))
214 url = "{}{}".format(address, endpoint)
215 res = request_handler.post(url, json=data, timeout=timeout)
216 if res.status_code == 200:
217 return res.json()
218 elif res.status_code in [400, 404, 405, 500]:
219 json_data = res.json()
220 self._log("Error while executing POST on remote installation API - {}. Message: '{}'".format(
221 endpoint,
222 json_data.get('error')),
223 message2=json_data.get('traceback', []), level='error')
224 return json_data
225 return {}
226  
227 def remote_api_post_file(self, remote_config: dict, endpoint: str, path: str):
228 """
229 Send a file to the remote installation
230 No timeout is set so the request will continue until closed
231  
232 :param remote_config:
233 :param endpoint:
234 :param path:
235 :return:
236 """
237 request_handler = RequestHandler(
238 auth=remote_config.get('auth'),
239 username=remote_config.get('username'),
240 password=remote_config.get('password'),
241 )
242 address = self.__format_address(remote_config.get('address'))
243 url = "{}{}".format(address, endpoint)
244 # NOTE: If you remove a content type from the upload (text/plain) the file upload fails
245 # NOTE2: The 'ith open(path, "rb") as f' method reads the file into memory before uploading.
246 # This is slow and not ideal for devices with small amounts of ram.
247 # ```
248 # with open(path, "rb") as f:
249 # files = {"fileName": (os.path.basename(path), f, 'text/plain')}
250 # res = requests.post(url, files=files)
251 # ```
252 m = MultipartEncoder(fields={'fileName': (os.path.basename(path), open(path, 'rb'), 'text/plain')})
253 res = request_handler.post(url, data=m, headers={'Content-Type': m.content_type})
254 if res.status_code == 200:
255 return res.json()
256 elif res.status_code in [400, 404, 405, 500]:
257 json_data = res.json()
258 self._log("Error while uploading file to remote installation API - {}. Message: '{}'".format(
259 endpoint,
260 json_data.get('error')),
261 message2=json_data.get('traceback', []), level='error')
262 return {}
263  
264 def remote_api_delete(self, remote_config: dict, endpoint: str, data: dict, timeout=2):
265 """
266 DELETE to remote installation API
267  
268 :param remote_config:
269 :param endpoint:
270 :param data:
271 :param timeout:
272 :return:
273 """
274 request_handler = RequestHandler(
275 auth=remote_config.get('auth'),
276 username=remote_config.get('username'),
277 password=remote_config.get('password'),
278 )
279 address = self.__format_address(remote_config.get('address'))
280 url = "{}{}".format(address, endpoint)
281 res = request_handler.delete(url, json=data, timeout=timeout)
282 if res.status_code == 200:
283 return res.json()
284 elif res.status_code in [400, 404, 405, 500]:
285 json_data = res.json()
286 self._log("Error while executing DELETE on remote installation API - {}. Message: '{}'".format(
287 endpoint,
288 json_data.get('error')),
289 message2=json_data.get('traceback', []), level='error')
290 return {}
291  
292 def remote_api_get_download(self, remote_config: dict, endpoint: str, path: str):
293 """
294 Download a file from a remote installation
295  
296 :param remote_config:
297 :param endpoint:
298 :param path:
299 :return:
300 """
301 request_handler = RequestHandler(
302 auth=remote_config.get('auth'),
303 username=remote_config.get('username'),
304 password=remote_config.get('password'),
305 )
306 address = self.__format_address(remote_config.get('address'))
307 url = "{}{}".format(address, endpoint)
308 with request_handler.get(url, stream=True) as r:
309 r.raise_for_status()
310 with open(path, 'wb') as f:
311 for chunk in r.iter_content(chunk_size=None):
312 if chunk:
313 f.write(chunk)
314 return True
315  
316 def validate_remote_installation(self, address: str, **kwargs):
317 """
318 Validate a remote Unmanic installation by requesting
319 its system info and version
320  
321 :param address:
322 :param username:
323 :param password:
324 :return:
325 """
326 address = self.__format_address(address)
327  
328 request_handler = RequestHandler(
329 auth=kwargs.get('auth'),
330 username=kwargs.get('username'),
331 password=kwargs.get('password'),
332 )
333  
334 # Fetch config
335 url = "{}/unmanic/api/v2/settings/configuration".format(address)
336 res = request_handler.get(url, timeout=2)
337 if res.status_code != 200:
338 if res.status_code in [400, 404, 405, 500]:
339 json_data = res.json()
340 self._log("Error while fetching remote installation config. Message: '{}'".format(json_data.get('error')),
341 message2=json_data.get('traceback', []), level='error')
342 return {}
343 system_configuration_data = res.json()
344  
345 # Fetch settings
346 url = "{}/unmanic/api/v2/settings/read".format(address)
347 res = request_handler.get(url, timeout=2)
348 if res.status_code != 200:
349 if res.status_code in [400, 404, 405, 500]:
350 json_data = res.json()
351 self._log("Error while fetching remote installation settings. Message: '{}'".format(json_data.get('error')),
352 message2=json_data.get('traceback', []), level='error')
353 return {}
354 settings_data = res.json()
355  
356 # Fetch version
357 url = "{}/unmanic/api/v2/version/read".format(address)
358 res = request_handler.get(url, timeout=2)
359 if res.status_code != 200:
360 if res.status_code in [400, 404, 405, 500]:
361 json_data = res.json()
362 self._log("Error while fetching remote installation version. Message: '{}'".format(json_data.get('error')),
363 message2=json_data.get('traceback', []), level='error')
364 return {}
365 version_data = res.json()
366  
367 # Fetch version
368 url = "{}/unmanic/api/v2/session/state".format(address)
369 res = request_handler.get(url, timeout=2)
370 if res.status_code != 200:
371 if res.status_code in [400, 404, 405, 500]:
372 json_data = res.json()
373 self._log(
374 "Error while fetching remote installation session state. Message: '{}'".format(json_data.get('error')),
375 message2=json_data.get('traceback', []), level='error')
376 return {}
377 session_data = res.json()
378  
379 # Fetch task count data
380 data = {
381 "start": 0,
382 "length": 1
383 }
384 url = "{}/unmanic/api/v2/pending/tasks".format(address)
385 res = request_handler.post(url, json=data, timeout=2)
386 if res.status_code != 200:
387 if res.status_code in [400, 404, 405, 500]:
388 json_data = res.json()
389 self._log(
390 "Error while fetching remote installation pending task list. Message: '{}'".format(json_data.get('error')),
391 message2=json_data.get('traceback', []), level='error')
392 return {}
393 tasks_data = res.json()
394  
395 return {
396 'system_configuration': system_configuration_data.get('configuration'),
397 'settings': settings_data.get('settings'),
398 'version': version_data.get('version'),
399 'session': {
400 "level": session_data.get('level'),
401 "picture_uri": session_data.get('picture_uri'),
402 "name": session_data.get('name'),
403 "email": session_data.get('email'),
404 "uuid": session_data.get('uuid'),
405 },
406 'task_count': int(tasks_data.get('recordsTotal', 0))
407 }
408  
409 def update_all_remote_installation_links(self):
410 """
411 Updates the link status and configuration of linked remote installations
412  
413 :return:
414 """
415 save_settings = False
416 installation_id_list = []
417 remote_installations = []
418 distributed_worker_count_target = self.settings.get_distributed_worker_count_target()
419 for local_config in self.settings.get_remote_installations():
420 # Ensure address is not added twice by comparing installation IDs
421 # Items matching these checks will be skipped over and will not be added to the installation list
422 # that will be re-saved
423 if local_config.get('uuid') in installation_id_list and local_config.get('uuid', '???') != '???':
424 # Do not update this installation. By doing this it will be removed from the list
425 save_settings = True
426 continue
427  
428 # Ensure the address is something valid
429 if not local_config.get('address'):
430 save_settings = True
431 continue
432  
433 # Remove any entries that have an unknown address and uuid
434 if local_config.get('address') == '???' and local_config.get('uuid') == '???':
435 save_settings = True
436 continue
437  
438 # Fetch updated data
439 installation_data = None
440 try:
441 installation_data = self.validate_remote_installation(local_config.get('address'),
442 auth=local_config.get('auth'),
443 username=local_config.get('username'),
444 password=local_config.get('password'))
445 except Exception:
446 pass
447  
448 # Generate updated configured values
449 updated_config = self.__generate_default_config(local_config)
450 updated_config["available"] = False
451 if installation_data:
452 # Mark the installation as available
453 updated_config["available"] = True
454  
455 # Append the current task count
456 updated_config["task_count"] = installation_data.get('task_count', 0)
457  
458 merge_dict = {
459 "name": installation_data.get('settings', {}).get('installation_name'),
460 "version": installation_data.get('version'),
461 "uuid": installation_data.get('session', {}).get('uuid'),
462 }
463 self.__merge_config_dicts(updated_config, merge_dict)
464  
465 # Fetch the corresponding remote configuration for this local installation
466 remote_config = {}
467 try:
468 remote_config = self.fetch_remote_installation_link_config_for_this(local_config)
469 except requests.exceptions.Timeout:
470 self._log("Request to fetch remote installation config timed out", level='warning')
471 updated_config["available"] = False
472 except requests.exceptions.RequestException as e:
473 self._log("Request to fetch remote installation config failed", message2=str(e), level='warning')
474 updated_config["available"] = False
475 except Exception as e:
476 self._log("Failed to fetch remote installation config", message2=str(e), level='error')
477 updated_config["available"] = False
478  
479 # If the remote configuration is newer than this one, use those values
480 # The remote installation will do the same and this will synchronise
481 remote_link_config = remote_config.get('link_config', {})
482 if local_config.get('last_updated', 1) < remote_link_config.get('last_updated', 1):
483 # Note that the configuration options are reversed when reading from the remote installation config
484 # These items are not synced here:
485 # - enable_task_preloading
486 # - enable_checksum_validation
487 # - enable_config_missing_libraries
488 if updated_config["enable_receiving_tasks"] != remote_link_config.get('enable_sending_tasks'):
489 updated_config["enable_receiving_tasks"] = remote_link_config.get('enable_sending_tasks')
490 save_settings = True
491 if updated_config["enable_sending_tasks"] != remote_link_config.get('enable_receiving_tasks'):
492 updated_config["enable_sending_tasks"] = remote_link_config.get('enable_receiving_tasks')
493 save_settings = True
494 # Update the distributed_worker_count_target
495 distributed_worker_count_target = remote_config.get('distributed_worker_count_target', 0)
496 # Also sync the last_updated flag
497 updated_config['last_updated'] = remote_link_config.get('last_updated')
498  
499 # If the remote config is unable to contact this installation (or it does not have a corresponding config yet)
500 # then also push the configuration
501 if not remote_link_config.get('available'):
502 try:
503 self.push_remote_installation_link_config(updated_config)
504 except requests.exceptions.Timeout:
505 self._log("Request to push link config to remote installation timed out", level='warning')
506 updated_config["available"] = False
507 except requests.exceptions.RequestException as e:
508 self._log("Request to push link config to remote installation failed", message2=str(e),
509 level='warning')
510 updated_config["available"] = False
511 except Exception as e:
512 self._log("Failed to push link config to remote installation", message2=str(e), level='error')
513 updated_config["available"] = False
514  
515 # Push library configurations for missing remote libraries (if configured to do so)
516 if local_config.get('enable_sending_tasks') and local_config.get('enable_config_missing_libraries'):
517 # Fetch remote installation library name list
518 results = self.remote_api_get(local_config, '/unmanic/api/v2/settings/libraries')
519 existing_library_names = []
520 for library in results.get('libraries', []):
521 existing_library_names.append(library.get('name'))
522 # Loop over local libraries and create an import object for each one that is missing
523 for library in Library.get_all_libraries():
524 # Ignore local libraries that are configured for remote only
525 if library.get('enable_remote_only'):
526 continue
527 # For each of the missing libraries, create a new remote library with that config.
528 if library.get('name') not in existing_library_names:
529 # Export library config
530 import_data = Library.export(library.get('id'))
531 # Set library ID to 0 to generate new library from this import
532 import_data['library_id'] = 0
533 # Configure remote library to be fore remote files only
534 import_data['library_config']['enable_remote_only'] = True
535 import_data['library_config']['enable_scanner'] = False
536 import_data['library_config']['enable_inotify'] = False
537 # Import library on remote installation
538 self._log("Importing remote library config '{}'".format(library.get('name')), message2=import_data,
539 level='debug')
540 result = self.import_remote_library_config(local_config, import_data)
541 if result is None:
542 # There was a connection issue of some kind. This was already logged.
543 continue
544 if result.get('success'):
545 self._log("Successfully imported library '{}'".format(library.get('name')), level='debug')
546 continue
547 self._log("Failed to import library config '{}'".format(library.get('name')),
548 message2=result.get('error'), level='error')
549  
550 # Only save to file if the settings have been updated
551 remote_installations.append(updated_config)
552  
553 # Add UUID to list for next loop
554 installation_id_list.append(updated_config.get('uuid', '???'))
555  
556 # Update installation data. Only save the config to disk if it was modified
557 settings_dict = {
558 'remote_installations': remote_installations,
559 'distributed_worker_count_target': distributed_worker_count_target
560 }
561 self.settings.set_bulk_config_items(settings_dict, save_settings=save_settings)
562  
563 return remote_installations
564  
565 def read_remote_installation_link_config(self, uuid: str):
566 """
567 Returns the configuration of the remote installation
568  
569 :param uuid:
570 :return:
571 """
572 for remote_installation in self.settings.get_remote_installations():
573 if remote_installation.get('uuid') == uuid:
574 # If not yet configured, set default values before returning
575 return self.__generate_default_config(remote_installation)
576  
577 # Ensure we have settings data from the remote installation
578 raise Exception("Unable to read installation link configuration.")
579  
580 def update_single_remote_installation_link_config(self, configuration: dict, distributed_worker_count_target=0):
581 """
582 Returns the configuration of the remote installation
583  
584 :param configuration:
585 :param distributed_worker_count_target:
586 :return:
587 """
588 uuid = configuration.get('uuid')
589 if not uuid:
590 raise Exception("Updating a single installation link configuration requires a UUID.")
591  
592 current_distributed_worker_count_target = self.settings.get_distributed_worker_count_target()
593 force_update_flag = False
594 if int(current_distributed_worker_count_target) != int(distributed_worker_count_target):
595 force_update_flag = True
596  
597 config_exists = False
598 remote_installations = []
599 for local_config in self.settings.get_remote_installations():
600 updated_config = self.__generate_default_config(local_config)
601  
602 # If this is the uuid in the config provided, then update our config with the provided values
603 if local_config.get('uuid') == uuid:
604 config_exists = True
605 self.__merge_config_dicts(updated_config, configuration)
606  
607 # If this link is configured for distributed worker count, and that count was change,
608 # force the last update flag to be updated so this change is disseminated
609 if force_update_flag and configuration.get('enable_distributed_worker_count'):
610 updated_config['last_updated'] = time.time()
611  
612 remote_installations.append(updated_config)
613  
614 # If the config does not yet exist, the add it now
615 if not config_exists:
616 remote_installations.append(self.__generate_default_config(configuration))
617  
618 # Update installation data and save the config to disk
619 settings_dict = {
620 'remote_installations': remote_installations,
621 'distributed_worker_count_target': distributed_worker_count_target
622 }
623 self.settings.set_bulk_config_items(settings_dict, save_settings=True)
624  
625 def delete_remote_installation_link_config(self, uuid: str):
626 """
627 Removes a link configuration for a remote installation given its uuid
628 If no uuid match is found, returns False
629  
630 :param uuid:
631 :return:
632 """
633 removed = False
634 updated_list = []
635 for remote_installation in self.settings.get_remote_installations():
636 if remote_installation.get('uuid') == uuid:
637 # Mark the task as having successfully remoted the installation
638 removed = True
639 continue
640 # Only add remote installations that do not match
641 updated_list.append(remote_installation)
642  
643 # Update installation data and save the config to disk
644 settings_dict = {
645 'remote_installations': updated_list,
646 }
647 self.settings.set_bulk_config_items(settings_dict, save_settings=True)
648 return removed
649  
650 def fetch_remote_installation_link_config_for_this(self, remote_config: dict):
651 """
652 Fetches and returns the corresponding link configuration from a remote installation
653  
654 :param remote_config:
655 :return:
656 """
657 request_handler = RequestHandler(
658 auth=remote_config.get('auth'),
659 username=remote_config.get('username'),
660 password=remote_config.get('password'),
661 )
662 address = self.__format_address(remote_config.get('address'))
663 url = "{}/unmanic/api/v2/settings/link/read".format(address)
664 data = {
665 "uuid": self.session.uuid
666 }
667 res = request_handler.post(url, json=data, timeout=2)
668 if res.status_code == 200:
669 return res.json()
670 elif res.status_code in [400, 404, 405, 500]:
671 json_data = res.json()
672 self._log("Error while fetching remote installation link config. Message: '{}'".format(json_data.get('error')),
673 message2=json_data.get('traceback', []), level='error')
674 return {}
675  
676 def push_remote_installation_link_config(self, configuration: dict):
677 """
678 Pushes the given link config to the remote installation returns the corresponding link configuration from a remote installation
679  
680 :param configuration:
681 :return:
682 """
683 request_handler = RequestHandler(
684 auth=configuration.get('auth'),
685 username=configuration.get('username'),
686 password=configuration.get('password'),
687 )
688 address = self.__format_address(configuration.get('address'))
689 url = "{}/unmanic/api/v2/settings/link/write".format(address)
690  
691 # First generate an updated config
692 updated_config = self.__generate_default_config(configuration)
693  
694 # Update the bits for the remote instance
695 updated_config['uuid'] = self.session.uuid
696 updated_config['name'] = self.settings.get_installation_name()
697 updated_config['version'] = self.settings.read_version()
698  
699 # Configure settings
700 updated_config["enable_receiving_tasks"] = configuration.get('enable_sending_tasks')
701 updated_config["enable_sending_tasks"] = configuration.get('enable_receiving_tasks')
702  
703 # Current task count
704 task_handler = task.Task()
705 updated_config["task_count"] = int(task_handler.get_total_task_list_count())
706  
707 # Fetch local config for distributed_worker_count_target
708 distributed_worker_count_target = self.settings.get_distributed_worker_count_target()
709  
710 # Remove some of the other fields. These will need to be adjusted on the remote instance manually
711 del updated_config['address']
712 del updated_config['available']
713  
714 data = {
715 'link_config': updated_config,
716 'distributed_worker_count_target': distributed_worker_count_target
717 }
718 res = request_handler.post(url, json=data, timeout=2)
719 if res.status_code == 200:
720 return True
721 elif res.status_code in [400, 404, 405, 500]:
722 json_data = res.json()
723 self._log("Error while pushing remote installation link config. Message: '{}'".format(json_data.get('error')),
724 message2=json_data.get('traceback', []), level='error')
725 return False
726  
727 def check_remote_installation_for_available_workers(self):
728 """
729 Return a list of installations with workers available for a remote task.
730 This list is filtered by:
731 - Only installations that are available
732 - Only installations that are configured for sending tasks to
733 - Only installations that have not pending tasks
734 - Only installations that have at least one idle worker that is not paused
735  
736 :return:
737 """
738 installations_with_info = {}
739 for lc in self.settings.get_remote_installations():
740 local_config = self.__generate_default_config(lc)
741  
742 # Only installations that are available
743 if not local_config.get('available'):
744 continue
745  
746 # Only installations that are configured for sending tasks to
747 if not local_config.get('enable_sending_tasks'):
748 continue
749  
750 # No valid UUID, no valid connection. This link may still be syncing
751 if len(local_config.get('uuid', '')) < 20:
752 continue
753  
754 try:
755 # Define auth
756 # Only installations that have at least one idle worker that is not paused
757 results = self.remote_api_get(local_config, '/unmanic/api/v2/workers/status')
758 worker_list = results.get('workers_status', [])
759  
760 # Only add installations that have not got pending tasks. This is unless we are configured to preload the queue
761 max_pending_tasks = 0
762 if local_config.get('enable_task_preloading'):
763 # Preload with the number of workers (regardless of the worker status) plus an additional one to account
764 # for delays in the downloads
765 max_pending_tasks = local_config.get('preloading_count')
766 results = self.remote_api_post(local_config, '/unmanic/api/v2/pending/tasks', {
767 "start": 0,
768 "length": 1
769 })
770 if results.get('error'):
771 continue
772 current_pending_tasks = int(results.get('recordsFiltered', 0))
773 if local_config.get('enable_task_preloading') and current_pending_tasks >= max_pending_tasks:
774 self._log("Remote installation has exceeded the max remote pending task count ({})".format(
775 current_pending_tasks), level='debug')
776 continue
777  
778 # Fetch remote installation library name list
779 results = self.remote_api_get(local_config, '/unmanic/api/v2/settings/libraries')
780 library_names = []
781 for library in results.get('libraries', []):
782 library_names.append(library.get('name'))
783  
784 # Ensure that worker count is more than 0
785 if len(worker_list):
786 installations_with_info[local_config.get('uuid')] = {
787 "address": local_config.get('address'),
788 "auth": local_config.get('auth'),
789 "username": local_config.get('username'),
790 "password": local_config.get('password'),
791 "enable_task_preloading": local_config.get('enable_task_preloading'),
792 "preloading_count": local_config.get('preloading_count'),
793 "library_names": library_names,
794 "available_slots": 0,
795 }
796  
797 available_workers = False
798 for worker in worker_list:
799 # Add a slot for each worker regardless of its status
800 installations_with_info[local_config.get('uuid')]['available_slots'] += 1
801 if worker.get('idle') and not worker.get('paused'):
802 # If any workers are idle and not paused then we have an available worker slot
803 available_workers = True
804 installations_with_info[local_config.get('uuid')]['available_workers'] = True
805 elif not worker.get('idle'):
806 # If any workers are busy with a task then also mark that as an an available worker slot
807 available_workers = True
808 installations_with_info[local_config.get('uuid')]['available_workers'] = True
809  
810 # Check if this installation is configured for preloading
811 if available_workers and local_config.get('enable_task_preloading'):
812 # Add more slots to fill up the pending task queue
813 while not current_pending_tasks > max_pending_tasks:
814 installations_with_info[local_config.get('uuid')]['available_slots'] += 1
815 current_pending_tasks += 1
816  
817 except Exception as e:
818 self._log("Failed to contact remote installation '{}'".format(local_config.get('address')), message2=str(e),
819 level='warning')
820 continue
821  
822 return installations_with_info
823  
824 def within_enabled_link_limits(self, frontend_messages=None):
825 """
826 Ensure enabled plugins are within limits
827  
828 :param frontend_messages:
829 :return:
830 """
831 # Fetch level from session
832 s = Session()
833 s.register_unmanic()
834 if s.level > 1:
835 return True
836  
837 # Fetch all linked remote installations
838 remote_installations = self.settings.get_remote_installations()
839  
840 def add_frontend_message():
841 # If the frontend messages queue was included in request, append a message
842 if frontend_messages:
843 frontend_messages.put(
844 {
845 'id': 'linkedInstallationLimits',
846 'type': 'error',
847 'code': 'linkedInstallationLimits',
848 'message': '',
849 'timeout': 0
850 }
851 )
852  
853 # Ensure remote installations are within limits
854 # Function was returned above if the user was logged in and able to use infinite
855 if len(remote_installations) > s.link_count:
856 add_frontend_message()
857 return False
858 return True
859  
860 def new_pending_task_create_on_remote_installation(self, remote_config: dict, abspath: str, library_id: int):
861 """
862 Create a new pending task on a remote installation.
863 The remote installation will return the ID of a generated task.
864  
865 :param remote_config:
866 :param abspath:
867 :param library_id:
868 :return:
869 """
870 try:
871 request_handler = RequestHandler(
872 auth=remote_config.get('auth'),
873 username=remote_config.get('username'),
874 password=remote_config.get('password'),
875 )
876 address = self.__format_address(remote_config.get('address'))
877 url = "{}/unmanic/api/v2/pending/create".format(address)
878 data = {
879 "path": abspath,
880 "library_id": library_id,
881 "type": 'remote',
882 }
883 res = request_handler.post(url, json=data, timeout=2)
884 if res.status_code in [200, 400]:
885 return res.json()
886 elif res.status_code in [404, 405, 500]:
887 json_data = res.json()
888 self._log("Error while creating new remote pending task. Message: '{}'".format(json_data.get('error')),
889 message2=json_data.get('traceback', []), level='error')
890 return {}
891 except requests.exceptions.Timeout:
892 self._log("Request to create remote pending task timed out '{}'".format(abspath), level='warning')
893 return None
894 except requests.exceptions.RequestException as e:
895 self._log("Request to create remote pending task failed '{}'".format(abspath), message2=str(e), level='warning')
896 return None
897 except Exception as e:
898 self._log("Failed to create remote pending task '{}'".format(abspath), message2=str(e), level='error')
899 return {}
900  
901 def send_file_to_remote_installation(self, remote_config: dict, path: str):
902 """
903 Send a file to a remote installation.
904 The remote installation will return the ID of a generated task.
905  
906 :param remote_config:
907 :param path:
908 :return:
909 """
910 try:
911 results = self.remote_api_post_file(remote_config, '/unmanic/api/v2/upload/pending/file', path)
912 if results.get('error'):
913 results = {}
914 return results
915 except requests.exceptions.RequestException as e:
916 self._log("Request to upload to remote installation failed", message2=str(e), level='warning')
917 except Exception as e:
918 self._log("Failed to upload to remote installation", message2=str(e), level='error')
919 return {}
920  
921 def remove_task_from_remote_installation(self, remote_config: dict, remote_task_id: int):
922 """
923 Remove a task from the pending queue
924  
925 :param remote_config:
926 :param remote_task_id:
927 :return:
928 """
929 try:
930 data = {
931 "id_list": [remote_task_id]
932 }
933 return self.remote_api_delete(remote_config, '/unmanic/api/v2/pending/tasks', data, timeout=15)
934 except requests.exceptions.Timeout:
935 self._log("Request to remove remote task timed out", level='warning')
936 return None
937 except requests.exceptions.RequestException as e:
938 self._log("Request to remove remote task failed", message2=str(e), level='warning')
939 return None
940 except Exception as e:
941 self._log("Failed to remove remote pending task", message2=str(e), level='error')
942 return {}
943  
944 def get_the_remote_library_config_by_name(self, remote_config: dict, library_name: str):
945 """
946 Fetch a remote library's configuration by its name
947  
948 :param remote_config:
949 :param library_name:
950 :return:
951 """
952 try:
953 # Fetch remote installation libraries
954 results = self.remote_api_get(remote_config, '/unmanic/api/v2/settings/libraries', timeout=4)
955 for library in results.get('libraries', []):
956 if library.get('name') == library_name:
957 return library
958 except requests.exceptions.Timeout:
959 self._log("Request to set remote task library timed out", level='warning')
960 return None
961 except requests.exceptions.RequestException as e:
962 self._log("Request to set remote task library failed", message2=str(e), level='warning')
963 return None
964 except Exception as e:
965 self._log("Failed to set remote task library", message2=str(e), level='error')
966 return {}
967  
968 def set_the_remote_task_library(self, remote_config: dict, remote_task_id: int, library_name: str):
969 """
970 Set the library for the remote task
971 Defaults to the remote installation's default library
972  
973 :param remote_config:
974 :param remote_task_id:
975 :param library_name:
976 :return:
977 """
978 try:
979 data = {
980 "id_list": [remote_task_id],
981 "library_name": library_name,
982 }
983 results = self.remote_api_post(remote_config, '/unmanic/api/v2/pending/library/update', data, timeout=7)
984 if results.get('error'):
985 results = {}
986 return results
987 except requests.exceptions.Timeout:
988 self._log("Request to set remote task library timed out", level='warning')
989 return None
990 except requests.exceptions.RequestException as e:
991 self._log("Request to set remote task library failed", message2=str(e), level='warning')
992 return None
993 except Exception as e:
994 self._log("Failed to set remote task library", message2=str(e), level='error')
995 return {}
996  
997 def get_remote_pending_task_state(self, remote_config: dict, remote_task_id: int):
998 """
999 Get the remote pending task status
1000  
1001 :param remote_config:
1002 :param remote_task_id:
1003 :return:
1004 """
1005 try:
1006 data = {
1007 "id_list": [remote_task_id]
1008 }
1009 results = self.remote_api_post(remote_config, '/unmanic/api/v2/pending/status/get', data, timeout=7)
1010 return results
1011 except requests.exceptions.Timeout:
1012 self._log("Request to get status of remote task timed out", level='warning')
1013 except requests.exceptions.RequestException as e:
1014 self._log("Request to get status of remote task failed", message2=str(e), level='warning')
1015 except Exception as e:
1016 self._log("Failed to get status of remote pending task", message2=str(e), level='error')
1017 return None
1018  
1019 def start_the_remote_task_by_id(self, remote_config: dict, remote_task_id: int):
1020 """
1021 Start the remote pending task
1022  
1023 :param remote_config:
1024 :param remote_task_id:
1025 :return:
1026 """
1027 try:
1028 data = {
1029 "id_list": [remote_task_id]
1030 }
1031 results = self.remote_api_post(remote_config, '/unmanic/api/v2/pending/status/set/ready', data, timeout=7)
1032 if results.get('error'):
1033 results = {}
1034 return results
1035 except requests.exceptions.Timeout:
1036 self._log("Request to start remote task timed out", level='warning')
1037 return None
1038 except requests.exceptions.RequestException as e:
1039 self._log("Request to start remote task failed", message2=str(e), level='warning')
1040 return None
1041 except Exception as e:
1042 self._log("Failed to start remote pending task", message2=str(e), level='error')
1043 return {}
1044  
1045 def get_all_worker_status(self, remote_config: dict):
1046 """
1047 Start the remote pending task
1048  
1049 :param remote_config:
1050 :return:
1051 """
1052 try:
1053 results = self.remote_api_get(remote_config, '/unmanic/api/v2/workers/status')
1054 return results.get('workers_status', [])
1055 except requests.exceptions.Timeout:
1056 self._log("Request to get worker status timed out", level='warning')
1057 except requests.exceptions.RequestException as e:
1058 self._log("Request to get worker status failed", message2=str(e), level='warning')
1059 except Exception as e:
1060 self._log("Failed to get worker status", message2=str(e), level='error')
1061 return []
1062  
1063 def get_single_worker_status(self, remote_config: dict, worker_id: str):
1064 """
1065 Start the remote pending task
1066  
1067 :param remote_config:
1068 :param worker_id:
1069 :return:
1070 """
1071 workers_status = self.get_all_worker_status(remote_config)
1072 for worker in workers_status:
1073 if worker.get('id') == worker_id:
1074 return worker
1075 return {}
1076  
1077 def terminate_remote_worker(self, remote_config: dict, worker_id: str):
1078 """
1079 Start the remote pending task
1080  
1081 :param remote_config:
1082 :param worker_id:
1083 :return:
1084 """
1085 try:
1086 data = {
1087 "worker_id": [worker_id]
1088 }
1089 return self.remote_api_delete(remote_config, '/unmanic/api/v2/workers/worker/terminate', data)
1090 except requests.exceptions.Timeout:
1091 self._log("Request to terminate remote worker timed out", level='warning')
1092 except requests.exceptions.RequestException as e:
1093 self._log("Request to terminate remote worker failed", message2=str(e), level='warning')
1094 except Exception as e:
1095 self._log("Failed to terminate remote worker", message2=str(e), level='error')
1096 return {}
1097  
1098 def fetch_remote_task_data(self, remote_config: dict, remote_task_id: int, path: str):
1099 """
1100 Fetch the completed remote task data
1101  
1102 :param remote_config:
1103 :param remote_task_id:
1104 :param path:
1105 :return:
1106 """
1107 task_data = {}
1108 try:
1109 # Request API generate a DL link
1110 link_info = self.remote_api_get(remote_config,
1111 '/unmanic/api/v2/pending/download/data/id/{}'.format(remote_task_id))
1112 if link_info.get('link_id'):
1113 # Download the data file
1114 res = self.remote_api_get_download(remote_config, '/unmanic/downloads/{}'.format(link_info.get('link_id')),
1115 path)
1116 if res and os.path.exists(path):
1117 with open(path) as f:
1118 task_data = json.load(f)
1119 except requests.exceptions.Timeout:
1120 self._log("Request to fetch remote task data timed out", level='warning')
1121 except requests.exceptions.RequestException as e:
1122 self._log("Request to fetch remote task data failed", message2=str(e), level='warning')
1123 except Exception as e:
1124 self._log("Failed to fetch remote task data", message2=str(e), level='error')
1125 return task_data
1126  
1127 def fetch_remote_task_completed_file(self, remote_config: dict, remote_task_id: int, path: str):
1128 """
1129 Fetch the completed remote task file
1130  
1131 :param remote_config:
1132 :param remote_task_id:
1133 :param path:
1134 :return:
1135 """
1136 try:
1137 # Request API generate a DL link
1138 link_info = self.remote_api_get(remote_config,
1139 '/unmanic/api/v2/pending/download/file/id/{}'.format(remote_task_id))
1140 if link_info.get('link_id'):
1141 # Download the file
1142 res = self.remote_api_get_download(remote_config, '/unmanic/downloads/{}'.format(link_info.get('link_id')),
1143 path)
1144 if res and os.path.exists(path):
1145 return True
1146 except requests.exceptions.Timeout:
1147 self._log("Request to fetch remote task completed file timed out", level='warning')
1148 except requests.exceptions.RequestException as e:
1149 self._log("Request to fetch remote task completed file failed", message2=str(e), level='warning')
1150 except Exception as e:
1151 self._log("Failed to fetch remote task completed file", message2=str(e), level='error')
1152 return False
1153  
1154 def import_remote_library_config(self, remote_config: dict, import_data: dict):
1155 """
1156 Import a library config on a remote installation
1157  
1158 :param remote_config:
1159 :param import_data:
1160 :return:
1161 """
1162 try:
1163 results = self.remote_api_post(remote_config, '/unmanic/api/v2/settings/library/import', import_data, timeout=60)
1164 if results.get('error'):
1165 results = {}
1166 return results
1167 except requests.exceptions.Timeout:
1168 self._log("Request to import remote library timed out", level='warning')
1169 return None
1170 except requests.exceptions.RequestException as e:
1171 self._log("Request to import remote library failed", message2=str(e), level='warning')
1172 return None
1173 except Exception as e:
1174 self._log("Failed to import remote library", message2=str(e), level='error')
1175 return {}
1176  
1177  
1178 class RemoteTaskManager(threading.Thread):
1179 paused = False
1180  
1181 current_task = None
1182 worker_log = None
1183 start_time = None
1184 finish_time = None
1185  
1186 worker_subprocess_percent = None
1187 worker_subprocess_elapsed = None
1188  
1189 worker_runners_info = {}
1190  
1191 def __init__(self, thread_id, name, installation_info, pending_queue, complete_queue, event):
1192 super(RemoteTaskManager, self).__init__(name=name)
1193 self.thread_id = thread_id
1194 self.name = name
1195 self.event = event
1196 self.installation_info = installation_info
1197 self.pending_queue = pending_queue
1198 self.complete_queue = complete_queue
1199  
1200 self.links = Links()
1201  
1202 # Create 'redundancy' flag. When this is set, the worker should die
1203 self.redundant_flag = threading.Event()
1204 self.redundant_flag.clear()
1205  
1206 # Create 'paused' flag. When this is set, the worker should be paused
1207 self.paused_flag = threading.Event()
1208 self.paused_flag.clear()
1209  
1210 # Create logger for this worker
1211 unmanic_logging = unlogger.UnmanicLogger.__call__()
1212 self.logger = unmanic_logging.get_logger(self.name)
1213  
1214 def _log(self, message, message2='', level="info"):
1215 message = common.format_message(message, message2)
1216 getattr(self.logger, level)(message)
1217  
1218 def get_info(self):
1219 return {
1220 'name': self.name,
1221 'installation_info': self.installation_info,
1222 }
1223  
1224 def run(self):
1225 # A manager should only run for a single task and connection to a single worker.
1226 # If either of these become unavailable, then the manager should exit
1227 self._log("Starting remote task manager {} - {}".format(self.thread_id, self.installation_info.get('address')))
1228 # Pull task
1229 try:
1230 # Pending task queue has an item available. Fetch it.
1231 next_task = self.pending_queue.get_nowait()
1232  
1233 # Configure worker for this task
1234 self.__set_current_task(next_task)
1235  
1236 # Process the set task
1237 self.__process_task_queue_item()
1238  
1239 except queue.Empty:
1240 self._log("Remote task manager started by the pending queue was empty", level="warning")
1241 except Exception as e:
1242 self._log("Exception in processing job with {}:".format(self.name), message2=str(e),
1243 level="exception")
1244  
1245 self._log("Stopping remote task manager {} - {}".format(self.thread_id, self.installation_info.get('address')))
1246  
1247 def __set_current_task(self, current_task):
1248 """Sets the given task to the worker class"""
1249 self.current_task = current_task
1250 self.worker_log = []
1251  
1252 def __unset_current_task(self):
1253 self.current_task = None
1254 self.worker_runners_info = {}
1255 self.worker_log = []
1256  
1257 def __process_task_queue_item(self):
1258 """
1259 Processes the set task.
1260  
1261 :return:
1262 """
1263 # Set the progress to an empty string
1264 self.worker_subprocess_percent = ''
1265 self.worker_subprocess_elapsed = '0'
1266  
1267 # Log the start of the job
1268 self._log("Picked up job - {}".format(self.current_task.get_source_abspath()))
1269  
1270 # Mark as being "in progress"
1271 self.current_task.set_status('in_progress')
1272  
1273 # Start current task stats
1274 self.__set_start_task_stats()
1275  
1276 # Process the file. Will return true if success, otherwise false
1277 success = self.__send_task_to_remote_worker_and_monitor()
1278 # Mark the task as either success or not
1279 self.current_task.set_success(success)
1280  
1281 # Mark task completion statistics
1282 self.__set_finish_task_stats()
1283  
1284 # Log completion of job
1285 self._log("Finished job - {}".format(self.current_task.get_source_abspath()))
1286  
1287 # Place the task into the completed queue
1288 self.complete_queue.put(self.current_task)
1289  
1290 # Reset the current file info for the next task
1291 self.__unset_current_task()
1292  
1293 def __set_start_task_stats(self):
1294 """Sets the initial stats for the start of a task"""
1295 # Set the start time to now
1296 self.start_time = time.time()
1297  
1298 # Clear the finish time
1299 self.finish_time = None
1300  
1301 # Format our starting statistics data
1302 self.current_task.task.processed_by_worker = self.name
1303 self.current_task.task.start_time = self.start_time
1304 self.current_task.task.finish_time = self.finish_time
1305  
1306 def __set_finish_task_stats(self):
1307 """Sets the final stats for the end of a task"""
1308 # Set the finish time to now
1309 self.finish_time = time.time()
1310  
1311 # Set the finish time in the statistics data
1312 self.current_task.task.finish_time = self.finish_time
1313  
1314 def __write_failure_to_worker_log(self):
1315 # Append long entry to say the worker was terminated
1316 self.worker_log.append("\n\nREMOTE TASK FAILED!")
1317 self.worker_log.append("\nAn error occurred during one of these stages:")
1318 self.worker_log.append("\n - while sending task to remote installation")
1319 self.worker_log.append("\n - during the remote task processing")
1320 self.worker_log.append("\n - while attempting to retrieve the completed task from the remote installation")
1321 self.worker_log.append("\nCheck Unmanic logs for more information.")
1322 self.worker_log.append("\nRelevant logs will be prefixed with 'ERROR:Unmanic.{}'".format(self.name))
1323 self.current_task.save_command_log(self.worker_log)
1324  
1325 def __send_task_to_remote_worker_and_monitor(self):
1326 """
1327 Sends the task file to the remote installation to process.
1328 Monitors progress and then fetches the results
1329  
1330 TODO: Manage network disconnections.
1331 - This manager object should be able to handle a network disconnect. However, we should terminate
1332 this manager if the remote task no longer exists.
1333 - Catch all API request exceptions.
1334 - Remove the failed_status_count - losing contact should be ok. What matters is when contact is made that
1335 the task still exists to be downloaded or status updated.
1336  
1337 :return:
1338 """
1339 # Set the absolute path to the original file
1340 original_abspath = self.current_task.get_source_abspath()
1341  
1342 # Ensure file exists
1343 if not os.path.exists(original_abspath):
1344 self._log("File no longer exists '{}'. Was it removed?".format(original_abspath), level='warning')
1345 self.__write_failure_to_worker_log()
1346 return False
1347  
1348 # Set the remote worker address
1349 address = self.installation_info.get('address')
1350  
1351 lock_key = None
1352  
1353 # Fetch the library name and path this task is for
1354 library_id = self.current_task.get_task_library_id()
1355 try:
1356 library = Library(library_id)
1357 except Exception as e:
1358 self._log("Unable to fetch library config for ID {}".format(library_id), level='exception')
1359 self.__write_failure_to_worker_log()
1360 return False
1361 library_name = library.get_name()
1362 library_path = library.get_path()
1363  
1364 # Check if we can create the remote task with just a relative path
1365 # only create checksum and send file if the remote library path cannot accept relative paths or
1366 # it is configured for only receiving remote files
1367 send_file = False
1368 library_config = self.links.get_the_remote_library_config_by_name(self.installation_info, library_name)
1369  
1370 # Check if remote library is configured only for receiving remote files
1371 if library_config.get('enable_remote_only'):
1372 send_file = True
1373  
1374 # First attempt to create a task with an abspath on the remote installation
1375 remote_task_id = None
1376 if not send_file:
1377 remote_library_id = library_config.get('id')
1378  
1379 # Remove library path from file abspath to create a relative path
1380 original_relpath = os.path.relpath(original_abspath, library_path)
1381 # Join remote library path to the relative path to form a remote library abspath to the file
1382 remote_original_abspath = os.path.join(library_config.get('path'), original_relpath)
1383 # Post the task creation. This will error if the file does not exist
1384 info = self.links.new_pending_task_create_on_remote_installation(self.installation_info,
1385 remote_original_abspath,
1386 remote_library_id)
1387 if not info:
1388 self._log("Unable to create remote pending task for path '{}'. Fallback to sending file.".format(
1389 remote_original_abspath), level='debug')
1390 send_file = True
1391 elif 'path does not exist' in info.get('error', '').lower():
1392 self._log("Unable to find file in remote library's path '{}'. Fallback to sending file.".format(
1393 remote_original_abspath), level='debug')
1394 send_file = True
1395 elif 'task already exists' in info.get('error', '').lower():
1396 self._log("A remote task already exists with the path '{}'. Fallback to sending file.".format(
1397 remote_original_abspath), level='error')
1398 self.__write_failure_to_worker_log()
1399 return False
1400  
1401 # Set the remote task ID
1402 remote_task_id = info.get('id')
1403  
1404 if send_file:
1405 initial_checksum = None
1406 if self.installation_info.get('enable_checksum_validation', False):
1407 # Get source file checksum
1408 initial_checksum = common.get_file_checksum(original_abspath)
1409 initial_file_size = os.path.getsize(original_abspath)
1410  
1411 # Loop until we are able to upload the file to the remote installation
1412 info = {}
1413 while not self.redundant_flag.is_set():
1414 # For files smaller than 100MB, just transfer them in parallel
1415 # Smaller files add a lot of time overhead with the waiting in line and it slows the whole process down
1416 # Larger files benefit from being transferred one at a time.
1417 if initial_file_size > 100000000:
1418 # Check for network transfer lock
1419 lock_key = self.links.acquire_network_transfer_lock(address, transfer_limit=1, lock_type='send')
1420 if not lock_key:
1421 self.event.wait(1)
1422 continue
1423  
1424 # Send a file to a remote installation.
1425 self._log("Uploading file to remote installation '{}'".format(original_abspath), level='debug')
1426 info = self.links.send_file_to_remote_installation(self.installation_info, original_abspath)
1427 self.links.release_network_transfer_lock(lock_key)
1428 if not info:
1429 self._log("Failed to upload the file '{}'".format(original_abspath), level='error')
1430 self.__write_failure_to_worker_log()
1431 return False
1432 break
1433  
1434 # Set the remote task ID
1435 remote_task_id = info.get('id')
1436  
1437 # Compare uploaded file md5checksum
1438 if initial_checksum and info.get('checksum') != initial_checksum:
1439 self._log("The uploaded file did not return a correct checksum '{}'".format(original_abspath), level='error')
1440 # Send request to terminate the remote worker then return
1441 self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id)
1442 self.__write_failure_to_worker_log()
1443 return False
1444  
1445 # Ensure at this point we have set the remote_task_id
1446 if remote_task_id is None:
1447 self._log("Failed to create remote task. Var remote_task_id is still None", level='error')
1448 self.__write_failure_to_worker_log()
1449 return False
1450  
1451 # Set the library of the remote task using the library's name
1452 while not self.redundant_flag.is_set():
1453 result = self.links.set_the_remote_task_library(self.installation_info, remote_task_id, library_name)
1454 if result is None:
1455 # Unable to reach remote installation
1456 self.event.wait(2)
1457 continue
1458 if not result.get('success'):
1459 self._log(
1460 "Failed to match a remote library named '{}'. Remote installation will use the default library".format(
1461 library_name), level='warning')
1462 # Just log the warning for this. If no matching library name is found it will remain set as the default library
1463 break
1464 if result.get('success'):
1465 break
1466  
1467 # Start the remote task
1468 while not self.redundant_flag.is_set():
1469 result = self.links.start_the_remote_task_by_id(self.installation_info, remote_task_id)
1470 if not result:
1471 # Unable to reach remote installation
1472 self.event.wait(2)
1473 continue
1474 if not result.get('success'):
1475 self._log("Failed to set initial remote pending task to status '{}'".format(original_abspath), level='error')
1476 # Send request to terminate the remote worker then return
1477 self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id)
1478 self.__write_failure_to_worker_log()
1479 return False
1480 if result.get('success'):
1481 break
1482  
1483 # Loop while redundant_flag not set (while true because of below)
1484 worker_id = None
1485 task_status = ''
1486 last_status_fetch = 0
1487 polling_delay = 5
1488 while task_status != 'complete':
1489 self.event.wait(1)
1490 if self.redundant_flag.is_set():
1491 # Send request to terminate the remote worker then exit
1492 if worker_id:
1493 self.links.terminate_remote_worker(self.installation_info, worker_id)
1494 break
1495  
1496 # Only fetch the status every 5 seconds
1497 time_now = time.time()
1498 if last_status_fetch > (time_now - polling_delay):
1499 continue
1500  
1501 # Fetch task status
1502 all_task_states = self.links.get_remote_pending_task_state(self.installation_info, remote_task_id)
1503 task_status = ''
1504 polling_delay = 5
1505 if all_task_states:
1506 for ts in all_task_states.get('results', []):
1507 if str(ts.get('id')) == str(remote_task_id):
1508 # Task is complete. Exit loop but do not set redundant flag on link manager
1509 task_status = ts.get('status')
1510 break
1511 if not all_task_states.get('results', []):
1512 # Remote task list is empty
1513 task_status = 'removed'
1514 elif all_task_states.get('results') and task_status == '':
1515 # Remote task list did not contain this task
1516 task_status = 'removed'
1517  
1518 # If the task status is 'complete', break the loop here and move onto the result retrieval
1519 # If all_task_states returned no results (we are unable to connect to the remote installation)
1520 # If all_task_states did return results but our task_status was found, the remote installation has removed our task
1521 # If the task status is not 'in_progress', loop here and wait for task to be picked up by a worker
1522 if task_status == 'complete':
1523 break
1524 elif not all_task_states:
1525 polling_delay = 10
1526 last_status_fetch = time_now
1527 continue
1528 elif task_status == 'removed':
1529 self._log("Task has been removed by remote installation '{}'".format(original_abspath), level='error')
1530 self.__write_failure_to_worker_log()
1531 return False
1532 elif task_status != 'in_progress':
1533 # Mark this as the last time run
1534 last_status_fetch = time_now
1535 polling_delay = 10
1536 continue
1537  
1538 # Check if we know the task's worker ID already
1539 if not worker_id:
1540 # The task has been picked up by a worker, find out which one...
1541 workers_status = self.links.get_all_worker_status(self.installation_info)
1542 if not workers_status:
1543 # The request failed for some reason... Perhaps we lost contact with the remote installation
1544 # Mark this as the last time run
1545 last_status_fetch = time_now
1546 continue
1547 for worker in workers_status:
1548 if str(worker.get('current_task')) == str(remote_task_id):
1549 worker_id = worker.get('id')
1550  
1551 # Fetch worker progress
1552 worker_status = self.links.get_single_worker_status(self.installation_info, worker_id)
1553 if not worker_status:
1554 # Mark this as the last time run
1555 last_status_fetch = time_now
1556 continue
1557  
1558 # Update status
1559 self.paused = worker_status.get('paused')
1560 self.worker_log = worker_status.get('worker_log_tail')
1561 self.worker_runners_info = worker_status.get('runners_info')
1562 self.worker_subprocess_percent = worker_status.get('subprocess', {}).get('percent')
1563 self.worker_subprocess_elapsed = worker_status.get('subprocess', {}).get('elapsed')
1564  
1565 # Mark this as the last time run
1566 last_status_fetch = time_now
1567  
1568 # If the previous loop was broken because this tread needs to terminate, return False here (did not complete)
1569 if self.redundant_flag.is_set():
1570 self.worker_log += ["\n\nREMOTE LINK MANAGER TERMINATED!"]
1571 self.current_task.save_command_log(self.worker_log)
1572 return False
1573  
1574 self._log("Remote task completed '{}'".format(original_abspath), level='info')
1575  
1576 # Create local cache path to download results
1577 task_cache_path = self.current_task.get_cache_path()
1578 # Ensure the cache directory exists
1579 cache_directory = os.path.dirname(os.path.abspath(task_cache_path))
1580 if not os.path.exists(cache_directory):
1581 os.makedirs(cache_directory)
1582  
1583 # Fetch remote task result data
1584 data = self.links.fetch_remote_task_data(self.installation_info, remote_task_id,
1585 os.path.join(cache_directory, 'remote_data.json'))
1586  
1587 if not data:
1588 self._log(
1589 "Failed to retrieve remote task data for '{}'. NOTE: The cached files have not been removed from the remote host.".format(
1590 original_abspath), level='error')
1591 self.__write_failure_to_worker_log()
1592 return False
1593 self.worker_log = [data.get('log')]
1594  
1595 # Save the completed command log
1596 self.current_task.save_command_log(self.worker_log)
1597  
1598 # Fetch remote task file
1599 if data.get('task_success'):
1600 task_label = data.get('task_label')
1601 self._log(
1602 "Remote task #{} was successful, proceeding to download the completed file '{}'".format(remote_task_id,
1603 task_label),
1604 level='debug')
1605 # Set the new file out as the extension may have changed
1606 split_file_name = os.path.splitext(data.get('abspath'))
1607 file_extension = split_file_name[1].lstrip('.')
1608 self.current_task.set_cache_path(cache_directory, file_extension)
1609 # Read the updated cache path
1610 task_cache_path = self.current_task.get_cache_path()
1611  
1612 # Loop until we are able to upload the file to the remote installation
1613 while not self.redundant_flag.is_set():
1614 # Check for network transfer lock
1615 lock_key = self.links.acquire_network_transfer_lock(address, transfer_limit=2, lock_type='receive')
1616 if not lock_key:
1617 self.event.wait(1)
1618 continue
1619 # Download the file
1620 self._log("Downloading file from remote installation '{}'".format(task_label), level='debug')
1621 success = self.links.fetch_remote_task_completed_file(self.installation_info, remote_task_id, task_cache_path)
1622 self.links.release_network_transfer_lock(lock_key)
1623 if not success:
1624 self._log("Failed to download file '{}'".format(os.path.basename(data.get('abspath'))), level='error')
1625 # Send request to terminate the remote worker then return
1626 self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id)
1627 self.__write_failure_to_worker_log()
1628 return False
1629 break
1630  
1631 # If the previous loop was broken because this tread needs to terminate, return False here (did not complete)
1632 if self.redundant_flag.is_set():
1633 self.worker_log += ["\n\nREMOTE LINK MANAGER TERMINATED!"]
1634 self.current_task.save_command_log(self.worker_log)
1635 return False
1636  
1637 # Match checksum from task result data with downloaded file
1638 if self.installation_info.get('enable_checksum_validation', False):
1639 downloaded_checksum = common.get_file_checksum(task_cache_path)
1640 if downloaded_checksum != data.get('checksum'):
1641 self._log("The downloaded file did not produce a correct checksum '{}'".format(task_cache_path),
1642 level='error')
1643 # Send request to terminate the remote worker then return
1644 self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id)
1645 self.__write_failure_to_worker_log()
1646 return False
1647  
1648 # Send request to terminate the remote worker then return
1649 self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id)
1650  
1651 return True
1652  
1653 self.__write_failure_to_worker_log()
1654 return False